refactor(memory): remove expired_at field and add dialog_at timestamp

Remove the deprecated expired_at field from all graph models, Neo4j
Cypher queries, repositories, and pipeline code. Replace with dialog_at
on StatementNode to track the original dialog timestamp.

- Strip expired_at from DialogueNode, ChunkNode, StatementNode,
  ExtractedEntityNode, edges, and all Cypher queries
- Add dialog_at to MessageItem schema and propagate through extraction
  and graph build steps
- Extract emotion/metadata async submission from WritePipeline into
  a generic _submit_celery_task helper
- Add post_store_dedup_and_alias_merge Celery task for async alias
  merging and second-layer dedup after Neo4j write
- Switch pytest async backend from anyio to asyncio_mode=auto
This commit is contained in:
lanceyq
2026-04-30 12:20:47 +08:00
parent d66d601e41
commit cf389bb978
30 changed files with 531 additions and 278 deletions

View File

@@ -1564,6 +1564,186 @@ def extract_emotion_batch_task(
_shutdown_loop_gracefully(loop)
@celery_app.task(
bind=True,
name="app.tasks.post_store_dedup_and_alias_merge",
max_retries=1,
default_retry_delay=30,
)
def post_store_dedup_and_alias_merge_task(
self,
end_user_id: str,
entity_ids: List[str],
llm_model_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Celery task: 写入后异步执行 Neo4j 别名归并 + 第二层去重。
在主写入流水线将第一层去重结果写入 Neo4j 之后执行:
1. Neo4j 别名归并:将 "别名属于" 边的 source.name 合并到 target.aliases
2. Neo4j 边重定向:将指向别名节点的边重定向到目标节点
3. 第二层去重:与 Neo4j 中已有的同组实体做联合去重
Args:
end_user_id: 终端用户 ID
entity_ids: 本轮写入的实体 ID 列表(用于第二层去重的候选检索)
llm_model_id: LLM 模型 UUID用于第二层去重的 LLM 兜底判定)
"""
task_id = self.request.id
logger.info(
f"[PostStore] 开始异步别名归并+第二层去重: "
f"end_user_id={end_user_id}, entity_count={len(entity_ids)}, "
f"task_id={task_id}"
)
start_time = time.time()
async def _run() -> Dict[str, Any]:
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.repositories.neo4j.cypher_queries import (
MERGE_ALIAS_BELONGS_TO,
REDIRECT_ALIAS_EDGES,
)
connector = Neo4jConnector()
result_info: Dict[str, Any] = {}
try:
# ── 1. Neo4j 别名归并 ──
try:
records = await connector.execute_query(
MERGE_ALIAS_BELONGS_TO,
end_user_id=end_user_id,
)
merged_count = len(records) if records else 0
result_info["alias_merged"] = merged_count
logger.info(f"[PostStore] Neo4j 别名归并完成,影响 {merged_count} 条记录")
except Exception as e:
logger.warning(f"[PostStore] Neo4j 别名归并失败: {e}")
result_info["alias_merge_error"] = str(e)
# ── 2. Neo4j 边重定向 ──
try:
redirect_records = await connector.execute_query(
REDIRECT_ALIAS_EDGES,
end_user_id=end_user_id,
)
redirect_count = len(redirect_records) if redirect_records else 0
result_info["edges_redirected"] = redirect_count
logger.info(f"[PostStore] Neo4j 边重定向完成,影响 {redirect_count} 条记录")
except Exception as e:
logger.warning(f"[PostStore] Neo4j 边重定向失败: {e}")
result_info["redirect_error"] = str(e)
# ── 3. 第二层去重(与 Neo4j 已有实体联合去重) ──
try:
from app.core.memory.storage_services.extraction_engine.deduplication.second_layer_dedup import (
second_layer_dedup_and_merge_with_neo4j,
)
from app.core.memory.storage_services.extraction_engine.deduplication.deduped_and_disamb import (
clean_cross_role_aliases,
)
from app.repositories.neo4j.cypher_queries import EXTRACTED_ENTITY_NODE_SAVE
# 从 Neo4j 加载本轮写入的实体(第一层去重后的结果)
load_query = """
UNWIND $entity_ids AS eid
MATCH (e:ExtractedEntity {id: eid})
RETURN e {.*} AS entity
"""
entity_records = await connector.execute_query(
load_query, entity_ids=entity_ids
)
if entity_records:
from app.core.memory.storage_services.extraction_engine.deduplication.second_layer_dedup import (
_row_to_entity,
)
current_entities = []
for rec in entity_records:
try:
entity_data = rec.get("entity") or rec
current_entities.append(_row_to_entity(entity_data))
except Exception:
pass
if current_entities:
# 构建 LLM client如果有 llm_model_id
llm_client = None
if llm_model_id:
try:
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
with get_db_context() as db:
factory = MemoryClientFactory(db)
llm_client = factory.get_llm_client(llm_model_id)
except Exception as e:
logger.warning(f"[PostStore] 构建 LLM client 失败,跳过 LLM 兜底: {e}")
fused_entities, _, _ = await second_layer_dedup_and_merge_with_neo4j(
connector=connector,
end_user_id=end_user_id,
entity_nodes=current_entities,
statement_entity_edges=[],
entity_entity_edges=[],
llm_client=llm_client,
)
# 清洗跨角色别名污染
clean_cross_role_aliases(fused_entities)
# 将融合后的实体回写 Neo4j
if fused_entities:
entity_data = [e.model_dump() for e in fused_entities]
await connector.execute_query(
EXTRACTED_ENTITY_NODE_SAVE, entities=entity_data
)
result_info["layer2_input"] = len(current_entities)
result_info["layer2_output"] = len(fused_entities)
logger.info(
f"[PostStore] 第二层去重完成: "
f"{len(current_entities)}{len(fused_entities)} 个实体"
)
else:
result_info["layer2_skipped"] = "no entities loaded"
else:
result_info["layer2_skipped"] = "no entity records found"
except Exception as e:
logger.warning(f"[PostStore] 第二层去重失败(不影响主流程): {e}", exc_info=True)
result_info["layer2_error"] = str(e)
finally:
await connector.close()
return result_info
loop = None
try:
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed = time.time() - start_time
logger.info(
f"[PostStore] 任务完成: {result}, 耗时={elapsed:.2f}s, task_id={task_id}"
)
return {
"status": "SUCCESS",
**result,
"elapsed_time": elapsed,
"task_id": task_id,
}
except Exception as e:
elapsed = time.time() - start_time
logger.error(
f"[PostStore] 任务失败: {e}, 耗时={elapsed:.2f}s",
exc_info=True,
)
raise self.retry(exc=e)
finally:
if loop:
_shutdown_loop_gracefully(loop)
@celery_app.task(
bind=True,
name="app.tasks.extract_metadata_batch",