Merge pull request #634 from SuanmoSuanyangTechnology/fix/celery

[changes] Modify the execution conditions of the task
This commit is contained in:
Ke Sun
2026-03-19 20:24:43 +08:00
committed by GitHub
5 changed files with 43 additions and 24 deletions

View File

@@ -195,18 +195,10 @@ async def get_workspace_end_users(
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
# 触发社区聚类补全任务(异步,不阻塞接口响应)
# 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类
try:
from app.tasks import init_community_clustering_for_users
from app.aioRedis import aio_redis_get
done_key = f"community_cluster:done:workspace:{workspace_id}"
already_done = await aio_redis_get(done_key)
if already_done:
api_logger.info(f"工作空间 {workspace_id} 社区数据已完整,跳过本次聚类任务投递")
else:
init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id))
api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}")
init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id))
api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}")
except Exception as e:
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")

View File

@@ -425,6 +425,12 @@ class LabelPropagationEngine:
- name / summary若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底
"""
try:
# 先检查属性是否已完整,完整则跳过,避免重复生成
check_embedding = bool(self.embedding_model_id)
if await self.repo.is_community_complete(community_id, end_user_id, check_embedding=check_embedding):
logger.debug(f"[Clustering] 社区 {community_id} 属性已完整,跳过生成")
return
members = await self.repo.get_community_members(community_id, end_user_id)
if not members:
return

View File

@@ -21,6 +21,8 @@ from app.repositories.neo4j.cypher_queries import (
UPDATE_COMMUNITY_METADATA,
GET_INCOMPLETE_COMMUNITIES,
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING,
CHECK_COMMUNITY_IS_COMPLETE,
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING,
)
logger = logging.getLogger(__name__)
@@ -187,6 +189,16 @@ class CommunityRepository:
logger.error(f"get_incomplete_communities failed: {e}")
return []
async def is_community_complete(self, community_id: str, end_user_id: str, check_embedding: bool = False) -> bool:
"""检查单个社区节点的属性是否完整。"""
try:
query = CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING if check_embedding else CHECK_COMMUNITY_IS_COMPLETE
result = await self.connector.execute_query(query, community_id=community_id, end_user_id=end_user_id)
return result[0]["is_complete"] if result else False
except Exception as e:
logger.error(f"is_community_complete failed: {e}")
return False
async def update_community_metadata(
self,
community_id: str,

View File

@@ -1204,6 +1204,25 @@ RETURN
startNode(r) = e AS r_from_e
"""
CHECK_COMMUNITY_IS_COMPLETE = """
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
RETURN (
c.name IS NOT NULL AND c.name <> '' AND
c.summary IS NOT NULL AND c.summary <> '' AND
c.core_entities IS NOT NULL
) AS is_complete
"""
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
RETURN (
c.name IS NOT NULL AND c.name <> '' AND
c.summary IS NOT NULL AND c.summary <> '' AND
c.core_entities IS NOT NULL AND
c.summary_embedding IS NOT NULL
) AS is_complete
"""
GET_INCOMPLETE_COMMUNITIES = """
MATCH (c:Community {end_user_id: $end_user_id})
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
@@ -1213,8 +1232,9 @@ RETURN c.community_id AS community_id
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """
MATCH (c:Community {end_user_id: $end_user_id})
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
OR c.name = '' OR c.summary = ''
OR c.summary_embedding IS NULL
WHERE c.name IS NULL OR c.name = ''
OR c.summary IS NULL OR c.summary = ''
OR c.core_entities IS NULL
OR (c.summary_embedding IS NULL AND c.summary IS NOT NULL AND c.summary <> '(empty)')
RETURN c.community_id AS community_id
"""

View File

@@ -2820,17 +2820,6 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace
result = loop.run_until_complete(_run())
result["elapsed_time"] = time.time() - start_time
result["task_id"] = self.request.id
# 所有用户均完整(无需初始化也无失败),写入 Redis 标记1小时内不再重复投递
if workspace_id and result.get("initialized", 0) == 0 and result.get("failed", 0) == 0:
try:
_r = get_sync_redis_client()
if _r:
_r.set(f"community_cluster:done:workspace:{workspace_id}", "1", ex=3600)
logger.info(f"[CommunityCluster] 工作空间 {workspace_id} 数据完整已写入完成标记1小时有效")
except Exception as e:
logger.warning(f"[CommunityCluster] 写入完成标记失败: {e}")
return result
except Exception as e: