[changes]Community node attribute check

This commit is contained in:
lanceyq
2026-03-19 19:24:37 +08:00
parent d798d101f7
commit f644c84fbb
5 changed files with 123 additions and 14 deletions

View File

@@ -2675,13 +2675,15 @@ def write_perceptual_memory(
time_limit=7200, # 2小时硬超时
soft_time_limit=6900,
)
def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[str, Any]:
def init_community_clustering_for_users(self, end_user_ids: List[str], workspace_id: Optional[str] = None) -> Dict[str, Any]:
"""触发型任务:检查指定用户列表,对有 ExtractedEntity 但无 Community 节点的用户执行全量聚类。
由 /dashboard/end_users 接口触发,已有社区节点的用户直接跳过。
任务完成且所有用户数据均完整时,写入 Redis 标记,避免下次重复投递。
Args:
end_user_ids: 需要检查的用户 ID 列表
workspace_id: 工作空间 ID用于完成标记
Returns:
包含任务执行结果的字典
@@ -2707,6 +2709,7 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
# 批量预取所有用户的配置(内置兜底:用户配置不可用时自动回退到工作空间默认配置)
user_llm_map: Dict[str, Optional[str]] = {}
user_embedding_map: Dict[str, Optional[str]] = {}
try:
with get_db_context() as db:
from app.services.memory_agent_service import get_end_users_connected_configs_batch
@@ -2718,21 +2721,54 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
try:
cfg = MemoryConfigService(db).load_memory_config(config_id=config_id)
user_llm_map[uid] = str(cfg.llm_model_id) if cfg.llm_model_id else None
user_embedding_map[uid] = str(cfg.embedding_model_id) if cfg.embedding_model_id else None
except Exception as e:
logger.warning(f"[CommunityCluster] 用户 {uid} 加载 LLM 配置失败,将使用 None: {e}")
logger.warning(f"[CommunityCluster] 用户 {uid} 加载配置失败,将使用 None: {e}")
user_llm_map[uid] = None
user_embedding_map[uid] = None
else:
user_llm_map[uid] = None
user_embedding_map[uid] = None
except Exception as e:
logger.warning(f"[CommunityCluster] 批量获取 LLM 配置失败,所有用户将使用 None: {e}")
logger.warning(f"[CommunityCluster] 批量获取配置失败,所有用户将使用 None: {e}")
for end_user_id in end_user_ids:
try:
# 已有社区节点则跳过
# 已有社区节点时,检查是否存在属性不完整的节点
has_communities = await repo.has_communities(end_user_id)
if has_communities:
skipped += 1
logger.debug(f"[CommunityCluster] 用户 {end_user_id} 已有社区节点,跳过")
llm_model_id = user_llm_map.get(end_user_id)
embedding_model_id = user_embedding_map.get(end_user_id)
incomplete_ids = await repo.get_incomplete_communities(
end_user_id, check_embedding=bool(embedding_model_id)
)
if not incomplete_ids:
skipped += 1
logger.debug(f"[CommunityCluster] 用户 {end_user_id} 社区节点均完整,跳过")
continue
# 对不完整的社区节点逐一补全元数据
engine = LabelPropagationEngine(
connector=connector,
llm_model_id=llm_model_id,
embedding_model_id=embedding_model_id,
)
logger.info(
f"[CommunityCluster] 用户 {end_user_id} 发现 {len(incomplete_ids)} 个属性不完整的社区,开始补全"
)
patch_ok = 0
patch_fail = 0
for cid in incomplete_ids:
try:
await engine._generate_community_metadata(cid, end_user_id)
patch_ok += 1
except Exception as patch_err:
patch_fail += 1
logger.error(f"[CommunityCluster] 社区 {cid} 元数据补全失败: {patch_err}")
logger.info(
f"[CommunityCluster] 用户 {end_user_id} 社区补全完成: 成功={patch_ok}, 失败={patch_fail}"
)
initialized += 1
continue
# 检查是否有 ExtractedEntity 节点
@@ -2742,11 +2778,13 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
logger.debug(f"[CommunityCluster] 用户 {end_user_id} 无实体节点,跳过")
continue
# 每个用户使用自己的 llm_model_id
# 每个用户使用自己的 llm_model_id / embedding_model_id
llm_model_id = user_llm_map.get(end_user_id)
embedding_model_id = user_embedding_map.get(end_user_id)
engine = LabelPropagationEngine(
connector=connector,
llm_model_id=llm_model_id,
embedding_model_id=embedding_model_id,
)
logger.info(f"[CommunityCluster] 用户 {end_user_id}{len(entities)} 个实体开始全量聚类llm_model_id={llm_model_id}")
@@ -2782,6 +2820,17 @@ def init_community_clustering_for_users(self, end_user_ids: List[str]) -> Dict[s
result = loop.run_until_complete(_run())
result["elapsed_time"] = time.time() - start_time
result["task_id"] = self.request.id
# 所有用户均完整(无需初始化也无失败),写入 Redis 标记1小时内不再重复投递
if workspace_id and result.get("initialized", 0) == 0 and result.get("failed", 0) == 0:
try:
_r = get_sync_redis_client()
if _r:
_r.set(f"community_cluster:done:workspace:{workspace_id}", "1", ex=3600)
logger.info(f"[CommunityCluster] 工作空间 {workspace_id} 数据完整已写入完成标记1小时有效")
except Exception as e:
logger.warning(f"[CommunityCluster] 写入完成标记失败: {e}")
return result
except Exception as e: