[changes] Batch mode for metadata creation and unified management of indexes

This commit is contained in:
lanceyq
2026-03-16 23:06:41 +08:00
parent 8e6288bca8
commit 56adca9f22
6 changed files with 156 additions and 333 deletions

View File

@@ -7,6 +7,7 @@
- 增量更新incremental_update新实体到达时只处理新实体及其邻居
"""
import asyncio
import logging
import uuid
from math import sqrt
@@ -114,7 +115,7 @@ class LabelPropagationEngine:
- 每批独立跑 MAX_ITERATIONS 轮 LPA批次间通过 labels 传递社区信息
- 所有批次完成后统一 flush 和 merge
"""
BATCH_SIZE = 2000 # 每批实体数,可按需调整
BATCH_SIZE = 888 # 每批实体数,可按需调整
# 轻量查询:只获取总数和 ID 列表,不加载 embedding 等大字段
total_count = await self.repo.get_entity_count(end_user_id)
@@ -203,8 +204,7 @@ class LabelPropagationEngine:
if e.get("community_id")
})
logger.info(f"[Clustering] 合并后实际存活社区数: {len(surviving_community_ids)}")
for cid in surviving_community_ids:
await self._generate_community_metadata(cid, end_user_id)
await self._generate_community_metadata(surviving_community_ids, end_user_id)
async def incremental_update(
self, new_entity_ids: List[str], end_user_id: str
@@ -261,7 +261,7 @@ class LabelPropagationEngine:
logger.debug(
f"[Clustering] 新实体 {entity_id}{len(neighbors)} 个无社区邻居 → 新社区 {new_cid}"
)
await self._generate_community_metadata(new_cid, end_user_id)
await self._generate_community_metadata([new_cid], end_user_id)
else:
# 加入得票最多的社区
await self.repo.assign_entity_to_community(entity_id, target_cid, end_user_id)
@@ -273,7 +273,7 @@ class LabelPropagationEngine:
await self._evaluate_merge(
list(community_ids_in_neighbors), end_user_id
)
await self._generate_community_metadata(target_cid, end_user_id)
await self._generate_community_metadata([target_cid], end_user_id)
async def _evaluate_merge(
self, community_ids: List[str], end_user_id: str
@@ -437,89 +437,122 @@ class LabelPropagationEngine:
except Exception:
return None
@staticmethod
def _build_entity_lines(members: List[Dict]) -> List[str]:
"""将实体列表格式化为 prompt 行,包含 name、aliases、description。"""
lines = []
for m in members:
m_name = m.get("name", "")
aliases = m.get("aliases") or []
description = m.get("description") or ""
aliases_str = f"(别名:{''.join(aliases)}" if aliases else ""
desc_str = f"{description}" if description else ""
lines.append(f"- {m_name}{aliases_str}{desc_str}")
return lines
async def _generate_community_metadata(
self, community_id: str, end_user_id: str
self, community_ids: List[str], end_user_id: str
) -> None:
"""
为社区生成并写入元数据:名称、摘要、核心实体
一个或多个社区生成并写入元数据。
- core_entities按 activation_value 排序取 top-N 实体名称列表(无需 LLM
- name / summary若有 llm_model_id 则调用 LLM 生成,否则用实体名称拼接兜底
NOTE: core_entities按照激活值高低排序会造成对边缘信息检索返回消息质量不高。
流程:
1. 逐个社区调 LLM 生成 name / summary串行
2. 收集所有 summary一次性批量 embed
3. 单个社区用 update_community_metadata多个用 batch_update_community_metadata
"""
try:
members = await self.repo.get_community_members(community_id, end_user_id)
if not members:
return
if not community_ids:
return
from app.db import get_db_context
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
# --- 阶段1并发调 LLM 生成每个社区的 name / summary ---
async def _build_one(cid: str):
members = await self.repo.get_community_members(cid, end_user_id)
if not members:
return None
# 核心实体:按 activation_value 降序取 top-N
sorted_members = sorted(
members,
key=lambda m: m.get("activation_value") or 0,
reverse=True,
)
core_entities = [m["name"] for m in sorted_members[:CORE_ENTITY_LIMIT] if m.get("name")]
all_names = [m["name"] for m in members if m.get("name")]
name = "".join(core_entities[:3]) if core_entities else community_id[:8]
summary = f"包含实体:{', '.join(all_names)}"
entity_list_str = "\n".join(self._build_entity_lines(members))
prompt = (
f"以下是一组语义相关的实体:\n{entity_list_str}\n\n"
f"请为这组实体所代表的主题:\n"
f"1. 起一个简洁的中文名称不超过10个字\n"
f"2. 写一句话摘要不超过50个字\n\n"
f"严格按以下格式输出,不要有其他内容:\n"
f"名称:<名称>\n摘要:<摘要>"
)
with get_db_context() as db:
llm_client = MemoryClientFactory(db).get_llm_client(self.llm_model_id)
response = await llm_client.chat([{"role": "user", "content": prompt}])
text = response.content if hasattr(response, "content") else str(response)
# 若有 LLM 配置,调用 LLM 生成更好的名称和摘要
if self.llm_model_id:
try:
from app.db import get_db_context
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
name, summary = "", ""
for line in text.strip().splitlines():
if line.startswith("名称:"):
name = line[3:].strip()
elif line.startswith("摘要:"):
summary = line[3:].strip()
entity_list_str = "".join(all_names)
prompt = (
f"以下是一组语义相关的实体:{entity_list_str}\n\n"
f"请为这组实体所代表的主题:\n"
f"1. 起一个简洁的中文名称不超过10个字\n"
f"2. 写一句话摘要不超过50个字\n\n"
f"严格按以下格式输出,不要有其他内容:\n"
f"名称:<名称>\n摘要:<摘要>"
)
with get_db_context() as db:
factory = MemoryClientFactory(db)
llm_client = factory.get_llm_client(self.llm_model_id)
response = await llm_client.chat([{"role": "user", "content": prompt}])
text = response.content if hasattr(response, "content") else str(response)
return {
"community_id": cid,
"end_user_id": end_user_id,
"name": name,
"summary": summary,
"core_entities": core_entities,
"summary_embedding": None,
}
for line in text.strip().splitlines():
if line.startswith("名称:"):
name = line[3:].strip()
elif line.startswith("摘要:"):
summary = line[3:].strip()
except Exception as e:
logger.warning(f"[Clustering] LLM 生成社区元数据失败,使用兜底值: {e}")
results = await asyncio.gather(
*[_build_one(cid) for cid in community_ids],
return_exceptions=True,
)
metadata_list = []
for cid, res in zip(community_ids, results):
if isinstance(res, Exception):
logger.error(f"[Clustering] 社区 {cid} 元数据准备失败: {res}", exc_info=res)
elif res is not None:
metadata_list.append(res)
# 生成 summary_embedding
summary_embedding = None
if self.embedding_model_id and summary:
try:
from app.db import get_db_context
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
with get_db_context() as db:
embedder = MemoryClientFactory(db).get_embedder_client(self.embedding_model_id)
results = await embedder.response([summary])
summary_embedding = results[0] if results else None
except Exception as e:
logger.warning(f"[Clustering] 社区 {community_id} 生成 summary_embedding 失败: {e}")
if not metadata_list:
return
# --- 阶段2批量生成 summary_embedding ---
summaries = [m["summary"] for m in metadata_list]
with get_db_context() as db:
embedder = MemoryClientFactory(db).get_embedder_client(self.embedding_model_id)
embeddings = await embedder.response(summaries)
for i, meta in enumerate(metadata_list):
meta["summary_embedding"] = embeddings[i] if i < len(embeddings) else None
# --- 阶段3写入单个 or 批量)---
if len(metadata_list) == 1:
m = metadata_list[0]
result = await self.repo.update_community_metadata(
community_id=community_id,
end_user_id=end_user_id,
name=name,
summary=summary,
core_entities=core_entities,
summary_embedding=summary_embedding,
community_id=m["community_id"],
end_user_id=m["end_user_id"],
name=m["name"],
summary=m["summary"],
core_entities=m["core_entities"],
summary_embedding=m["summary_embedding"],
)
if result:
logger.info(f"[Clustering] 社区 {community_id} 元数据写入成功: name={name}, summary={summary[:30]}...")
logger.info(f"[Clustering] 社区 {m['community_id']} 元数据写入成功: name={m['name']}, summary={m['summary'][:30]}...")
else:
logger.warning(f"[Clustering] 社区 {community_id} 元数据写入返回 False")
except Exception as e:
logger.error(f"[Clustering] _generate_community_metadata failed for {community_id}: {e}", exc_info=True)
logger.warning(f"[Clustering] 社区 {m['community_id']} 元数据写入返回 False")
else:
ok = await self.repo.batch_update_community_metadata(metadata_list)
if ok:
logger.info(f"[Clustering] 批量写入 {len(metadata_list)} 个社区元数据成功")
else:
logger.warning(f"[Clustering] 批量写入社区元数据失败")
@staticmethod
def _new_community_id() -> str: