Merge branch 'develop' into feature/multimodel_memory
# Conflicts: # api/app/core/memory/storage_services/extraction_engine/knowledge_extraction/embedding_generation.py # api/app/repositories/neo4j/add_nodes.py # api/app/repositories/neo4j/cypher_queries.py # api/app/repositories/neo4j/graph_saver.py # api/app/services/memory_agent_service.py # api/app/services/multimodal_service.py
This commit is contained in:
@@ -90,27 +90,27 @@ class ConversationRepository:
|
||||
self,
|
||||
user_id: uuid.UUID,
|
||||
workspace_id: uuid.UUID = None,
|
||||
limit: int = 10,
|
||||
is_activate: bool = True
|
||||
) -> list[Conversation]:
|
||||
is_activate: bool = True,
|
||||
page: int = 1,
|
||||
page_size: int = 20
|
||||
) -> tuple[list[Conversation], int]:
|
||||
"""
|
||||
Retrieve recent conversations for a specific user.
|
||||
Retrieve recent conversations for a specific user with pagination.
|
||||
|
||||
This method queries conversations associated with the given user ID,
|
||||
optionally scoped to a specific workspace. Results are ordered by the
|
||||
most recently updated conversations and limited to a fixed number.
|
||||
most recently updated conversations.
|
||||
|
||||
Args:
|
||||
user_id (uuid.UUID): Unique identifier of the user.
|
||||
workspace_id (uuid.UUID, optional): Workspace scope for the query.
|
||||
If provided, only conversations under this workspace will be returned.
|
||||
limit (int): Maximum number of conversations to return.
|
||||
Defaults to 10.
|
||||
is_activate (bool): Convsersation State limit
|
||||
is_activate (bool): Conversation State limit.
|
||||
page (int): Page number (1-based). Defaults to 1.
|
||||
page_size (int): Number of items per page. Defaults to 20.
|
||||
|
||||
Returns:
|
||||
list[Conversation]: A list of conversation entities ordered by
|
||||
last updated time (descending).
|
||||
tuple[list[Conversation], int]: A list of conversation entities and total count.
|
||||
"""
|
||||
logger.info(f"Fetching conversation by user_id: {user_id}")
|
||||
|
||||
@@ -122,18 +122,25 @@ class ConversationRepository:
|
||||
if workspace_id:
|
||||
stmt = stmt.where(Conversation.workspace_id == workspace_id)
|
||||
|
||||
stmt = stmt.order_by(desc(Conversation.updated_at))
|
||||
stmt = stmt.limit(limit)
|
||||
# Calculate total count
|
||||
total = int(self.db.execute(
|
||||
select(func.count()).select_from(stmt.subquery())
|
||||
).scalar_one())
|
||||
|
||||
convsersations = list(self.db.scalars(stmt).all())
|
||||
# Apply ordering and pagination
|
||||
stmt = stmt.order_by(desc(Conversation.updated_at))
|
||||
stmt = stmt.offset((page - 1) * page_size).limit(page_size)
|
||||
|
||||
conversations = list(self.db.scalars(stmt).all())
|
||||
logger.info(
|
||||
"Conversation fetched successfully",
|
||||
extra={
|
||||
"user_id": str(user_id),
|
||||
"workspace_id": str(workspace_id),
|
||||
"total": total,
|
||||
}
|
||||
)
|
||||
return convsersations
|
||||
return conversations, total
|
||||
|
||||
def list_conversations(
|
||||
self,
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from app.core.logging_config import get_logger
|
||||
from app.core.memory.models.graph_models import DialogueNode, StatementNode, ChunkNode, MemorySummaryNode
|
||||
from app.repositories.neo4j.cypher_queries import DIALOGUE_NODE_SAVE, STATEMENT_NODE_SAVE, CHUNK_NODE_SAVE, \
|
||||
MEMORY_SUMMARY_NODE_SAVE
|
||||
# 使用新的仓储层
|
||||
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
|
||||
|
||||
logger = get_logger(__name__)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def delete_all_nodes(end_user_id: str, connector: Neo4jConnector):
|
||||
@@ -57,7 +57,7 @@ async def add_dialogue_nodes(dialogues: List[DialogueNode], connector: Neo4jConn
|
||||
return created_uuids
|
||||
|
||||
except Exception as e:
|
||||
logger.info(f"Error creating dialogue nodes: {e}")
|
||||
logger.error(f"Error creating dialogue nodes: {e}")
|
||||
return None
|
||||
|
||||
|
||||
@@ -129,7 +129,7 @@ async def add_statement_nodes(statements: List[StatementNode], connector: Neo4jC
|
||||
return created_uuids
|
||||
|
||||
except Exception as e:
|
||||
logger.info(f"Error creating statement nodes: {e}")
|
||||
logger.error(f"Error creating statement nodes: {e}")
|
||||
return None
|
||||
|
||||
|
||||
@@ -181,7 +181,7 @@ async def add_chunk_nodes(chunks: List[ChunkNode], connector: Neo4jConnector) ->
|
||||
return created_uuids
|
||||
|
||||
except Exception as e:
|
||||
logger.info(f"Error creating chunk nodes: {e}")
|
||||
logger.error(f"Error creating chunk nodes: {e}")
|
||||
return None
|
||||
|
||||
|
||||
@@ -228,5 +228,5 @@ async def add_memory_summary_nodes(
|
||||
logger.info(f"Successfully saved {len(created_ids)} MemorySummary nodes to Neo4j")
|
||||
return created_ids
|
||||
except Exception as e:
|
||||
logger.info(f"Failed to save MemorySummary nodes to Neo4j: {e}")
|
||||
logger.error(f"Failed to save MemorySummary nodes to Neo4j: {e}")
|
||||
return None
|
||||
|
||||
@@ -24,6 +24,10 @@ from app.repositories.neo4j.cypher_queries import (
|
||||
CHECK_USER_HAS_COMMUNITIES,
|
||||
UPDATE_COMMUNITY_MEMBER_COUNT,
|
||||
UPDATE_COMMUNITY_METADATA,
|
||||
GET_INCOMPLETE_COMMUNITIES,
|
||||
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING,
|
||||
CHECK_COMMUNITY_IS_COMPLETE,
|
||||
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING,
|
||||
BATCH_UPDATE_COMMUNITY_METADATA,
|
||||
)
|
||||
|
||||
@@ -249,6 +253,31 @@ class CommunityRepository:
|
||||
logger.error(f"refresh_member_count failed: {e}")
|
||||
return 0
|
||||
|
||||
async def get_incomplete_communities(self, end_user_id: str, check_embedding: bool = False) -> List[str]:
|
||||
"""查询该用户下属性不完整的 Community 节点 ID 列表。
|
||||
|
||||
Args:
|
||||
end_user_id: 用户 ID
|
||||
check_embedding: 为 True 时额外检查 summary_embedding 是否缺失(仅当用户有 embedding 模型配置时传 True)
|
||||
"""
|
||||
try:
|
||||
query = GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING if check_embedding else GET_INCOMPLETE_COMMUNITIES
|
||||
result = await self.connector.execute_query(query, end_user_id=end_user_id)
|
||||
return [row["community_id"] for row in result]
|
||||
except Exception as e:
|
||||
logger.error(f"get_incomplete_communities failed: {e}")
|
||||
return []
|
||||
|
||||
async def is_community_complete(self, community_id: str, end_user_id: str, check_embedding: bool = False) -> bool:
|
||||
"""检查单个社区节点的属性是否完整。"""
|
||||
try:
|
||||
query = CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING if check_embedding else CHECK_COMMUNITY_IS_COMPLETE
|
||||
result = await self.connector.execute_query(query, community_id=community_id, end_user_id=end_user_id)
|
||||
return result[0]["is_complete"] if result else False
|
||||
except Exception as e:
|
||||
logger.error(f"is_community_complete failed: {e}")
|
||||
return False
|
||||
|
||||
async def update_community_metadata(
|
||||
self,
|
||||
community_id: str,
|
||||
@@ -258,7 +287,7 @@ class CommunityRepository:
|
||||
core_entities: List[str],
|
||||
summary_embedding: Optional[List[float]] = None,
|
||||
) -> bool:
|
||||
"""更新社区的名称、摘要、核心实体列表和摘要向量。"""
|
||||
"""更新社区的名称、摘要、核心实体列表及 summary_embedding。"""
|
||||
try:
|
||||
result = await self.connector.execute_query(
|
||||
UPDATE_COMMUNITY_METADATA,
|
||||
@@ -271,7 +300,7 @@ class CommunityRepository:
|
||||
)
|
||||
return bool(result)
|
||||
except Exception as e:
|
||||
logger.error(f"update_community_metadata failed: {e}")
|
||||
logger.error(f"update_community_metadata failed: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
async def batch_update_community_metadata(
|
||||
|
||||
@@ -1075,6 +1075,7 @@ RETURN
|
||||
|
||||
COMMUNITY_NODE_UPSERT = """
|
||||
MERGE (c:Community {community_id: $community_id})
|
||||
ON CREATE SET c.id = $community_id
|
||||
SET c.end_user_id = $end_user_id,
|
||||
c.member_count = $member_count,
|
||||
c.updated_at = datetime()
|
||||
@@ -1181,7 +1182,8 @@ RETURN c.community_id AS community_id, cnt AS member_count
|
||||
|
||||
UPDATE_COMMUNITY_METADATA = """
|
||||
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
|
||||
SET c.name = $name,
|
||||
SET c.id = coalesce(c.id, $community_id),
|
||||
c.name = $name,
|
||||
c.summary = $summary,
|
||||
c.core_entities = $core_entities,
|
||||
c.summary_embedding = $summary_embedding,
|
||||
@@ -1192,7 +1194,8 @@ RETURN c.community_id AS community_id
|
||||
BATCH_UPDATE_COMMUNITY_METADATA = """
|
||||
UNWIND $communities AS row
|
||||
MATCH (c:Community {community_id: row.community_id, end_user_id: row.end_user_id})
|
||||
SET c.name = row.name,
|
||||
SET c.id = coalesce(c.id, row.community_id),
|
||||
c.name = row.name,
|
||||
c.summary = row.summary,
|
||||
c.core_entities = row.core_entities,
|
||||
c.summary_embedding = row.summary_embedding,
|
||||
@@ -1276,6 +1279,40 @@ RETURN
|
||||
startNode(r) = e AS r_from_e
|
||||
"""
|
||||
|
||||
CHECK_COMMUNITY_IS_COMPLETE = """
|
||||
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
|
||||
RETURN (
|
||||
c.name IS NOT NULL AND c.name <> '' AND
|
||||
c.summary IS NOT NULL AND c.summary <> '' AND
|
||||
c.core_entities IS NOT NULL
|
||||
) AS is_complete
|
||||
"""
|
||||
|
||||
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """
|
||||
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
|
||||
RETURN (
|
||||
c.name IS NOT NULL AND c.name <> '' AND
|
||||
c.summary IS NOT NULL AND c.summary <> '' AND
|
||||
c.core_entities IS NOT NULL AND
|
||||
c.summary_embedding IS NOT NULL
|
||||
) AS is_complete
|
||||
"""
|
||||
|
||||
GET_INCOMPLETE_COMMUNITIES = """
|
||||
MATCH (c:Community {end_user_id: $end_user_id})
|
||||
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
|
||||
OR c.name = '' OR c.summary = ''
|
||||
RETURN c.community_id AS community_id
|
||||
"""
|
||||
|
||||
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """
|
||||
MATCH (c:Community {end_user_id: $end_user_id})
|
||||
WHERE c.name IS NULL OR c.name = ''
|
||||
OR c.summary IS NULL OR c.summary = ''
|
||||
OR c.core_entities IS NULL
|
||||
OR (c.summary_embedding IS NULL AND c.summary IS NOT NULL AND c.summary <> '(empty)')
|
||||
RETURN c.community_id AS community_id
|
||||
"""
|
||||
|
||||
# Community keyword search: matches name or summary via fulltext index
|
||||
SEARCH_COMMUNITIES_BY_KEYWORD = """
|
||||
|
||||
@@ -169,7 +169,7 @@ async def save_dialog_and_statements_to_neo4j(
|
||||
"""Save dialogue nodes, chunk nodes, statement nodes, entities, and all relationships to Neo4j using graph models.
|
||||
|
||||
只负责数据写入,不触发聚类。聚类由调用方在写入成功后通过
|
||||
schedule_clustering_after_write() 显式触发。
|
||||
_trigger_clustering_sync() 显式触发。
|
||||
|
||||
Args:
|
||||
dialogue_nodes: List of DialogueNode objects to save
|
||||
@@ -336,16 +336,13 @@ async def save_dialog_and_statements_to_neo4j(
|
||||
return False
|
||||
|
||||
|
||||
def schedule_clustering_after_write(
|
||||
async def _trigger_clustering_sync(
|
||||
entity_nodes: List,
|
||||
llm_model_id: Optional[str] = None,
|
||||
embedding_model_id: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
写入 Neo4j 成功后,调度后台聚类任务。
|
||||
|
||||
可通过环境变量 CLUSTERING_ENABLED=false 禁用(用于基准测试对比)。
|
||||
使用 asyncio.create_task 异步触发,不阻塞写入响应。
|
||||
同步等待聚类完成,避免与其他 LLM 任务并发冲突。
|
||||
"""
|
||||
if not entity_nodes:
|
||||
return
|
||||
@@ -357,9 +354,9 @@ def schedule_clustering_after_write(
|
||||
|
||||
end_user_id = entity_nodes[0].end_user_id
|
||||
new_entity_ids = [e.id for e in entity_nodes]
|
||||
logger.info(f"[Clustering] 准备触发聚类,实体数: {len(new_entity_ids)}, end_user_id: {end_user_id}")
|
||||
asyncio.create_task(_trigger_clustering(new_entity_ids, end_user_id, llm_model_id=llm_model_id,
|
||||
embedding_model_id=embedding_model_id))
|
||||
logger.info(f"[Clustering] 准备触发聚类(同步),实体数: {len(new_entity_ids)}, end_user_id: {end_user_id}")
|
||||
await _trigger_clustering(new_entity_ids, end_user_id, llm_model_id=llm_model_id,
|
||||
embedding_model_id=embedding_model_id)
|
||||
|
||||
|
||||
async def _trigger_clustering(
|
||||
|
||||
@@ -43,6 +43,7 @@ class WorkflowConfigRepository:
|
||||
edges: list[dict[str, Any]],
|
||||
variables: list[dict[str, Any]] | None = None,
|
||||
execution_config: dict[str, Any] | None = None,
|
||||
features: dict[str, Any] | None = None,
|
||||
triggers: list[dict[str, Any]] | None = None
|
||||
) -> WorkflowConfig:
|
||||
"""创建或更新工作流配置
|
||||
@@ -53,6 +54,7 @@ class WorkflowConfigRepository:
|
||||
edges: 边列表
|
||||
variables: 变量列表
|
||||
execution_config: 执行配置
|
||||
features: 功能特性
|
||||
triggers: 触发器列表
|
||||
|
||||
Returns:
|
||||
@@ -82,6 +84,7 @@ class WorkflowConfigRepository:
|
||||
edges=edges,
|
||||
variables=variables or [],
|
||||
execution_config=execution_config or {},
|
||||
features=features or {},
|
||||
triggers=triggers or []
|
||||
)
|
||||
self.db.add(config)
|
||||
|
||||
Reference in New Issue
Block a user