From 967139cea4e6527f5d823b18c49702b40b187a8b Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Fri, 13 Mar 2026 12:59:36 +0800 Subject: [PATCH] [add] Community node interface development --- .../controllers/user_memory_controllers.py | 37 ++++ api/app/services/user_memory_service.py | 160 ++++++++++++++++++ 2 files changed, 197 insertions(+) diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index d3fe7d83..be796ff9 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -17,6 +17,7 @@ from app.services.user_memory_service import ( UserMemoryService, analytics_memory_types, analytics_graph_data, + analytics_community_graph_data, ) from app.services.memory_entity_relationship_service import MemoryEntityService,MemoryEmotion,MemoryInteraction from app.schemas.response_schema import ApiResponse @@ -295,6 +296,42 @@ async def get_graph_data_api( return fail(BizCode.INTERNAL_ERROR, "图数据查询失败", str(e)) +@router.get("/analytics/community_graph", response_model=ApiResponse) +async def get_community_graph_data_api( + end_user_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +) -> dict: + workspace_id = current_user.current_workspace_id + + if workspace_id is None: + api_logger.warning(f"用户 {current_user.username} 尝试查询社区图谱但未选择工作空间") + return fail(BizCode.INVALID_PARAMETER, "请先切换到一个工作空间", "current_workspace_id is None") + + api_logger.info( + f"社区图谱查询请求: end_user_id={end_user_id}, user={current_user.username}, " + f"workspace={workspace_id}" + ) + + try: + result = await analytics_community_graph_data(db=db, end_user_id=end_user_id) + + if "message" in result and result["statistics"]["total_nodes"] == 0: + api_logger.warning(f"社区图谱查询返回空结果: {result.get('message')}") + return success(data=result, msg=result.get("message", "查询成功")) + + api_logger.info( + f"成功获取社区图谱: end_user_id={end_user_id}, " + f"nodes={result['statistics']['total_nodes']}, " + f"edges={result['statistics']['total_edges']}" + ) + return success(data=result, msg="查询成功") + + except Exception as e: + api_logger.error(f"社区图谱查询失败: end_user_id={end_user_id}, error={str(e)}") + return fail(BizCode.INTERNAL_ERROR, "社区图谱查询失败", str(e)) + + @router.get("/read_end_user/profile", response_model=ApiResponse) async def get_end_user_profile( end_user_id: str, diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 8bacc112..d21df064 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -1727,6 +1727,166 @@ async def analytics_graph_data( # 辅助函数 +async def analytics_community_graph_data( + db: Session, + end_user_id: str, +) -> Dict[str, Any]: + """ + 获取社区图谱数据,包含 Community 节点、ExtractedEntity 节点及其关系。 + + Returns: + 包含 nodes、edges、statistics 的字典,格式与 analytics_graph_data 一致 + """ + try: + user_uuid = uuid.UUID(end_user_id) + repo = EndUserRepository(db) + end_user = repo.get_by_id(user_uuid) + if not end_user: + return { + "nodes": [], "edges": [], + "statistics": {"total_nodes": 0, "total_edges": 0, "node_types": {}, "edge_types": {}}, + "message": "用户不存在" + } + + # 查询社区节点、实体节点、BELONGS_TO_COMMUNITY 边、实体间关系 + cypher = """ + MATCH (c:Community {end_user_id: $end_user_id}) + MATCH (e:ExtractedEntity {end_user_id: $end_user_id})-[b:BELONGS_TO_COMMUNITY]->(c) + OPTIONAL MATCH (e)-[r:EXTRACTED_RELATIONSHIP]-(e2:ExtractedEntity {end_user_id: $end_user_id}) + RETURN + elementId(c) AS c_id, + properties(c) AS c_props, + elementId(e) AS e_id, + properties(e) AS e_props, + elementId(b) AS b_id, + elementId(e2) AS e2_id, + properties(e2) AS e2_props, + elementId(r) AS r_id, + type(r) AS r_type, + properties(r) AS r_props, + startNode(r) = e AS r_from_e + """ + rows = await _neo4j_connector.execute_query(cypher, end_user_id=end_user_id) + + nodes_map: Dict[str, dict] = {} + edges_map: Dict[str, dict] = {} + # 记录每个 Community 对应的实体 id 列表 + community_members: Dict[str, list] = {} + + for row in rows: + # Community 节点 + c_id = row["c_id"] + if c_id and c_id not in nodes_map: + raw = row["c_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "community_id", "end_user_id", "member_count", "updated_at", + "name", "summary", "core_entities", + ) if k in raw} + nodes_map[c_id] = { + "id": c_id, + "label": "Community", + "properties": props, + } + + # ExtractedEntity 节点 (e) + e_id = row["e_id"] + if e_id and e_id not in nodes_map: + raw = row["e_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "name", "end_user_id", "description", "created_at", "entity_type", + ) if k in raw} + # 注入所属社区名称(c 是 e 直接归属的社区) + c_raw = row["c_props"] or {} + props["community_name"] = _clean_neo4j_value(c_raw.get("name")) or "" + nodes_map[e_id] = { + "id": e_id, + "label": "ExtractedEntity", + "properties": props, + } + + # ExtractedEntity 节点 (e2,可选) + e2_id = row.get("e2_id") + if e2_id and e2_id not in nodes_map: + raw = row["e2_props"] or {} + props = {k: _clean_neo4j_value(raw.get(k)) for k in ( + "name", "end_user_id", "description", "created_at", "entity_type", + ) if k in raw} + # e2 的社区归属在后处理阶段通过 community_members 补充 + props["community_name"] = "" + nodes_map[e2_id] = { + "id": e2_id, + "label": "ExtractedEntity", + "properties": props, + } + + # BELONGS_TO_COMMUNITY 边 + b_id = row["b_id"] + if b_id and b_id not in edges_map: + edges_map[b_id] = { + "id": b_id, + "source": e_id, + "target": c_id, + } + # 收集社区成员 id + if c_id and e_id: + community_members.setdefault(c_id, []) + if e_id not in community_members[c_id]: + community_members[c_id].append(e_id) + + # EXTRACTED_RELATIONSHIP 边(可选) + r_id = row.get("r_id") + if r_id and r_id not in edges_map and e2_id: + r_props = {k: _clean_neo4j_value(v) for k, v in (row["r_props"] or {}).items()} + source = e_id if row.get("r_from_e") else e2_id + target = e2_id if row.get("r_from_e") else e_id + edges_map[r_id] = { + "id": r_id, + "source": source, + "target": target, + } + + nodes = list(nodes_map.values()) + edges = list(edges_map.values()) + + # 为每个 Community 节点注入 member_entity_ids,同时补全 e2 节点的 community_name + for c_id, member_ids in community_members.items(): + c_node = nodes_map.get(c_id) + if c_node: + c_node["properties"]["member_entity_ids"] = member_ids + c_name = c_node["properties"].get("name") or "" + # 补全属于该社区但 community_name 为空的实体(即 e2 节点) + for eid in member_ids: + e_node = nodes_map.get(eid) + if e_node and e_node["label"] == "ExtractedEntity": + if not e_node["properties"].get("community_name"): + e_node["properties"]["community_name"] = c_name + + node_type_counts: Dict[str, int] = {} + for n in nodes: + node_type_counts[n["label"]] = node_type_counts.get(n["label"], 0) + 1 + + return { + "nodes": nodes, + "edges": edges, + "statistics": { + "total_nodes": len(nodes), + "total_edges": len(edges), + "node_types": node_type_counts, + } + } + + except ValueError: + logger.error(f"无效的 end_user_id 格式: {end_user_id}") + return { + "nodes": [], "edges": [], + "statistics": {"total_nodes": 0, "total_edges": 0, "node_types": {}, "edge_types": {}}, + "message": "无效的用户ID格式" + } + except Exception as e: + logger.error(f"获取社区图谱数据失败: {str(e)}", exc_info=True) + raise + + async def _extract_node_properties(label: str, properties: Dict[str, Any],node_id: str) -> Dict[str, Any]: """ 根据节点类型提取需要的属性字段