From b4615bacdcd8799275dcd317c2328f4ff830ac70 Mon Sep 17 00:00:00 2001 From: lanceyq <1982376970@qq.com> Date: Thu, 19 Mar 2026 20:17:43 +0800 Subject: [PATCH] [changes] Modify the execution conditions of the task --- .../memory_dashboard_controller.py | 12 ++------- .../clustering_engine/label_propagation.py | 6 +++++ .../neo4j/community_repository.py | 12 +++++++++ api/app/repositories/neo4j/cypher_queries.py | 26 ++++++++++++++++--- api/app/tasks.py | 11 -------- 5 files changed, 43 insertions(+), 24 deletions(-) diff --git a/api/app/controllers/memory_dashboard_controller.py b/api/app/controllers/memory_dashboard_controller.py index 552b3483..cc0efab3 100644 --- a/api/app/controllers/memory_dashboard_controller.py +++ b/api/app/controllers/memory_dashboard_controller.py @@ -195,18 +195,10 @@ async def get_workspace_end_users( api_logger.warning(f"Redis 缓存写入失败: {str(e)}") # 触发社区聚类补全任务(异步,不阻塞接口响应) - # 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类 try: from app.tasks import init_community_clustering_for_users - from app.aioRedis import aio_redis_get - - done_key = f"community_cluster:done:workspace:{workspace_id}" - already_done = await aio_redis_get(done_key) - if already_done: - api_logger.info(f"工作空间 {workspace_id} 社区数据已完整,跳过本次聚类任务投递") - else: - init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id)) - api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}") + init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id)) + api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}") except Exception as e: api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}") diff --git a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py index 23ce3901..58fd7f86 100644 --- a/api/app/core/memory/storage_services/clustering_engine/label_propagation.py +++ b/api/app/core/memory/storage_services/clustering_engine/label_propagation.py @@ -425,6 +425,12 @@ class LabelPropagationEngine: - name / summary:若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底 """ try: + # 先检查属性是否已完整,完整则跳过,避免重复生成 + check_embedding = bool(self.embedding_model_id) + if await self.repo.is_community_complete(community_id, end_user_id, check_embedding=check_embedding): + logger.debug(f"[Clustering] 社区 {community_id} 属性已完整,跳过生成") + return + members = await self.repo.get_community_members(community_id, end_user_id) if not members: return diff --git a/api/app/repositories/neo4j/community_repository.py b/api/app/repositories/neo4j/community_repository.py index f0febf24..e89ee451 100644 --- a/api/app/repositories/neo4j/community_repository.py +++ b/api/app/repositories/neo4j/community_repository.py @@ -21,6 +21,8 @@ from app.repositories.neo4j.cypher_queries import ( UPDATE_COMMUNITY_METADATA, GET_INCOMPLETE_COMMUNITIES, GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING, + CHECK_COMMUNITY_IS_COMPLETE, + CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING, ) logger = logging.getLogger(__name__) @@ -187,6 +189,16 @@ class CommunityRepository: logger.error(f"get_incomplete_communities failed: {e}") return [] + async def is_community_complete(self, community_id: str, end_user_id: str, check_embedding: bool = False) -> bool: + """检查单个社区节点的属性是否完整。""" + try: + query = CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING if check_embedding else CHECK_COMMUNITY_IS_COMPLETE + result = await self.connector.execute_query(query, community_id=community_id, end_user_id=end_user_id) + return result[0]["is_complete"] if result else False + except Exception as e: + logger.error(f"is_community_complete failed: {e}") + return False + async def update_community_metadata( self, community_id: str, diff --git a/api/app/repositories/neo4j/cypher_queries.py b/api/app/repositories/neo4j/cypher_queries.py index 1ec5eaba..66d24fab 100644 --- a/api/app/repositories/neo4j/cypher_queries.py +++ b/api/app/repositories/neo4j/cypher_queries.py @@ -1204,6 +1204,25 @@ RETURN startNode(r) = e AS r_from_e """ +CHECK_COMMUNITY_IS_COMPLETE = """ +MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) +RETURN ( + c.name IS NOT NULL AND c.name <> '' AND + c.summary IS NOT NULL AND c.summary <> '' AND + c.core_entities IS NOT NULL +) AS is_complete +""" + +CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """ +MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id}) +RETURN ( + c.name IS NOT NULL AND c.name <> '' AND + c.summary IS NOT NULL AND c.summary <> '' AND + c.core_entities IS NOT NULL AND + c.summary_embedding IS NOT NULL +) AS is_complete +""" + GET_INCOMPLETE_COMMUNITIES = """ MATCH (c:Community {end_user_id: $end_user_id}) WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL @@ -1213,8 +1232,9 @@ RETURN c.community_id AS community_id GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """ MATCH (c:Community {end_user_id: $end_user_id}) -WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL - OR c.name = '' OR c.summary = '' - OR c.summary_embedding IS NULL +WHERE c.name IS NULL OR c.name = '' + OR c.summary IS NULL OR c.summary = '' + OR c.core_entities IS NULL + OR (c.summary_embedding IS NULL AND c.summary IS NOT NULL AND c.summary <> '(empty)') RETURN c.community_id AS community_id """ diff --git a/api/app/tasks.py b/api/app/tasks.py index 3d8a2456..3a237d82 100644 --- a/api/app/tasks.py +++ b/api/app/tasks.py @@ -2820,17 +2820,6 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace result = loop.run_until_complete(_run()) result["elapsed_time"] = time.time() - start_time result["task_id"] = self.request.id - - # 所有用户均完整(无需初始化也无失败),写入 Redis 标记,1小时内不再重复投递 - if workspace_id and result.get("initialized", 0) == 0 and result.get("failed", 0) == 0: - try: - _r = get_sync_redis_client() - if _r: - _r.set(f"community_cluster:done:workspace:{workspace_id}", "1", ex=3600) - logger.info(f"[CommunityCluster] 工作空间 {workspace_id} 数据完整,已写入完成标记(1小时有效)") - except Exception as e: - logger.warning(f"[CommunityCluster] 写入完成标记失败: {e}") - return result except Exception as e: