[changes] Modify the execution conditions of the task
This commit is contained in:
@@ -195,18 +195,10 @@ async def get_workspace_end_users(
|
|||||||
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
api_logger.warning(f"Redis 缓存写入失败: {str(e)}")
|
||||||
|
|
||||||
# 触发社区聚类补全任务(异步,不阻塞接口响应)
|
# 触发社区聚类补全任务(异步,不阻塞接口响应)
|
||||||
# 对有 ExtractedEntity 但无 Community 节点的存量用户自动补跑全量聚类
|
|
||||||
try:
|
try:
|
||||||
from app.tasks import init_community_clustering_for_users
|
from app.tasks import init_community_clustering_for_users
|
||||||
from app.aioRedis import aio_redis_get
|
init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id))
|
||||||
|
api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}")
|
||||||
done_key = f"community_cluster:done:workspace:{workspace_id}"
|
|
||||||
already_done = await aio_redis_get(done_key)
|
|
||||||
if already_done:
|
|
||||||
api_logger.info(f"工作空间 {workspace_id} 社区数据已完整,跳过本次聚类任务投递")
|
|
||||||
else:
|
|
||||||
init_community_clustering_for_users.delay(end_user_ids=end_user_ids, workspace_id=str(workspace_id))
|
|
||||||
api_logger.info(f"已触发社区聚类补全任务,候选用户数: {len(end_user_ids)}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
|
api_logger.warning(f"触发社区聚类补全任务失败(不影响主流程): {str(e)}")
|
||||||
|
|
||||||
|
|||||||
@@ -425,6 +425,12 @@ class LabelPropagationEngine:
|
|||||||
- name / summary:若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底
|
- name / summary:若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
# 先检查属性是否已完整,完整则跳过,避免重复生成
|
||||||
|
check_embedding = bool(self.embedding_model_id)
|
||||||
|
if await self.repo.is_community_complete(community_id, end_user_id, check_embedding=check_embedding):
|
||||||
|
logger.debug(f"[Clustering] 社区 {community_id} 属性已完整,跳过生成")
|
||||||
|
return
|
||||||
|
|
||||||
members = await self.repo.get_community_members(community_id, end_user_id)
|
members = await self.repo.get_community_members(community_id, end_user_id)
|
||||||
if not members:
|
if not members:
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -21,6 +21,8 @@ from app.repositories.neo4j.cypher_queries import (
|
|||||||
UPDATE_COMMUNITY_METADATA,
|
UPDATE_COMMUNITY_METADATA,
|
||||||
GET_INCOMPLETE_COMMUNITIES,
|
GET_INCOMPLETE_COMMUNITIES,
|
||||||
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING,
|
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING,
|
||||||
|
CHECK_COMMUNITY_IS_COMPLETE,
|
||||||
|
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -187,6 +189,16 @@ class CommunityRepository:
|
|||||||
logger.error(f"get_incomplete_communities failed: {e}")
|
logger.error(f"get_incomplete_communities failed: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
async def is_community_complete(self, community_id: str, end_user_id: str, check_embedding: bool = False) -> bool:
|
||||||
|
"""检查单个社区节点的属性是否完整。"""
|
||||||
|
try:
|
||||||
|
query = CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING if check_embedding else CHECK_COMMUNITY_IS_COMPLETE
|
||||||
|
result = await self.connector.execute_query(query, community_id=community_id, end_user_id=end_user_id)
|
||||||
|
return result[0]["is_complete"] if result else False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"is_community_complete failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
async def update_community_metadata(
|
async def update_community_metadata(
|
||||||
self,
|
self,
|
||||||
community_id: str,
|
community_id: str,
|
||||||
|
|||||||
@@ -1204,6 +1204,25 @@ RETURN
|
|||||||
startNode(r) = e AS r_from_e
|
startNode(r) = e AS r_from_e
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
CHECK_COMMUNITY_IS_COMPLETE = """
|
||||||
|
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
|
||||||
|
RETURN (
|
||||||
|
c.name IS NOT NULL AND c.name <> '' AND
|
||||||
|
c.summary IS NOT NULL AND c.summary <> '' AND
|
||||||
|
c.core_entities IS NOT NULL
|
||||||
|
) AS is_complete
|
||||||
|
"""
|
||||||
|
|
||||||
|
CHECK_COMMUNITY_IS_COMPLETE_WITH_EMBEDDING = """
|
||||||
|
MATCH (c:Community {community_id: $community_id, end_user_id: $end_user_id})
|
||||||
|
RETURN (
|
||||||
|
c.name IS NOT NULL AND c.name <> '' AND
|
||||||
|
c.summary IS NOT NULL AND c.summary <> '' AND
|
||||||
|
c.core_entities IS NOT NULL AND
|
||||||
|
c.summary_embedding IS NOT NULL
|
||||||
|
) AS is_complete
|
||||||
|
"""
|
||||||
|
|
||||||
GET_INCOMPLETE_COMMUNITIES = """
|
GET_INCOMPLETE_COMMUNITIES = """
|
||||||
MATCH (c:Community {end_user_id: $end_user_id})
|
MATCH (c:Community {end_user_id: $end_user_id})
|
||||||
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
|
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
|
||||||
@@ -1213,8 +1232,9 @@ RETURN c.community_id AS community_id
|
|||||||
|
|
||||||
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """
|
GET_INCOMPLETE_COMMUNITIES_WITH_EMBEDDING = """
|
||||||
MATCH (c:Community {end_user_id: $end_user_id})
|
MATCH (c:Community {end_user_id: $end_user_id})
|
||||||
WHERE c.name IS NULL OR c.summary IS NULL OR c.core_entities IS NULL
|
WHERE c.name IS NULL OR c.name = ''
|
||||||
OR c.name = '' OR c.summary = ''
|
OR c.summary IS NULL OR c.summary = ''
|
||||||
OR c.summary_embedding IS NULL
|
OR c.core_entities IS NULL
|
||||||
|
OR (c.summary_embedding IS NULL AND c.summary IS NOT NULL AND c.summary <> '(empty)')
|
||||||
RETURN c.community_id AS community_id
|
RETURN c.community_id AS community_id
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -2820,17 +2820,6 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace
|
|||||||
result = loop.run_until_complete(_run())
|
result = loop.run_until_complete(_run())
|
||||||
result["elapsed_time"] = time.time() - start_time
|
result["elapsed_time"] = time.time() - start_time
|
||||||
result["task_id"] = self.request.id
|
result["task_id"] = self.request.id
|
||||||
|
|
||||||
# 所有用户均完整(无需初始化也无失败),写入 Redis 标记,1小时内不再重复投递
|
|
||||||
if workspace_id and result.get("initialized", 0) == 0 and result.get("failed", 0) == 0:
|
|
||||||
try:
|
|
||||||
_r = get_sync_redis_client()
|
|
||||||
if _r:
|
|
||||||
_r.set(f"community_cluster:done:workspace:{workspace_id}", "1", ex=3600)
|
|
||||||
logger.info(f"[CommunityCluster] 工作空间 {workspace_id} 数据完整,已写入完成标记(1小时有效)")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"[CommunityCluster] 写入完成标记失败: {e}")
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user