From fc58ac0408c6110fcbd852ce85e17c9e95353ff0 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Wed, 11 Mar 2026 18:04:04 +0800 Subject: [PATCH 1/5] [changes] Initial stage of community integration --- .../clustering_engine/label_propagation.py | 94 ++++++++++++++----- .../neo4j/community_repository.py | 20 ++++ api/app/repositories/neo4j/cypher_queries.py | 48 +++++----- 3 files changed, 115 insertions(+), 47 deletions(-) diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index 80e238fd..cb6e5804 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -141,8 +141,18 @@ class LabelPropagationEngine: # 将最终标签写入 Neo4j await self._flush_labels(labels, end_user_id) + pre_merge_count = len(set(labels.values())) logger.info( - f"[Clustering] 全量聚类完成,共 {len(set(labels.values()))} 个社区," + f"[Clustering] 全量迭代完成,共 {pre_merge_count} 个社区," + f"{len(labels)} 个实体,开始后处理合并" + ) + + # 全量初始化后做一轮社区合并(基于 name_embedding 余弦相似度) + all_community_ids = list(set(labels.values())) + await self._evaluate_merge(all_community_ids, end_user_id) + + logger.info( + f"[Clustering] 全量聚类完成,合并前 {pre_merge_count} 个社区," f"{len(labels)} 个实体" ) @@ -221,30 +231,50 @@ class LabelPropagationEngine: 策略:计算各社区成员 embedding 的平均向量,若两两余弦相似度 > 0.75 则合并。 合并时保留成员数最多的社区,其余成员迁移过来。 + + 全量场景(社区数 > 20)使用批量查询,避免 N 次数据库往返。 """ MERGE_THRESHOLD = 0.75 + BATCH_THRESHOLD = 20 # 超过此数量走批量查询 community_embeddings: Dict[str, Optional[List[float]]] = {} community_sizes: Dict[str, int] = {} - for cid in community_ids: - members = await self.repo.get_community_members(cid, end_user_id) - community_sizes[cid] = len(members) - # 计算社区成员 embedding 的平均向量 - valid_embeddings = [ - m["name_embedding"] - for m in members - if m.get("name_embedding") - ] - if valid_embeddings: - dim = len(valid_embeddings[0]) - avg = [ - sum(e[i] for e in valid_embeddings) / len(valid_embeddings) - for i in range(dim) + if len(community_ids) > BATCH_THRESHOLD: + # 批量查询:一次拉取所有社区成员 + all_members = await self.repo.get_all_community_members_batch( + community_ids, end_user_id + ) + for cid in community_ids: + members = all_members.get(cid, []) + community_sizes[cid] = len(members) + valid_embeddings = [ + m["name_embedding"] for m in members if m.get("name_embedding") ] - community_embeddings[cid] = avg - else: - community_embeddings[cid] = None + if valid_embeddings: + dim = len(valid_embeddings[0]) + community_embeddings[cid] = [ + sum(e[i] for e in valid_embeddings) / len(valid_embeddings) + for i in range(dim) + ] + else: + community_embeddings[cid] = None + else: + # 增量场景:逐个查询 + for cid in community_ids: + members = await self.repo.get_community_members(cid, end_user_id) + community_sizes[cid] = len(members) + valid_embeddings = [ + m["name_embedding"] for m in members if m.get("name_embedding") + ] + if valid_embeddings: + dim = len(valid_embeddings[0]) + community_embeddings[cid] = [ + sum(e[i] for e in valid_embeddings) / len(valid_embeddings) + for i in range(dim) + ] + else: + community_embeddings[cid] = None # 找出应合并的社区对 to_merge: List[tuple] = [] @@ -258,14 +288,32 @@ class LabelPropagationEngine: if sim > MERGE_THRESHOLD: to_merge.append((cids[i], cids[j])) + logger.info(f"[Clustering] 发现 {len(to_merge)} 对可合并社区") + + # 执行合并:用 union-find 思路避免重复迁移已被合并的社区 + # 维护一个 canonical 映射,确保链式合并正确收敛 + canonical: Dict[str, str] = {cid: cid for cid in cids} + + def find(x: str) -> str: + while canonical[x] != x: + canonical[x] = canonical[canonical[x]] + x = canonical[x] + return x + for c1, c2 in to_merge: - keep = c1 if community_sizes.get(c1, 0) >= community_sizes.get(c2, 0) else c2 - dissolve = c2 if keep == c1 else c1 + root1, root2 = find(c1), find(c2) + if root1 == root2: + continue # 已经在同一社区,跳过 + keep = root1 if community_sizes.get(root1, 0) >= community_sizes.get(root2, 0) else root2 + dissolve = root2 if keep == root1 else root1 + canonical[dissolve] = keep + members = await self.repo.get_community_members(dissolve, end_user_id) for m in members: - await self.repo.assign_entity_to_community( - m["id"], keep, end_user_id - ) + await self.repo.assign_entity_to_community(m["id"], keep, end_user_id) + # 更新 sizes 以便后续合并决策准确 + community_sizes[keep] = community_sizes.get(keep, 0) + len(members) + community_sizes[dissolve] = 0 await self.repo.refresh_member_count(keep, end_user_id) logger.info( f"[Clustering] 社区合并: {dissolve} → {keep}," diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py index 16e30a10..2a1f4f2b 100644 --- a/api/app/repositories/neo4j/community_repository.py +++ b/api/app/repositories/neo4j/community_repository.py @@ -14,6 +14,7 @@ from app.repositories.neo4j.cypher_queries import ( GET_ENTITY_NEIGHBORS, GET_ALL_ENTITIES_FOR_USER, GET_COMMUNITY_MEMBERS, + GET_ALL_COMMUNITY_MEMBERS_BATCH, CHECK_USER_HAS_COMMUNITIES, UPDATE_COMMUNITY_MEMBER_COUNT, ) @@ -101,6 +102,25 @@ class CommunityRepository: logger.error(f"get_community_members failed: {e}") return [] + async def get_all_community_members_batch( + self, community_ids: List[str], end_user_id: str + ) -> Dict[str, List[Dict]]: + """批量查询多个社区的成员,返回 {community_id: [members]} 字典。""" + try: + rows = await self.connector.execute_query( + GET_ALL_COMMUNITY_MEMBERS_BATCH, + community_ids=community_ids, + end_user_id=end_user_id, + ) + result: Dict[str, List[Dict]] = {} + for row in rows: + cid = row["community_id"] + result.setdefault(cid, []).append(row) + return result + except Exception as e: + logger.error(f"get_all_community_members_batch failed: {e}") + return {} + async def has_communities(self, end_user_id: str) -> bool: """检查该用户是否已有 Community 节点(用于判断全量 vs 增量)。""" try: diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 947097a2..84889d65 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1065,26 +1065,6 @@ Graph_Node_query = """ # Community 节点 & BELONGS_TO_COMMUNITY 边 # ============================================================ -COMMUNITY_NODE_SAVE = """ -MERGE (c:Community {community_id: $community_id}) -SET c.end_user_id = $end_user_id, - c.formed_at = $formed_at, - c.updated_at = datetime(), - c.status = $status, - c.member_count = $member_count -RETURN c.community_id AS community_id -""" - -COMMUNITY_ADD_MEMBER = """ -MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) -MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) -MERGE (e)-[:BELONGS_TO_COMMUNITY]->(c) -SET c.updated_at = datetime(), - c.member_count = $member_count -""" - - - # ─── Community 聚类相关 Cypher 模板 ─────────────────────────────────────────── COMMUNITY_NODE_UPSERT = """ @@ -1111,12 +1091,23 @@ DELETE r GET_ENTITY_NEIGHBORS = """ MATCH (e:ExtractedEntity {id: $entity_id, end_user_id: $end_user_id}) -OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb:ExtractedEntity {end_user_id: $end_user_id}) + +// 来源一:直接关系邻居(EXTRACTED_RELATIONSHIP 边) +OPTIONAL MATCH (e)-[:EXTRACTED_RELATIONSHIP]-(nb1:ExtractedEntity {end_user_id: $end_user_id}) + +// 来源二:同 Statement 共现邻居(REFERENCES_ENTITY 边) +OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e) +OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(nb2:ExtractedEntity {end_user_id: $end_user_id}) +WHERE nb2.id <> e.id + +WITH collect(DISTINCT nb1) + collect(DISTINCT nb2) AS all_neighbors +UNWIND all_neighbors AS nb +WITH nb WHERE nb IS NOT NULL OPTIONAL MATCH (nb)-[:BELONGS_TO_COMMUNITY]->(c:Community) RETURN DISTINCT - nb.id AS id, - nb.name AS name, - nb.name_embedding AS name_embedding, + nb.id AS id, + nb.name AS name, + nb.name_embedding AS name_embedding, nb.activation_value AS activation_value, CASE WHEN c IS NOT NULL THEN c.community_id ELSE null END AS community_id """ @@ -1139,6 +1130,15 @@ RETURN e.id AS id, e.name AS name, e.entity_type AS entity_type, ORDER BY coalesce(e.activation_value, 0) DESC """ +GET_ALL_COMMUNITY_MEMBERS_BATCH = """ +MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[:BELONGS_TO_COMMUNITY]->(c:Community) +WHERE c.community_id IN $community_ids +RETURN c.community_id AS community_id, + e.id AS id, + e.name_embedding AS name_embedding, + e.activation_value AS activation_value +""" + CHECK_USER_HAS_COMMUNITIES = """ MATCH (c:Community {end_user_id: $end_user_id}) RETURN count(c) AS community_count From 7b8f101824055122631dd590f8f9b873211ec136 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 12 Mar 2026 20:27:50 +0800 Subject: [PATCH 2/5] [add] Create the attribute values of the community nodes --- .../core/memory/agent/utils/write_tools.py | 4 +- .../clustering_engine/label_propagation.py | 83 ++++++++++++++++++- .../neo4j/community_repository.py | 24 ++++++ api/app/repositories/neo4j/cypher_queries.py | 9 ++ api/app/repositories/neo4j/graph_saver.py | 20 +++-- 5 files changed, 132 insertions(+), 8 deletions(-) diff --git a/api/app/core/memory/agent/utils/write_tools.py b/api/app/core/memory/agent/utils/write_tools.py index 22030278..b3707083 100644 --- a/api/app/core/memory/agent/utils/write_tools.py +++ b/api/app/core/memory/agent/utils/write_tools.py @@ -165,7 +165,9 @@ async def write( statement_chunk_edges=all_statement_chunk_edges, statement_entity_edges=all_statement_entity_edges, entity_edges=all_entity_entity_edges, - connector=neo4j_connector + connector=neo4j_connector, + config_id=config_id, + llm_model_id=str(memory_config.llm_model_id) if memory_config.llm_model_id else None, ) if success: logger.info("Successfully saved all data to Neo4j") diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index cb6e5804..251d4fea 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -19,6 +19,8 @@ logger = logging.getLogger(__name__) # 全量迭代最大轮数,防止不收敛 MAX_ITERATIONS = 10 +# 社区摘要核心实体数量 +CORE_ENTITY_LIMIT = 5 def _cosine_similarity(v1: Optional[List[float]], v2: Optional[List[float]]) -> float: @@ -62,9 +64,16 @@ def _weighted_vote( class LabelPropagationEngine: """标签传播聚类引擎""" - def __init__(self, connector: Neo4jConnector): + def __init__( + self, + connector: Neo4jConnector, + config_id: Optional[str] = None, + llm_model_id: Optional[str] = None, + ): self.connector = connector self.repo = CommunityRepository(connector) + self.config_id = config_id + self.llm_model_id = llm_model_id # ────────────────────────────────────────────────────────────────────────── # 公开接口 @@ -155,6 +164,10 @@ class LabelPropagationEngine: f"[Clustering] 全量聚类完成,合并前 {pre_merge_count} 个社区," f"{len(labels)} 个实体" ) + # 为所有社区生成元数据 + unique_communities = list(set(labels.values())) + for cid in unique_communities: + await self._generate_community_metadata(cid, end_user_id) async def incremental_update( self, new_entity_ids: List[str], end_user_id: str @@ -211,6 +224,7 @@ class LabelPropagationEngine: logger.debug( f"[Clustering] 新实体 {entity_id} 与 {len(neighbors)} 个无社区邻居 → 新社区 {new_cid}" ) + await self._generate_community_metadata(new_cid, end_user_id) else: # 加入得票最多的社区 await self.repo.assign_entity_to_community(entity_id, target_cid, end_user_id) @@ -222,6 +236,7 @@ class LabelPropagationEngine: await self._evaluate_merge( list(community_ids_in_neighbors), end_user_id ) + await self._generate_community_metadata(target_cid, end_user_id) async def _evaluate_merge( self, community_ids: List[str], end_user_id: str @@ -354,6 +369,72 @@ class LabelPropagationEngine: except Exception: return None + async def _generate_community_metadata( + self, community_id: str, end_user_id: str + ) -> None: + """ + 为社区生成并写入元数据:名称、摘要、核心实体。 + + - core_entities:按 activation_value 排序取 top-N 实体名称列表(无需 LLM) + - name / summary:若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底 + """ + try: + members = await self.repo.get_community_members(community_id, end_user_id) + if not members: + return + + # 核心实体:按 activation_value 降序取 top-N + sorted_members = sorted( + members, + key=lambda m: m.get("activation_value") or 0, + reverse=True, + ) + core_entities = [m["name"] for m in sorted_members[:CORE_ENTITY_LIMIT] if m.get("name")] + all_names = [m["name"] for m in members if m.get("name")] + + name = "、".join(core_entities[:3]) if core_entities else community_id[:8] + summary = f"包含实体:{', '.join(all_names)}" + + # 若有 LLM 配置,调用 LLM 生成更好的名称和摘要 + if self.llm_model_id: + try: + from app.db import get_db_context + from app.core.memory.utils.llm.llm_utils import MemoryClientFactory + + entity_list_str = "、".join(all_names) + prompt = ( + f"以下是一组语义相关的实体:{entity_list_str}\n\n" + f"请为这组实体所代表的主题:\n" + f"1. 起一个简洁的中文名称(不超过10个字)\n" + f"2. 写一句话摘要(不超过50个字)\n\n" + f"严格按以下格式输出,不要有其他内容:\n" + f"名称:<名称>\n摘要:<摘要>" + ) + with get_db_context() as db: + factory = MemoryClientFactory(db) + llm_client = factory.get_llm_client(self.llm_model_id) + response = await llm_client.chat([{"role": "user", "content": prompt}]) + text = response.content if hasattr(response, "content") else str(response) + + for line in text.strip().splitlines(): + if line.startswith("名称:"): + name = line[3:].strip() + elif line.startswith("摘要:"): + summary = line[3:].strip() + except Exception as e: + logger.warning(f"[Clustering] LLM 生成社区元数据失败,使用兜底值: {e}") + + await self.repo.update_community_metadata( + community_id=community_id, + end_user_id=end_user_id, + name=name, + summary=summary, + core_entities=core_entities, + ) + logger.debug(f"[Clustering] 社区 {community_id} 元数据已更新: name={name}") + except Exception as e: + logger.error(f"[Clustering] _generate_community_metadata failed for {community_id}: {e}") + @staticmethod def _new_community_id() -> str: return str(uuid.uuid4()) diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py index 2a1f4f2b..6c5c7618 100644 --- a/api/app/repositories/neo4j/community_repository.py +++ b/api/app/repositories/neo4j/community_repository.py @@ -17,6 +17,7 @@ from app.repositories.neo4j.cypher_queries import ( GET_ALL_COMMUNITY_MEMBERS_BATCH, CHECK_USER_HAS_COMMUNITIES, UPDATE_COMMUNITY_MEMBER_COUNT, + UPDATE_COMMUNITY_METADATA, ) logger = logging.getLogger(__name__) @@ -147,3 +148,26 @@ class CommunityRepository: except Exception as e: logger.error(f"refresh_member_count failed: {e}") return 0 + + async def update_community_metadata( + self, + community_id: str, + end_user_id: str, + name: str, + summary: str, + core_entities: List[str], + ) -> bool: + """更新社区的名称、摘要和核心实体列表。""" + try: + result = await self.connector.execute_query( + UPDATE_COMMUNITY_METADATA, + community_id=community_id, + end_user_id=end_user_id, + name=name, + summary=summary, + core_entities=core_entities, + ) + return bool(result) + except Exception as e: + logger.error(f"update_community_metadata failed: {e}") + return False diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 84889d65..b270ed64 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1150,3 +1150,12 @@ WITH c, count(e) AS cnt SET c.member_count = cnt RETURN c.community_id AS community_id, cnt AS member_count """ + +UPDATE_COMMUNITY_METADATA = """ +MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) +SET c.name = $name, + c.summary = $summary, + c.core_entities = $core_entities, + c.updated_at = datetime() +RETURN c.community_id AS community_id +""" diff --git a/api/app/repositories/neo4j/graph_saver.py b/api/app/repositories/neo4j/graph_saver.py index a94bc23b..2ef9bafc 100644 --- a/api/app/repositories/neo4j/graph_saver.py +++ b/api/app/repositories/neo4j/graph_saver.py @@ -1,5 +1,6 @@ import asyncio -from typing import List +import os +from typing import List, Optional # 使用新的仓储层 from app.repositories.neo4j.neo4j_connector import Neo4jConnector @@ -156,7 +157,9 @@ async def save_dialog_and_statements_to_neo4j( entity_edges: List[EntityEntityEdge], statement_chunk_edges: List[StatementChunkEdge], statement_entity_edges: List[StatementEntityEdge], - connector: Neo4jConnector + connector: Neo4jConnector, + config_id: Optional[str] = None, + llm_model_id: Optional[str] = None, ) -> bool: """Save dialogue nodes, chunk nodes, statement nodes, entities, and all relationships to Neo4j using graph models. @@ -290,12 +293,15 @@ async def save_dialog_and_statements_to_neo4j( logger.info("Transaction completed. Summary: %s", summary) logger.debug("Full transaction results: %r", results) - # 写入成功后,触发聚类 - if entity_nodes: + # 写入成功后,触发聚类(可通过环境变量 CLUSTERING_ENABLED=false 禁用,用于基准测试对比) + clustering_enabled = os.getenv("CLUSTERING_ENABLED", "true").lower() != "false" + if entity_nodes and clustering_enabled: end_user_id = entity_nodes[0].end_user_id new_entity_ids = [e.id for e in entity_nodes] logger.info(f"[Clustering] 准备触发聚类,实体数: {len(new_entity_ids)}, end_user_id: {end_user_id}") - await _trigger_clustering(new_entity_ids, end_user_id) + asyncio.create_task(_trigger_clustering(new_entity_ids, end_user_id, config_id=config_id, llm_model_id=llm_model_id)) + elif entity_nodes and not clustering_enabled: + logger.info("[Clustering] 聚类已禁用(CLUSTERING_ENABLED=false),跳过聚类触发") return True @@ -309,6 +315,8 @@ async def save_dialog_and_statements_to_neo4j( async def _trigger_clustering( new_entity_ids: List[str], end_user_id: str, + config_id: Optional[str] = None, + llm_model_id: Optional[str] = None, ) -> None: """ 聚类触发函数,自动判断全量初始化还是增量更新。 @@ -318,7 +326,7 @@ async def _trigger_clustering( from app.core.memory.storage_services.clustering_engine import LabelPropagationEngine logger.info(f"[Clustering] 开始聚类,end_user_id={end_user_id}, 实体数={len(new_entity_ids)}") connector = Neo4jConnector() - engine = LabelPropagationEngine(connector) + engine = LabelPropagationEngine(connector, config_id=config_id, llm_model_id=llm_model_id) await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids) logger.info(f"[Clustering] 聚类完成,end_user_id={end_user_id}") except Exception as e: From f6d929ab7a48e9a5417f9f45f376de02a796058a Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 12:59:36 +0800 Subject: [PATCH 3/5] [add] Community node interface development --- .../controllers/user_memory_controllers.py | 37 ++++ api/app/services/user_memory_service.py | 160 ++++++++++++++++++ 2 files changed, 197 insertions(+) diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index d3fe7d83..be796ff9 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -17,6 +17,7 @@ from app.services.user_memory_service import ( UserMemoryService, analytics_memory_types, analytics_graph_data, + analytics_community_graph_data, ) from app.services.memory_entity_relationship_service import MemoryEntityService,MemoryEmotion,MemoryInteraction from app.schemas.response_schema import ApiResponse @@ -295,6 +296,42 @@ async def get_graph_data_api( return fail(BizCode.INTERNAL_ERROR, "图数据查询失败", str(e)) +@router.get("/analytics/community_graph", response_model=ApiResponse) +async def get_community_graph_data_api( + end_user_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +) -> dict: + workspace_id = current_user.current_workspace_id + + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询社区图谱但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"社区图谱查询请求: end_user_id={end_user_id}, user={current_user.username}, " + f"workspace={workspace_id}" + ) + + try: + result = await analytics_community_graph_data(db=db, end_user_id=end_user_id) + + if "message" in result and result["statistics"]["total_nodes"] == 0: + api_logger.warning(f"社区图谱查询返回空结果: {result.get('message')}") + return success(data=result, msg=result.get("message", "查询成功")) + + api_logger.info( + f"成功获取社区图谱: end_user_id={end_user_id}, " + f"nodes={result['statistics']['total_nodes']}, " + f"edges={result['statistics']['total_edges']}" + ) + return success(data=result, msg="查询成功") + + except Exception as e: + api_logger.error(f"社区图谱查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "社区图谱查询失败", str(e)) + + @router.get("/read_end_user/profile", response_model=ApiResponse) async def get_end_user_profile( end_user_id: str, diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 8bacc112..d21df064 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -1727,6 +1727,166 @@ async def analytics_graph_data( # 辅助函数 +async def analytics_community_graph_data( + db: Session, + end_user_id: str, +) -> Dict[str, Any]: + """ + 获取社区图谱数据,包含 Community 节点、ExtractedEntity 节点及其关系。 + + Returns: + 包含 nodes、edges、statistics 的字典,格式与 analytics_graph_data 一致 + """ + try: + user_uuid = uuid.UUID(end_user_id) + repo = EndUserRepository(db) + end_user = repo.get_by_id(user_uuid) + if not end_user: + return { + "nodes": [], "edges": [], + "statistics": {"total_nodes": 0, "total_edges": 0, "node_types": {}, "edge_types": {}}, + "message": "用户不存在" + } + + # 查询社区节点、实体节点、BELONGS_TO_COMMUNITY 边、实体间关系 + cypher = """ + MATCH (c:Community {end_user_id: $end_user_id}) + MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[b:BELONGS_TO_COMMUNITY]->(c) + OPTIONAL MATCH (e)-[r:EXTRACTED_RELATIONSHIP]-(e2:ExtractedEntity {end_user_id: $end_user_id}) + RETURN + elementId(c) AS c_id, + properties(c) AS c_props, + elementId(e) AS e_id, + properties(e) AS e_props, + elementId(b) AS b_id, + elementId(e2) AS e2_id, + properties(e2) AS e2_props, + elementId(r) AS r_id, + type(r) AS r_type, + properties(r) AS r_props, + startNode(r) = e AS r_from_e + """ + rows = await _neo4j_connector.execute_query(cypher, end_user_id=end_user_id) + + nodes_map: Dict[str, dict] = {} + edges_map: Dict[str, dict] = {} + # 记录每个 Community 对应的实体 id 列表 + community_members: Dict[str, list] = {} + + for row in rows: + # Community 节点 + c_id = row["c_id"] + if c_id and c_id not in nodes_map: + raw = row["c_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "community_id", "end_user_id", "member_count", "updated_at", + "name", "summary", "core_entities", + ) if k in raw} + nodes_map[c_id] = { + "id": c_id, + "label": "Community", + "properties": props, + } + + # ExtractedEntity 节点 (e) + e_id = row["e_id"] + if e_id and e_id not in nodes_map: + raw = row["e_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "name", "end_user_id", "description", "created_at", "entity_type", + ) if k in raw} + # 注入所属社区名称(c 是 e 直接归属的社区) + c_raw = row["c_props"] or {} + props["community_name"] = _clean_neo4j_value(c_raw.get("name")) or "" + nodes_map[e_id] = { + "id": e_id, + "label": "ExtractedEntity", + "properties": props, + } + + # ExtractedEntity 节点 (e2,可选) + e2_id = row.get("e2_id") + if e2_id and e2_id not in nodes_map: + raw = row["e2_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "name", "end_user_id", "description", "created_at", "entity_type", + ) if k in raw} + # e2 的社区归属在后处理阶段通过 community_members 补充 + props["community_name"] = "" + nodes_map[e2_id] = { + "id": e2_id, + "label": "ExtractedEntity", + "properties": props, + } + + # BELONGS_TO_COMMUNITY 边 + b_id = row["b_id"] + if b_id and b_id not in edges_map: + edges_map[b_id] = { + "id": b_id, + "source": e_id, + "target": c_id, + } + # 收集社区成员 id + if c_id and e_id: + community_members.setdefault(c_id, []) + if e_id not in community_members[c_id]: + community_members[c_id].append(e_id) + + # EXTRACTED_RELATIONSHIP 边(可选) + r_id = row.get("r_id") + if r_id and r_id not in edges_map and e2_id: + r_props = {k: _clean_neo4j_value(v) for k, v in (row["r_props"] or {}).items()} + source = e_id if row.get("r_from_e") else e2_id + target = e2_id if row.get("r_from_e") else e_id + edges_map[r_id] = { + "id": r_id, + "source": source, + "target": target, + } + + nodes = list(nodes_map.values()) + edges = list(edges_map.values()) + + # 为每个 Community 节点注入 member_entity_ids,同时补全 e2 节点的 community_name + for c_id, member_ids in community_members.items(): + c_node = nodes_map.get(c_id) + if c_node: + c_node["properties"]["member_entity_ids"] = member_ids + c_name = c_node["properties"].get("name") or "" + # 补全属于该社区但 community_name 为空的实体(即 e2 节点) + for eid in member_ids: + e_node = nodes_map.get(eid) + if e_node and e_node["label"] == "ExtractedEntity": + if not e_node["properties"].get("community_name"): + e_node["properties"]["community_name"] = c_name + + node_type_counts: Dict[str, int] = {} + for n in nodes: + node_type_counts[n["label"]] = node_type_counts.get(n["label"], 0) + 1 + + return { + "nodes": nodes, + "edges": edges, + "statistics": { + "total_nodes": len(nodes), + "total_edges": len(edges), + "node_types": node_type_counts, + } + } + + except ValueError: + logger.error(f"无效的 end_user_id 格式: {end_user_id}") + return { + "nodes": [], "edges": [], + "statistics": {"total_nodes": 0, "total_edges": 0, "node_types": {}, "edge_types": {}}, + "message": "无效的用户ID格式" + } + except Exception as e: + logger.error(f"获取社区图谱数据失败: {str(e)}", exc_info=True) + raise + + async def _extract_node_properties(label: str, properties: Dict[str, Any],node_id: str) -> Dict[str, Any]: """ 根据节点类型提取需要的属性字段 From 6a0ee22d8145d17654b010656e52f7ead22bc92c Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 14:43:29 +0800 Subject: [PATCH 4/5] [add] Create trigger events for the purpose of completing the existing data --- api/app/celery_app.py | 1 + .../memory_dashboard_controller.py | 14 +- .../clustering_engine/label_propagation.py | 70 +++++++-- api/app/tasks.py | 138 ++++++++++++++++++ 4 files changed, 206 insertions(+), 17 deletions(-) diff --git a/api/app/celery_app.py b/api/app/celery_app.py index 0319e079..dbfa9d51 100644 --- a/api/app/celery_app.py +++ b/api/app/celery_app.py @@ -113,6 +113,7 @@ celery_app.conf.update( 'app.tasks.run_forgetting_cycle_task': {'queue': 'periodic_tasks'}, 'app.tasks.write_all_workspaces_memory_task': {'queue': 'periodic_tasks'}, 'app.tasks.update_implicit_emotions_storage': {'queue': 'periodic_tasks'}, + 'app.tasks.init_community_clustering_for_users': {'queue': 'periodic_tasks'}, }, ) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 1b5b45fb..f01445d3 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -177,7 +177,19 @@ async def get_workspace_end_users( await aio_redis_set(cache_key, json.dumps(result), expire=30) except Exception as e: api_logger.warning(f"Redis 缓存写入失败: {str(e)}") - + + # 触发社区聚类补全任务(异步,不阻塞接口响应) + # 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类 + try: + from app.tasks import init_community_clustering_for_users + init_community_clustering_for_users.apply_async( + kwargs={"end_user_ids": end_user_ids}, + queue="periodic_tasks", + ) + api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}") + except Exception as e: + api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") + api_logger.info(f"成功获取 {len(end_users)} 个宿主记录") return success(data=result, msg="宿主列表获取成功") diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index 251d4fea..4491b416 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -165,8 +165,15 @@ class LabelPropagationEngine: f"{len(labels)} 个实体" ) # 为所有社区生成元数据 - unique_communities = list(set(labels.values())) - for cid in unique_communities: + # 注意:_evaluate_merge 后部分社区已被合并消解,需重新从 Neo4j 查询实际存活的社区 + # 不能复用 labels.values(),那里包含已被 dissolve 的旧社区 ID + surviving_communities = await self.repo.get_all_entities(end_user_id) + surviving_community_ids = list({ + e.get("community_id") for e in surviving_communities + if e.get("community_id") + }) + logger.info(f"[Clustering] 合并后实际存活社区数: {len(surviving_community_ids)}") + for cid in surviving_community_ids: await self._generate_community_metadata(cid, end_user_id) async def incremental_update( @@ -249,7 +256,7 @@ class LabelPropagationEngine: 全量场景(社区数 > 20)使用批量查询,避免 N 次数据库往返。 """ - MERGE_THRESHOLD = 0.75 + MERGE_THRESHOLD = 0.85 BATCH_THRESHOLD = 20 # 超过此数量走批量查询 community_embeddings: Dict[str, Optional[List[float]]] = {} @@ -305,34 +312,65 @@ class LabelPropagationEngine: logger.info(f"[Clustering] 发现 {len(to_merge)} 对可合并社区") - # 执行合并:用 union-find 思路避免重复迁移已被合并的社区 - # 维护一个 canonical 映射,确保链式合并正确收敛 - canonical: Dict[str, str] = {cid: cid for cid in cids} + # 执行合并:逐对处理,每次合并后重新计算合并社区的平均向量 + # 避免 union-find 链式传递导致语义不相关的社区被间接合并 + # (A≈B、B≈C 不代表 A≈C,不能因传递性把 A/B/C 全部合并) + merged_into: Dict[str, str] = {} # dissolve → keep 的最终映射 - def find(x: str) -> str: - while canonical[x] != x: - canonical[x] = canonical[canonical[x]] - x = canonical[x] + def get_root(x: str) -> str: + """路径压缩,找到 x 当前所属的根社区。""" + while x in merged_into: + merged_into[x] = merged_into.get(merged_into[x], merged_into[x]) + x = merged_into[x] return x for c1, c2 in to_merge: - root1, root2 = find(c1), find(c2) + root1, root2 = get_root(c1), get_root(c2) if root1 == root2: - continue # 已经在同一社区,跳过 + continue + + # 用合并后的最新平均向量重新验证相似度 + # 防止链式传递:A≈B 合并后 B 的向量已更新,C 必须和新 B 相似才能合并 + current_sim = _cosine_similarity( + community_embeddings.get(root1), + community_embeddings.get(root2), + ) + if current_sim <= MERGE_THRESHOLD: + # 合并后向量已漂移,不再满足阈值,跳过 + logger.debug( + f"[Clustering] 跳过合并 {root1} ↔ {root2}," + f"当前相似度 {current_sim:.3f} ≤ {MERGE_THRESHOLD}" + ) + continue + keep = root1 if community_sizes.get(root1, 0) >= community_sizes.get(root2, 0) else root2 dissolve = root2 if keep == root1 else root1 - canonical[dissolve] = keep + merged_into[dissolve] = keep members = await self.repo.get_community_members(dissolve, end_user_id) for m in members: await self.repo.assign_entity_to_community(m["id"], keep, end_user_id) - # 更新 sizes 以便后续合并决策准确 - community_sizes[keep] = community_sizes.get(keep, 0) + len(members) + + # 合并后重新计算 keep 的平均向量(加权平均) + keep_emb = community_embeddings.get(keep) + dissolve_emb = community_embeddings.get(dissolve) + keep_size = community_sizes.get(keep, 0) + dissolve_size = community_sizes.get(dissolve, 0) + total_size = keep_size + dissolve_size + if keep_emb and dissolve_emb and total_size > 0: + dim = len(keep_emb) + community_embeddings[keep] = [ + (keep_emb[i] * keep_size + dissolve_emb[i] * dissolve_size) / total_size + for i in range(dim) + ] + community_embeddings[dissolve] = None + + community_sizes[keep] = total_size community_sizes[dissolve] = 0 await self.repo.refresh_member_count(keep, end_user_id) logger.info( f"[Clustering] 社区合并: {dissolve} → {keep}," - f"迁移 {len(members)} 个成员" + f"相似度={current_sim:.3f},迁移 {len(members)} 个成员" ) async def _flush_labels( diff --git a/api/app/tasks.py b/api/app/tasks.py index a6ebbb8e..134d4744 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2416,3 +2416,141 @@ def update_implicit_emotions_storage(self) -> Dict[str, Any]: "elapsed_time": elapsed_time, "task_id": self.request.id } + + +# ============================================================================= +# 社区聚类补全任务(触发型) +# ============================================================================= + +@celery_app.task( + name="app.tasks.init_community_clustering_for_users", + bind=True, + ignore_result=False, + max_retries=0, + acks_late=False, + time_limit=7200, # 2小时硬超时 + soft_time_limit=6900, +) +def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]: + """触发型任务:检查指定用户列表,对有 ExtractedEntity 但无 Community 节点的用户执行全量聚类。 + + 由 /dashboard/end_users 接口触发,已有社区节点的用户直接跳过。 + + Args: + end_user_ids: 需要检查的用户 ID 列表 + + Returns: + 包含任务执行结果的字典 + """ + start_time = time.time() + + async def _run() -> Dict[str, Any]: + from app.core.logging_config import get_logger + from app.repositories.neo4j.community_repository import CommunityRepository + from app.repositories.neo4j.neo4j_connector import Neo4jConnector + from app.core.memory.storage_services.clustering_engine.label_propagation import LabelPropagationEngine + + logger = get_logger(__name__) + logger.info(f"[CommunityCluster] 开始社区聚类补全任务,候选用户数: {len(end_user_ids)}") + + initialized = 0 + skipped = 0 + failed = 0 + + connector = Neo4jConnector() + try: + repo = CommunityRepository(connector) + + # 获取 llm_model_id(从第一个用户的配置中读取,作为全局兜底) + llm_model_id = None + try: + with get_db_context() as db: + from app.services.memory_agent_service import get_end_user_connected_config + from app.services.memory_config_service import MemoryConfigService + for uid in end_user_ids: + try: + connected = get_end_user_connected_config(uid, db) + config_id = connected.get("memory_config_id") + workspace_id = connected.get("workspace_id") + if config_id or workspace_id: + cfg = MemoryConfigService(db).load_memory_config( + config_id=config_id, workspace_id=workspace_id + ) + llm_model_id = str(cfg.llm_model_id) + break + except Exception: + continue + except Exception as e: + logger.warning(f"[CommunityCluster] 获取 LLM 配置失败,将使用兜底值: {e}") + + engine = LabelPropagationEngine( + connector=connector, + llm_model_id=llm_model_id, + ) + + for end_user_id in end_user_ids: + try: + # 已有社区节点则跳过 + has_communities = await repo.has_communities(end_user_id) + if has_communities: + skipped += 1 + logger.debug(f"[CommunityCluster] 用户 {end_user_id} 已有社区节点,跳过") + continue + + # 检查是否有 ExtractedEntity 节点 + entities = await repo.get_all_entities(end_user_id) + if not entities: + skipped += 1 + logger.debug(f"[CommunityCluster] 用户 {end_user_id} 无实体节点,跳过") + continue + + logger.info(f"[CommunityCluster] 用户 {end_user_id} 有 {len(entities)} 个实体,开始全量聚类") + await engine.full_clustering(end_user_id) + initialized += 1 + logger.info(f"[CommunityCluster] 用户 {end_user_id} 聚类完成") + + except Exception as e: + failed += 1 + logger.error(f"[CommunityCluster] 用户 {end_user_id} 聚类失败: {e}") + + finally: + await connector.close() + + logger.info( + f"[CommunityCluster] 任务完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}" + ) + return { + "status": "SUCCESS", + "initialized": initialized, + "skipped": skipped, + "failed": failed, + } + + try: + try: + import nest_asyncio + nest_asyncio.apply() + except ImportError: + pass + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + result = loop.run_until_complete(_run()) + result["elapsed_time"] = time.time() - start_time + result["task_id"] = self.request.id + return result + + except Exception as e: + return { + "status": "FAILURE", + "error": str(e), + "elapsed_time": time.time() - start_time, + "task_id": self.request.id, + } From 01a1e8eab1ee97dbb04b6c79d1d3fd085b491b0f Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 14:50:21 +0800 Subject: [PATCH 5/5] [changes] Update the pointers in the main repository to point to the submodules --- redbear-mem-benchmark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redbear-mem-benchmark b/redbear-mem-benchmark index 8494e824..b4ddbe9b 160000 --- a/redbear-mem-benchmark +++ b/redbear-mem-benchmark @@ -1 +1 @@ -Subproject commit 8494e82498cb99c70ac67a64a544ff872432363a +Subproject commit b4ddbe9b19014bb8d2d20f1b41eb656d03a5e5ed