Merge branch 'refs/heads/develop' into feature/agent-tool_xjn

This commit is contained in:
Timebomb2018
2026-04-10 18:30:46 +08:00
44 changed files with 1522 additions and 227 deletions

View File

@@ -111,6 +111,9 @@ celery_app.conf.update(
# Clustering tasks → memory_tasks queue (使用相同的 worker避免 macOS fork 问题)
'app.tasks.run_incremental_clustering': {'queue': 'memory_tasks'},
# Metadata extraction → memory_tasks queue
'app.tasks.extract_user_metadata': {'queue': 'memory_tasks'},
# Document tasks → document_tasks queue (prefork worker)
'app.core.rag.tasks.parse_document': {'queue': 'document_tasks'},
'app.core.rag.tasks.build_graphrag_for_kb': {'queue': 'document_tasks'},

View File

@@ -14,6 +14,7 @@ from app.core.response_utils import success
from app.db import get_db
from app.models.app_model import App
from app.models.app_model import AppType
from app.models.app_release_model import AppRelease
from app.repositories import knowledge_repository
from app.repositories.end_user_repository import EndUserRepository
from app.schemas import AppChatRequest, conversation_schema
@@ -61,18 +62,18 @@ async def list_apps():
# return success(data={"received": True}, msg="消息已接收")
def _checkAppConfig(app: App):
if app.type == AppType.AGENT:
if not app.current_release.config:
def _checkAppConfig(release: AppRelease):
if release.type == AppType.AGENT:
if not release.config:
raise BusinessException("Agent 应用未配置模型", BizCode.AGENT_CONFIG_MISSING)
elif app.type == AppType.MULTI_AGENT:
if not app.current_release.config:
elif release.type == AppType.MULTI_AGENT:
if not release.config:
raise BusinessException("Multi-Agent 应用未配置模型", BizCode.AGENT_CONFIG_MISSING)
elif app.type == AppType.WORKFLOW:
if not app.current_release.config:
elif release.type == AppType.WORKFLOW:
if not release.config:
raise BusinessException("工作流应用未配置模型", BizCode.AGENT_CONFIG_MISSING)
else:
raise BusinessException("不支持的应用类型", BizCode.AGENT_CONFIG_MISSING)
raise BusinessException("不支持的应用类型", BizCode.APP_TYPE_NOT_SUPPORTED)
@router.post("/chat")
@@ -86,10 +87,22 @@ async def chat(
app_service: Annotated[AppService, Depends(get_app_service)] = None,
message: str = Body(..., description="聊天消息内容"),
):
"""
Agent/Workflow 聊天接口
- 不传 version使用当前生效版本current_release回滚后为回滚目标版本
- 传 version=N使用指定版本号的历史快照例如 {"version": 2}
"""
body = await request.json()
payload = AppChatRequest(**body)
app = app_service.get_app(api_key_auth.resource_id, api_key_auth.workspace_id)
# 版本切换:指定 version 时查找对应历史快照,否则使用当前激活版本
if payload.version is not None:
active_release = app_service.get_release_by_version(app.id, payload.version)
else:
active_release = app.current_release
other_id = payload.user_id
workspace_id = api_key_auth.workspace_id
end_user_repo = EndUserRepository(db)
@@ -127,7 +140,7 @@ async def chat(
storage_type = 'neo4j'
app_type = app.type
# check app config
_checkAppConfig(app)
_checkAppConfig(active_release)
# 获取或创建会话(提前验证)
conversation = conversation_service.create_or_get_conversation(
@@ -142,7 +155,7 @@ async def chat(
# print("="*50)
# print(app.current_release.default_model_config_id)
agent_config = agent_config_4_app_release(app.current_release)
agent_config = agent_config_4_app_release(active_release)
# print(agent_config.default_model_config_id)
# thinking 开关:仅当 agent 配置了 deep_thinking 且请求 thinking=True 时才启用
@@ -194,7 +207,7 @@ async def chat(
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
elif app_type == AppType.MULTI_AGENT:
# 多 Agent 流式返回
config = multi_agent_config_4_app_release(app.current_release)
config = multi_agent_config_4_app_release(active_release)
if payload.stream:
async def event_generator():
async for event in app_chat_service.multi_agent_chat_stream(
@@ -237,7 +250,7 @@ async def chat(
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
elif app_type == AppType.WORKFLOW:
# 多 Agent 流式返回
config = workflow_config_4_app_release(app.current_release)
config = workflow_config_4_app_release(active_release)
if payload.stream:
async def event_generator():
async for event in app_chat_service.workflow_chat_stream(
@@ -253,7 +266,7 @@ async def chat(
user_rag_memory_id=user_rag_memory_id,
app_id=app.id,
workspace_id=workspace_id,
release_id=app.current_release.id,
release_id=active_release.id,
public=True
):
event_type = event.get("event", "message")
@@ -288,7 +301,7 @@ async def chat(
files=payload.files,
app_id=app.id,
workspace_id=workspace_id,
release_id=app.current_release.id
release_id=active_release.id
)
logger.debug(
"工作流试运行返回结果",
@@ -302,6 +315,4 @@ async def chat(
msg="工作流任务执行成功"
)
else:
from app.core.exceptions import BusinessException
from app.core.error_codes import BizCode
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)

View File

@@ -153,7 +153,7 @@ class PerceptualSearchService:
return []
try:
r = await search_perceptual(
connector=connector, q=escaped,
connector=connector, query=escaped,
end_user_id=self.end_user_id,
limit=limit * 5, # 多查一些以提高命中率
)
@@ -178,7 +178,7 @@ class PerceptualSearchService:
if not escaped.strip():
return []
r = await search_perceptual(
connector=connector, q=escaped,
connector=connector, query=escaped,
end_user_id=self.end_user_id, limit=limit,
)
return r.get("perceptuals", [])

View File

@@ -58,6 +58,14 @@ from app.core.memory.models.triplet_models import (
TripletExtractionResponse,
)
# User metadata models
from app.core.memory.models.metadata_models import (
UserMetadata,
UserMetadataBehavioralHints,
UserMetadataProfile,
MetadataExtractionResponse,
)
# Ontology scenario models (LLM extracted from scenarios)
from app.core.memory.models.ontology_scenario_models import (
OntologyClass,
@@ -124,6 +132,10 @@ __all__ = [
"Entity",
"Triplet",
"TripletExtractionResponse",
"UserMetadata",
"UserMetadataBehavioralHints",
"UserMetadataProfile",
"MetadataExtractionResponse",
# Ontology models
"OntologyClass",
"OntologyExtractionResponse",

View File

@@ -364,12 +364,14 @@ class ChunkNode(Node):
Attributes:
dialog_id: ID of the parent dialog
content: The text content of the chunk
speaker: Speaker identifier ('user' or 'assistant')
chunk_embedding: Optional embedding vector for the chunk
sequence_number: Order of this chunk within the dialog
metadata: Additional chunk metadata as key-value pairs
"""
dialog_id: str = Field(..., description="ID of the parent dialog")
content: str = Field(..., description="The text content of the chunk")
speaker: Optional[str] = Field(None, description="Speaker identifier: 'user' for user messages, 'assistant' for AI responses")
chunk_embedding: Optional[List[float]] = Field(None, description="Chunk embedding vector")
sequence_number: int = Field(..., description="Order of this chunk within the dialog")
metadata: dict = Field(default_factory=dict, description="Additional chunk metadata")

View File

@@ -0,0 +1,57 @@
"""Models for user metadata extraction.
Independent from triplet_models.py - these models are used by the
standalone metadata extraction pipeline (post-dedup async Celery task).
"""
from typing import List
from pydantic import BaseModel, ConfigDict, Field
class UserMetadataProfile(BaseModel):
"""用户画像信息"""
model_config = ConfigDict(extra="ignore")
role: str = Field(default="", description="用户职业或角色")
domain: str = Field(default="", description="用户所在领域")
expertise: List[str] = Field(
default_factory=list, description="用户擅长的技能或工具"
)
interests: List[str] = Field(
default_factory=list, description="用户关注的话题或领域标签"
)
class UserMetadataBehavioralHints(BaseModel):
"""行为偏好"""
model_config = ConfigDict(extra="ignore")
learning_stage: str = Field(default="", description="学习阶段")
preferred_depth: str = Field(default="", description="偏好深度")
tone_preference: str = Field(default="", description="语气偏好")
class UserMetadata(BaseModel):
"""用户元数据顶层结构"""
model_config = ConfigDict(extra="ignore")
profile: UserMetadataProfile = Field(default_factory=UserMetadataProfile)
behavioral_hints: UserMetadataBehavioralHints = Field(
default_factory=UserMetadataBehavioralHints
)
knowledge_tags: List[str] = Field(default_factory=list, description="知识标签")
class MetadataExtractionResponse(BaseModel):
"""元数据提取 LLM 响应结构"""
model_config = ConfigDict(extra="ignore")
user_metadata: UserMetadata = Field(default_factory=UserMetadata)
aliases_to_add: List[str] = Field(
default_factory=list,
description="本次新发现的用户别名(用户自我介绍或他人对用户的称呼)",
)
aliases_to_remove: List[str] = Field(
default_factory=list, description="用户明确否认的别名(如'我不叫XX了'"
)

View File

@@ -1,4 +1,3 @@
import argparse
import asyncio
import json
import math
@@ -6,7 +5,6 @@ import os
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import UUID
if TYPE_CHECKING:
from app.schemas.memory_config_schema import MemoryConfig
@@ -23,7 +21,7 @@ from app.core.memory.utils.config.config_utils import (
)
from app.core.memory.utils.data.text_utils import extract_plain_query
from app.core.memory.utils.data.time_utils import normalize_date_safe
from app.core.memory.utils.llm.llm_utils import get_reranker_client
# from app.core.memory.utils.llm.llm_utils import get_reranker_client
from app.core.models.base import RedBearModelConfig
from app.db import get_db_context
from app.repositories.neo4j.graph_search import (
@@ -748,11 +746,10 @@ async def run_hybrid_search(
if search_type in ["keyword", "hybrid"]:
# Keyword-based search
logger.info("[PERF] Starting keyword search...")
keyword_start = time.time()
keyword_task = asyncio.create_task(
search_graph(
connector=connector,
q=query_text,
query=query_text,
end_user_id=end_user_id,
limit=limit,
include=include
@@ -762,7 +759,6 @@ async def run_hybrid_search(
if search_type in ["embedding", "hybrid"]:
# Embedding-based search
logger.info("[PERF] Starting embedding search...")
embedding_start = time.time()
# 从数据库读取嵌入器配置(按 ID并构建 RedBearModelConfig
config_load_start = time.time()
@@ -904,10 +900,10 @@ async def run_hybrid_search(
else:
results["latency_metrics"] = latency_metrics
logger.info(f"[PERF] ===== SEARCH PERFORMANCE SUMMARY =====")
logger.info("[PERF] ===== SEARCH PERFORMANCE SUMMARY =====")
logger.info(f"[PERF] Total search completed in {total_latency:.4f}s")
logger.info(f"[PERF] Latency breakdown: {json.dumps(latency_metrics, indent=2)}")
logger.info(f"[PERF] =========================================")
logger.info("[PERF] =========================================")
# Sanitize results: drop large/unused fields
_remove_keys_recursive(results, ["name_embedding"]) # drop entity name embeddings from outputs

View File

@@ -311,10 +311,53 @@ class ExtractionOrchestrator:
dialog_data_list,
)
# 步骤 7: 同步用户别名到数据库表(仅正式模式)
# 步骤 7: 触发异步元数据和别名提取(仅正式模式)
if not is_pilot_run:
logger.info("步骤 7: 同步用户别名到 end_user 和 end_user_info 表")
await self._update_end_user_other_name(entity_nodes, dialog_data_list)
try:
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import (
MetadataExtractor,
)
metadata_extractor = MetadataExtractor(
llm_client=self.llm_client, language=self.language
)
user_statements = (
metadata_extractor.collect_user_related_statements(
entity_nodes, statement_nodes, statement_entity_edges
)
)
if user_statements:
end_user_id = (
dialog_data_list[0].end_user_id
if dialog_data_list
else None
)
config_id = (
dialog_data_list[0].config_id
if dialog_data_list
and hasattr(dialog_data_list[0], "config_id")
else None
)
if end_user_id:
from app.tasks import extract_user_metadata_task
extract_user_metadata_task.delay(
end_user_id=str(end_user_id),
statements=user_statements,
config_id=str(config_id) if config_id else None,
language=self.language,
)
logger.info(
f"已触发异步元数据提取任务,共 {len(user_statements)} 条用户相关 statement"
)
else:
logger.info("未找到用户相关 statement跳过元数据提取")
except Exception as e:
logger.error(
f"触发元数据提取任务失败(不影响主流程): {e}", exc_info=True
)
# 别名同步已迁移到 Celery 元数据提取任务中,不再在此处执行
logger.info(f"知识提取流水线运行完成({mode_str}")
return (
@@ -1107,6 +1150,7 @@ class ExtractionOrchestrator:
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
content=chunk.content,
speaker=getattr(chunk, 'speaker', None),
chunk_embedding=chunk.chunk_embedding,
sequence_number=chunk_idx, # 添加必需的 sequence_number 字段
created_at=dialog_data.created_at,
@@ -1342,7 +1386,7 @@ class ExtractionOrchestrator:
async def _update_end_user_other_name(
self,
entity_nodes: List[ExtractedEntityNode],
dialog_data_list: List[DialogData]
dialog_data_list: List[DialogData],
) -> None:
"""
将本轮提取的用户别名同步到 end_user 和 end_user_info 表。
@@ -1470,7 +1514,6 @@ class ExtractionOrchestrator:
end_user_id=end_user_uuid,
other_name=first_alias,
aliases=merged_aliases,
meta_data={}
))
logger.info(f"创建 end_user_info 记录other_name={first_alias}, aliases={merged_aliases}")
@@ -1478,9 +1521,6 @@ class ExtractionOrchestrator:
except Exception as e:
logger.error(f"更新 end_user other_name 失败: {e}", exc_info=True)
# 用户实体占位名称,不允许作为 other_name 或出现在 aliases 中
# 复用 deduped_and_disamb 模块级常量,避免重复维护
USER_PLACEHOLDER_NAMES = _USER_PLACEHOLDER_NAMES
@@ -1587,7 +1627,6 @@ class ExtractionOrchestrator:
if candidate and candidate.lower() in self.USER_PLACEHOLDER_NAMES:
return None
return candidate
return None
async def _run_dedup_and_write_summary(

View File

@@ -0,0 +1,175 @@
"""
Metadata extractor module.
Collects user-related statements from post-dedup graph data and
extracts user metadata via an independent LLM call.
"""
import logging
from typing import List, Optional
from app.core.memory.models.graph_models import (
ExtractedEntityNode,
StatementEntityEdge,
StatementNode,
)
logger = logging.getLogger(__name__)
# Reuse the same user-entity detection logic from dedup module
_USER_NAMES = {"用户", "", "user", "i"}
_CANONICAL_USER_TYPE = "用户"
def _is_user_entity(ent: ExtractedEntityNode) -> bool:
"""判断实体是否为用户实体"""
name = (getattr(ent, "name", "") or "").strip().lower()
etype = (getattr(ent, "entity_type", "") or "").strip()
return name in _USER_NAMES or etype == _CANONICAL_USER_TYPE
class MetadataExtractor:
"""Extracts user metadata from post-dedup graph data via independent LLM call."""
def __init__(self, llm_client, language: Optional[str] = None):
self.llm_client = llm_client
self.language = language
@staticmethod
def detect_language(statements: List[str]) -> str:
"""根据 statement 文本内容检测语言。
如果文本中包含中文字符则返回 "zh",否则返回 "en"
"""
import re
combined = " ".join(statements)
if re.search(r"[\u4e00-\u9fff]", combined):
return "zh"
return "en"
def collect_user_related_statements(
self,
entity_nodes: List[ExtractedEntityNode],
statement_nodes: List[StatementNode],
statement_entity_edges: List[StatementEntityEdge],
) -> List[str]:
"""
从去重后的数据中筛选与用户直接相关且由用户发言的 statement 文本。
筛选逻辑:
1. 用户实体 → StatementEntityEdge → statement直接关联
2. 只保留 speaker="user" 的 statement过滤 assistant 回复的噪声)
Returns:
用户发言的 statement 文本列表
"""
# Find user entity IDs
user_entity_ids = set()
for ent in entity_nodes:
if _is_user_entity(ent):
user_entity_ids.add(ent.id)
if not user_entity_ids:
logger.debug("未找到用户实体节点,跳过 statement 收集")
return []
# 用户实体 → StatementEntityEdge → statement
target_stmt_ids = set()
for edge in statement_entity_edges:
if edge.target in user_entity_ids:
target_stmt_ids.add(edge.source)
# Collect: only speaker="user" statements, preserving order
result = []
seen = set()
total_associated = 0
skipped_non_user = 0
for stmt_node in statement_nodes:
if stmt_node.id in target_stmt_ids and stmt_node.id not in seen:
total_associated += 1
speaker = getattr(stmt_node, "speaker", None) or "unknown"
if speaker == "user":
text = (stmt_node.statement or "").strip()
if text:
result.append(text)
else:
skipped_non_user += 1
seen.add(stmt_node.id)
logger.info(
f"收集到 {len(result)} 条用户发言 statement "
f"(直接关联: {total_associated}, speaker=user: {len(result)}, "
f"跳过非user: {skipped_non_user})"
)
if result:
for i, text in enumerate(result):
logger.info(f" [user statement {i + 1}] {text}")
if total_associated > 0 and len(result) == 0:
logger.warning(
f"{total_associated} 条直接关联 statement 但全部被 speaker 过滤,"
f"可能本次写入不包含 user 消息"
)
return result
async def extract_metadata(
self,
statements: List[str],
existing_metadata: Optional[dict] = None,
existing_aliases: Optional[List[str]] = None,
) -> Optional[tuple]:
"""
对筛选后的 statement 列表调用 LLM 提取元数据和用户别名。
Args:
statements: 用户发言的 statement 文本列表
existing_metadata: 数据库已有的元数据(可选)
existing_aliases: 数据库已有的用户别名列表(可选)
Returns:
(UserMetadata, List[str], List[str]) tuple: (metadata, aliases_to_add, aliases_to_remove) on success, None on failure
"""
if not statements:
return None
try:
from app.core.memory.utils.prompt.prompt_utils import prompt_env
if self.language:
detected_language = self.language
logger.info(f"元数据提取使用显式指定语言: {detected_language}")
else:
detected_language = self.detect_language(statements)
logger.info(f"元数据提取语言自动检测结果: {detected_language}")
template = prompt_env.get_template("extract_user_metadata.jinja2")
prompt = template.render(
statements=statements,
language=detected_language,
existing_metadata=existing_metadata,
existing_aliases=existing_aliases,
json_schema="",
)
from app.core.memory.models.metadata_models import (
MetadataExtractionResponse,
)
response = await self.llm_client.response_structured(
messages=[{"role": "user", "content": prompt}],
response_model=MetadataExtractionResponse,
)
if response:
metadata = response.user_metadata if response.user_metadata else None
to_add = response.aliases_to_add if response.aliases_to_add else []
to_remove = (
response.aliases_to_remove if response.aliases_to_remove else []
)
return metadata, to_add, to_remove
logger.warning("LLM 返回的响应为空")
return None
except Exception as e:
logger.error(f"元数据提取 LLM 调用失败: {e}", exc_info=True)
return None

View File

@@ -1,6 +1,5 @@
import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
@@ -82,6 +81,7 @@ class StatementExtractor:
logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty")
return None
async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]:
"""Process a single chunk and return extracted statements
@@ -94,7 +94,8 @@ class StatementExtractor:
List of ExtractedStatement objects extracted from the chunk
"""
chunk_content = chunk.content
chunk_speaker = self._get_speaker_from_chunk(chunk)
if not chunk_content or len(chunk_content.strip()) < 5:
logger.warning(f"Chunk {chunk.id} content too short or empty, skipping")
return []
@@ -149,8 +150,6 @@ class StatementExtractor:
relevence_info = RelevenceInfo[relevence_str] if relevence_str in RelevenceInfo.__members__ else RelevenceInfo.RELEVANT
except (KeyError, ValueError):
relevence_info = RelevenceInfo.RELEVANT
chunk_speaker = self._get_speaker_from_chunk(chunk)
chunk_statement = Statement(
statement=extracted_stmt.statement,

View File

@@ -1,4 +1,3 @@
import os
import asyncio
from typing import List, Dict, Optional

View File

@@ -5,7 +5,7 @@
使用Neo4j的全文索引进行高效的文本匹配。
"""
from typing import List, Dict, Any, Optional
from typing import List, Optional
from app.core.logging_config import get_memory_logger
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.memory.storage_services.search.search_strategy import SearchStrategy, SearchResult
@@ -74,7 +74,7 @@ class KeywordSearchStrategy(SearchStrategy):
# 调用底层的关键词搜索函数
results_dict = await search_graph(
connector=self.connector,
q=query_text,
query=query_text,
end_user_id=end_user_id,
limit=limit,
include=include_list

View File

@@ -22,7 +22,9 @@ def escape_lucene_query(query: str) -> str:
s = s.replace("\r", " ").replace("\n", " ").strip()
# Lucene reserved tokens/special characters
specials = ['&&', '||', '\\', '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':']
# NOTE: '/' is the regex delimiter in Lucene — must be escaped to prevent
# TokenMgrError when the query contains unmatched slashes.
specials = ['&&', '||', '\\', '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':', '/']
# Replace longer tokens first to avoid partial double-escaping
for token in sorted(specials, key=len, reverse=True):
s = s.replace(token, f"\\{token}")

View File

@@ -43,8 +43,9 @@ Each statement must be labeled as per the criteria mentioned below.
对话上下文和共指消解:
- 将每个陈述句归属于说出它的参与者。
- 如果参与者列表为说话者提供了名称(例如,"李雪(用户)"),请在提取的陈述句中使用具体名称("李雪"),而不是通用角色("用户"
- 将所有代词解析为对话上下文中的具体人物或实体
- **对于用户的发言:必须使用"用户"作为主语**,禁止将"用户"或"我"替换为用户的真实姓名或别名。例如,用户说"我叫张三"应提取为"用户叫张三",而不是"张三叫张三"
- 对于 AI 助手的发言:使用"助手"或"AI助手"作为主语
- 将所有代词解析为对话上下文中的具体人物或实体,但"我"必须解析为"用户"。
- 识别并将抽象引用解析为其具体名称(如果提到)。
- 将缩写和首字母缩略词扩展为其完整形式。
{% else %}
@@ -68,8 +69,9 @@ Context Resolution Requirements:
Conversational Context & Co-reference Resolution:
- Attribute every statement to the participant who uttered it.
- If the participant list provides a name for a speaker (e.g., "李雪 (用户)"), use the specific name ("李雪") in the extracted statement, not the generic role ("用户").
- Resolve all pronouns to the specific person or entity from the conversation's context.
- **For user's statements: always use "用户" (User) as the subject**. Do NOT replace "用户" or "I" with the user's real name or alias. For example, if the user says "I'm John", extract as "用户 is John", not "John is John".
- For AI assistant's statements: use "助手" or "AI助手" as the subject.
- Resolve all pronouns to the specific person or entity from the conversation's context, but "I"/"我" must always resolve to "用户".
- Identify and resolve abstract references to their specific names if mentioned.
- Expand abbreviations and acronyms to their full form.
{% endif %}
@@ -139,13 +141,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合
示例输出: {
"statements": [
{
"statement": "Sarah Chen 最近一直在尝试水彩画。",
"statement": "用户最近一直在尝试水彩画。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen 画了一些花朵。",
"statement": "用户画了一些花朵。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -157,13 +159,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料与阿拉伯树胶等粘合
"relevance": "IRRELEVANT"
},
{
"statement": "Sarah Chen 认为她的水彩画中的色彩组合可以改进。",
"statement": "用户认为她的水彩画中的色彩组合可以改进。",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen 真的很喜欢玫瑰和百合。",
"statement": "用户真的很喜欢玫瑰和百合。",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
@@ -186,13 +188,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
示例输出: {
"statements": [
{
"statement": "张曼婷最近在尝试水彩画。",
"statement": "用户最近在尝试水彩画。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷画了一些花朵。",
"statement": "用户画了一些花朵。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -204,13 +206,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
"relevance": "IRRELEVANT"
},
{
"statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。",
"statement": "用户觉得水彩画的色彩搭配还有提升的空间。",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷很喜欢玫瑰和百合。",
"statement": "用户很喜欢玫瑰和百合。",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
@@ -233,13 +235,13 @@ User: "I think the color combinations could use some improvement, but I really l
Example Output: {
"statements": [
{
"statement": "Sarah Chen has been trying watercolor painting recently.",
"statement": "用户 has been trying watercolor painting recently.",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen painted some flowers.",
"statement": "用户 painted some flowers.",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -251,13 +253,13 @@ Example Output: {
"relevance": "IRRELEVANT"
},
{
"statement": "Sarah Chen thinks the color combinations in her watercolor paintings could use some improvement.",
"statement": "用户 thinks the color combinations in her watercolor paintings could use some improvement.",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "Sarah Chen really likes roses and lilies.",
"statement": "用户 really likes roses and lilies.",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
@@ -280,13 +282,13 @@ AI: "水彩画很有趣!水彩颜料通常由颜料和阿拉伯树胶等粘合
Example Output: {
"statements": [
{
"statement": "张曼婷最近在尝试水彩画。",
"statement": "用户最近在尝试水彩画。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷画了一些花朵。",
"statement": "用户画了一些花朵。",
"statement_type": "FACT",
"temporal_type": "DYNAMIC",
"relevance": "RELEVANT"
@@ -298,13 +300,13 @@ Example Output: {
"relevance": "IRRELEVANT"
},
{
"statement": "张曼婷觉得水彩画的色彩搭配还有提升的空间。",
"statement": "用户觉得水彩画的色彩搭配还有提升的空间。",
"statement_type": "OPINION",
"temporal_type": "STATIC",
"relevance": "RELEVANT"
},
{
"statement": "张曼婷很喜欢玫瑰和百合。",
"statement": "用户很喜欢玫瑰和百合。",
"statement_type": "FACT",
"temporal_type": "STATIC",
"relevance": "RELEVANT"

View File

@@ -406,4 +406,12 @@ Output:
- **⚠️ ALIASES ORDER: preserve temporal order of appearance**
- **🚨 MANDATORY FIELD: EVERY entity MUST include "aliases" field, even if empty array []**
**Output JSON structure:**
```json
{
"triplets": [...],
"entities": [...]
}
```
{{ json_schema }}

View File

@@ -0,0 +1,135 @@
===Task===
Extract user metadata from the following conversation statements spoken by the user.
{% if language == "zh" %}
**"三度原则"判断标准:**
- 复用度:该信息是否会被多个功能模块使用?
- 约束度:该信息是否会影响系统行为?
- 时效性:该信息是长期稳定的还是临时的?仅提取长期稳定信息。
**提取规则:**
- **只提取关于"用户本人"的画像信息**,忽略用户提到的第三方人物(如朋友、同事、家人)的信息
- 仅提取文本中明确提到的信息,不要推测
- 如果文本中没有可提取的用户画像信息,返回空的 user_metadata 对象
- **输出语言必须与输入文本的语言一致**(输入中文则输出中文值,输入英文则输出英文值)
{% if existing_metadata %}
**重要:合并已有元数据**
下方提供了数据库中已有的用户元数据。请结合用户最新发言,输出**合并后的完整元数据**
- 如果用户明确否定了已有信息(如"我不再教高中物理了"),在输出中**移除**该信息
- 如果用户提到了新信息,**添加**到对应字段中
- 如果已有信息未被用户否定,**保留**在输出中
- 标量字段(如 role、domain如果用户提到了新值用新值替换否则保留已有值
- 最终输出应该是完整的、合并后的元数据,不是增量
{% endif %}
**字段说明:**
- profile.role用户的职业或角色如 教师、医生、后端工程师
- profile.domain用户所在领域如 教育、医疗、软件开发
- profile.expertise用户擅长的技能或工具通用不限于编程如 Python、心理咨询、高中物理
- profile.interests用户主动表达兴趣的话题或领域标签
- behavioral_hints.learning_stage学习阶段初学者/中级/高级)
- behavioral_hints.preferred_depth偏好深度概览/技术细节/深入探讨)
- behavioral_hints.tone_preference语气偏好轻松随意/专业简洁/学术严谨)
- knowledge_tags用户涉及的知识领域标签
**用户别名变更(增量模式):**
- **aliases_to_add**:本次新发现的用户别名,包括:
* 用户主动自我介绍:如"我叫张三"、"我的名字是XX"、"我的网名是XX"
* 他人对用户的称呼:如"同事叫我陈哥"、"大家叫我小张"、"领导叫我老陈"
* 只提取原文中逐字出现的名字,严禁推测或创造
* 禁止提取:用户给 AI 取的名字、第三方人物自身的名字、"用户"/"我" 等占位词
* 如果没有新别名,返回空数组 `[]`
- **aliases_to_remove**:用户明确否认的别名,包括:
* 用户说"我不叫XX了"、"别叫我XX"、"我改名了不叫XX" → 将 XX 放入此数组
* **严格限制**:只将用户原文中**逐字提到**的被否认名字放入,不要推断关联的其他别名
* 例如:用户说"我不叫陈小刀了" → 只移除"陈小刀",不要移除"陈哥"、"老陈"等未被提及的别名
* 如果没有要移除的别名,返回空数组 `[]`
{% if existing_aliases %}
- 已有别名:{{ existing_aliases | tojson }}(仅供参考,不需要在输出中重复)
{% endif %}
{% else %}
**"Three-Degree Principle" criteria:**
- Reusability: Will this information be used by multiple functional modules?
- Constraint: Will this information affect system behavior?
- Timeliness: Is this information long-term stable or temporary? Only extract long-term stable information.
**Extraction rules:**
- **Only extract profile information about the user themselves**, ignore information about third parties (friends, colleagues, family) mentioned by the user
- Only extract information explicitly mentioned in the text, do not speculate
- If no user profile information can be extracted, return an empty user_metadata object
- **Output language must match the input text language**
{% if existing_metadata %}
**Important: Merge with existing metadata**
Existing user metadata from the database is provided below. Combine with the user's latest statements to output the **complete merged metadata**:
- If the user explicitly negates existing info (e.g. "I no longer teach high school physics"), **remove** it from output
- If the user mentions new info, **add** it to the corresponding field
- If existing info is not negated by the user, **keep** it in the output
- Scalar fields (e.g. role, domain): replace with new value if user mentions one; otherwise keep existing
- The final output should be the complete, merged metadata — not an incremental update
{% endif %}
**Field descriptions:**
- profile.role: User's occupation or role, e.g. teacher, doctor, software engineer
- profile.domain: User's domain, e.g. education, healthcare, software development
- profile.expertise: User's skills or tools (general, not limited to programming)
- profile.interests: Topics or domain tags the user actively expressed interest in
- behavioral_hints.learning_stage: Learning stage (beginner/intermediate/advanced)
- behavioral_hints.preferred_depth: Preferred depth (overview/detailed/deep dive)
- behavioral_hints.tone_preference: Tone preference (casual/professional/academic)
- knowledge_tags: Knowledge domain tags related to the user
**User alias changes (incremental mode):**
- **aliases_to_add**: Newly discovered user aliases from this conversation, including:
* User self-introductions: e.g. "I'm John", "My name is XX", "My username is XX"
* How others address the user: e.g. "My colleagues call me Johnny", "People call me Mike"
* Only extract names that appear VERBATIM in the text — never infer or fabricate
* Do NOT extract: names the user gives to the AI, third-party people's own names, placeholder words like "User"/"I"
* If no new aliases, return empty array `[]`
- **aliases_to_remove**: Aliases the user explicitly denies, including:
* User says "Don't call me XX anymore", "I'm not called XX", "I changed my name from XX" → put XX in this array
* **Strict rule**: Only include the exact name the user **verbatim mentions** as denied. Do NOT infer or remove related aliases
* Example: User says "I'm not called John anymore" → only remove "John", do NOT remove "Johnny", "J" or other related aliases not mentioned
* If no aliases to remove, return empty array `[]`
{% if existing_aliases %}
- Existing aliases: {{ existing_aliases | tojson }} (for reference only, do not repeat in output)
{% endif %}
{% endif %}
===User Statements===
{% for stmt in statements %}
- {{ stmt }}
{% endfor %}
{% if existing_metadata %}
===Existing User Metadata===
```json
{{ existing_metadata | tojson }}
```
{% endif %}
===Output Format===
Return a JSON object with the following structure:
```json
{
"user_metadata": {
"profile": {
"role": "",
"domain": "",
"expertise": [],
"interests": []
},
"behavioral_hints": {
"learning_stage": "",
"preferred_depth": "",
"tone_preference": ""
},
"knowledge_tags": []
},
"aliases_to_add": [],
"aliases_to_remove": []
}
```
{{ json_schema }}

View File

@@ -31,7 +31,7 @@ logger = logging.getLogger(__name__)
# Example:
# "Hello {{user.name}}!" ->
# ["Hello ", "{{user.name}}", "!"]
_OUTPUT_PATTERN = re.compile(r'\{\{.*?}}|[^{}]+')
_OUTPUT_PATTERN = re.compile(r'\{\{.*?}}|[^{]+|{')
# Strict variable format: {{ node_id.field_name }}
_VARIABLE_PATTERN = re.compile(r'\{\{\s*[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)?\s*}}')

View File

@@ -55,9 +55,9 @@ class CycleGraphNode(BaseNode):
if config.output_type in [
VariableType.ARRAY_FILE,
VariableType.ARRAY_STRING,
VariableType.NUMBER,
VariableType.ARRAY_NUMBER,
VariableType.ARRAY_OBJECT,
VariableType.BOOLEAN
VariableType.ARRAY_BOOLEAN
]:
if config.flatten:
outputs['output'] = config.output_type

View File

@@ -61,3 +61,15 @@ def get_apps_by_id(db: Session, app_id: uuid.UUID) -> App:
"""根据工作空间ID查询应用"""
repo = AppRepository(db)
return repo.get_apps_by_id(app_id)
def get_release_by_version(db: Session, app_id: uuid.UUID, version: int):
"""根据版本号查询发布快照(仅返回激活状态)"""
from app.models.app_release_model import AppRelease
return db.scalars(
select(AppRelease).where(
AppRelease.app_id == app_id,
AppRelease.version == version,
AppRelease.is_active.is_(True),
)
).first()

View File

@@ -23,6 +23,7 @@ SET s += {
end_user_id: statement.end_user_id,
stmt_type: statement.stmt_type,
statement: statement.statement,
speaker: statement.speaker,
emotion_intensity: statement.emotion_intensity,
emotion_target: statement.emotion_target,
emotion_subject: statement.emotion_subject,
@@ -56,6 +57,7 @@ SET c += {
expired_at: chunk.expired_at,
dialog_id: chunk.dialog_id,
content: chunk.content,
speaker: chunk.speaker,
chunk_embedding: chunk.chunk_embedding,
sequence_number: chunk.sequence_number,
start_index: chunk.start_index,
@@ -283,7 +285,7 @@ LIMIT $limit
"""
SEARCH_STATEMENTS_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $q) YIELD node AS s, score
CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
@@ -307,7 +309,7 @@ LIMIT $limit
"""
# 查询实体名称包含指定字符串的实体
SEARCH_ENTITIES_BY_NAME = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $q) YIELD node AS e, score
CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
OPTIONAL MATCH (s:Statement)-[:REFERENCES_ENTITY]->(e)
OPTIONAL MATCH (c:Chunk)-[:CONTAINS]->(s)
@@ -337,21 +339,21 @@ LIMIT $limit
"""
SEARCH_ENTITIES_BY_NAME_OR_ALIAS = """
CALL db.index.fulltext.queryNodes("entitiesFulltext", $q) YIELD node AS e, score
CALL db.index.fulltext.queryNodes("entitiesFulltext", $query) YIELD node AS e, score
WHERE ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
WITH e, score
WITH collect({entity: e, score: score}) AS fulltextResults
With collect({entity: e, score: score}) AS fulltextResults
OPTIONAL MATCH (ae:ExtractedEntity)
WHERE ($end_user_id IS NULL OR ae.end_user_id = $end_user_id)
AND ae.aliases IS NOT NULL
AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($q))
AND ANY(alias IN ae.aliases WHERE toLower(alias) CONTAINS toLower($query))
WITH fulltextResults, collect(ae) AS aliasEntities
UNWIND (fulltextResults + [x IN aliasEntities | {entity: x, score:
CASE
WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($q)) THEN 1.0
WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($q)) THEN 0.9
WHEN ANY(alias IN x.aliases WHERE toLower(alias) = toLower($query)) THEN 1.0
WHEN ANY(alias IN x.aliases WHERE toLower(alias) STARTS WITH toLower($query)) THEN 0.9
ELSE 0.8
END
}]) AS row
@@ -384,7 +386,7 @@ LIMIT $limit
SEARCH_CHUNKS_BY_CONTENT = """
CALL db.index.fulltext.queryNodes("chunksFulltext", $q) YIELD node AS c, score
CALL db.index.fulltext.queryNodes("chunksFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
OPTIONAL MATCH (c)-[:CONTAINS]->(s:Statement)
OPTIONAL MATCH (s)-[:REFERENCES_ENTITY]->(e:ExtractedEntity)
@@ -501,7 +503,7 @@ LIMIT $limit
"""
SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL = """
CALL db.index.fulltext.queryNodes("statementsFulltext", $q) YIELD node AS s, score
CALL db.index.fulltext.queryNodes("statementsFulltext", $query) YIELD node AS s, score
WHERE ($end_user_id IS NULL OR s.end_user_id = $end_user_id)
AND ((($start_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) >= datetime($start_date)))
AND ($end_date IS NULL OR (s.created_at IS NOT NULL AND datetime(s.created_at) <= datetime($end_date))))
@@ -677,7 +679,7 @@ SET n.invalid_at = $new_invalid_at
# MemorySummary keyword search using fulltext index
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("summariesFulltext", $q) YIELD node AS m, score
CALL db.index.fulltext.queryNodes("summariesFulltext", $query) YIELD node AS m, score
WHERE ($end_user_id IS NULL OR m.end_user_id = $end_user_id)
OPTIONAL MATCH (m)-[:DERIVED_FROM_STATEMENT]->(s:Statement)
RETURN m.id AS id,
@@ -1363,7 +1365,7 @@ RETURN c.community_id AS community_id
# Community keyword search: matches name or summary via fulltext index
SEARCH_COMMUNITIES_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("communitiesFulltext", $q) YIELD node AS c, score
CALL db.index.fulltext.queryNodes("communitiesFulltext", $query) YIELD node AS c, score
WHERE ($end_user_id IS NULL OR c.end_user_id = $end_user_id)
RETURN c.community_id AS id,
c.name AS name,
@@ -1451,7 +1453,7 @@ RETURN elementId(r) AS uuid
"""
SEARCH_PERCEPTUAL_BY_KEYWORD = """
CALL db.index.fulltext.queryNodes("perceptualFulltext", $q) YIELD node AS p, score
CALL db.index.fulltext.queryNodes("perceptualFulltext", $query) YIELD node AS p, score
WHERE p.end_user_id = $end_user_id
RETURN p.id AS id,
p.end_user_id AS end_user_id,

View File

@@ -186,6 +186,58 @@ async def save_dialog_and_statements_to_neo4j(
Returns:
bool: True if successful, False otherwise
"""
# TODO 需要在去重消歧节阶段,做以下逻辑的处理
# 预处理:对特殊实体("用户"、"AI助手")复用 Neo4j 中已有节点的 ID
# 确保同一个 end_user_id 下只有一个"用户"节点和一个"AI助手"节点。
if entity_nodes:
_SPECIAL_NAMES = {"用户", "", "user", "i", "ai助手", "助手", "ai assistant", "assistant"}
end_user_id = entity_nodes[0].end_user_id if entity_nodes else None
if end_user_id:
try:
# 查询已有的特殊实体
cypher = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND toLower(e.name) IN $names
RETURN e.id AS id, e.name AS name
"""
existing = await connector.execute_query(
cypher,
end_user_id=end_user_id,
names=list(_SPECIAL_NAMES),
)
# 建立 name(lower) → existing_id 映射
existing_id_map = {}
for record in (existing or []):
name_lower = (record.get("name") or "").strip().lower()
if name_lower and record.get("id"):
existing_id_map[name_lower] = record["id"]
if existing_id_map:
# 替换新实体的 ID 为已有 ID同时更新所有引用该 ID 的边
for ent in entity_nodes:
name_lower = (ent.name or "").strip().lower()
if name_lower in existing_id_map:
old_id = ent.id
new_id = existing_id_map[name_lower]
if old_id != new_id:
ent.id = new_id
# 更新 statement_entity_edges 中的引用
for edge in statement_entity_edges:
if edge.target == old_id:
edge.target = new_id
if edge.source == old_id:
edge.source = new_id
# 更新 entity_edges 中的引用
for edge in entity_edges:
if edge.source == old_id:
edge.source = new_id
if edge.target == old_id:
edge.target = new_id
logger.info(
f"特殊实体 '{ent.name}' ID 复用: {old_id[:8]}... → {new_id[:8]}..."
)
except Exception as e:
logger.warning(f"特殊实体 ID 复用查询失败(不影响写入): {e}")
# 定义事务函数,将所有写操作放在一个事务中
async def _save_all_in_transaction(tx):

View File

@@ -2,6 +2,7 @@ import asyncio
import logging
from typing import Any, Dict, List, Optional
from app.core.memory.utils.data.text_utils import escape_lucene_query
from app.repositories.neo4j.cypher_queries import (
CHUNK_EMBEDDING_SEARCH,
COMMUNITY_EMBEDDING_SEARCH,
@@ -87,7 +88,7 @@ async def _update_activation_values_batch(
unique_node_ids.append(node_id)
if not unique_node_ids:
logger.warning(f"批量更新激活值没有有效的节点ID")
logger.warning("批量更新激活值没有有效的节点ID")
return nodes
# 记录去重信息(仅针对具有有效 ID 的节点)
@@ -223,7 +224,7 @@ async def _update_search_results_activation(
async def search_graph(
connector: Neo4jConnector,
q: str,
query: str,
end_user_id: Optional[str] = None,
limit: int = 50,
include: List[str] = None,
@@ -234,14 +235,14 @@ async def search_graph(
OPTIMIZED: Runs all queries in parallel using asyncio.gather()
INTEGRATED: Updates activation values for knowledge nodes before returning results
- Statements: matches s.statement CONTAINS q
- Entities: matches e.name CONTAINS q
- Chunks: matches s.content CONTAINS q (from Statement nodes)
- Summaries: matches ms.content CONTAINS q
- Statements: matches s.statement CONTAINS query
- Entities: matches e.name CONTAINS query
- Chunks: matches s.content CONTAINS query (from Statement nodes)
- Summaries: matches ms.content CONTAINS query
Args:
connector: Neo4j connector
q: Query text
query: Query text for full-text search
end_user_id: Optional group filter
limit: Max results per category
include: List of categories to search (default: all)
@@ -252,6 +253,9 @@ async def search_graph(
if include is None:
include = ["statements", "chunks", "entities", "summaries"]
# Escape Lucene special characters to prevent query parse errors
escaped_query = escape_lucene_query(query)
# Prepare tasks for parallel execution
tasks = []
task_keys = []
@@ -260,7 +264,7 @@ async def search_graph(
tasks.append(connector.execute_query(
SEARCH_STATEMENTS_BY_KEYWORD,
json_format=True,
q=q,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
@@ -270,7 +274,7 @@ async def search_graph(
tasks.append(connector.execute_query(
SEARCH_ENTITIES_BY_NAME_OR_ALIAS,
json_format=True,
q=q,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
@@ -280,7 +284,7 @@ async def search_graph(
tasks.append(connector.execute_query(
SEARCH_CHUNKS_BY_CONTENT,
json_format=True,
q=q,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
@@ -290,7 +294,7 @@ async def search_graph(
tasks.append(connector.execute_query(
SEARCH_MEMORY_SUMMARIES_BY_KEYWORD,
json_format=True,
q=q,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
@@ -300,7 +304,7 @@ async def search_graph(
tasks.append(connector.execute_query(
SEARCH_COMMUNITIES_BY_KEYWORD,
json_format=True,
q=q,
query=escaped_query,
end_user_id=end_user_id,
limit=limit,
))
@@ -482,7 +486,7 @@ async def search_graph_by_embedding(
update_time = time.time() - update_start
logger.info(f"[PERF] Activation value updates took: {update_time:.4f}s")
else:
logger.info(f"[PERF] Skipping activation updates (only summaries)")
logger.info("[PERF] Skipping activation updates (only summaries)")
return results
@@ -520,7 +524,7 @@ async def get_dedup_candidates_for_entities( # 适配新版查询:使用全
# 全文索引按名称检索(包含 CONTAINS 语义)
rows = await connector.execute_query(
SEARCH_ENTITIES_BY_NAME,
q=name,
query=escape_lucene_query(name),
end_user_id=end_user_id,
limit=100,
)
@@ -544,7 +548,7 @@ async def get_dedup_candidates_for_entities( # 适配新版查询:使用全
try:
rows = await connector.execute_query(
SEARCH_ENTITIES_BY_NAME,
q=name.lower(),
query=escape_lucene_query(name.lower()),
end_user_id=end_user_id,
limit=100,
)
@@ -593,11 +597,12 @@ async def search_graph_by_keyword_temporal(
- Returns up to 'limit' statements
"""
if not query_text:
logger.warning(f"query_text不能为空")
logger.warning("query_text不能为空")
return {"statements": []}
escaped_query = escape_lucene_query(query_text)
statements = await connector.execute_query(
SEARCH_STATEMENTS_BY_KEYWORD_TEMPORAL,
q=query_text,
query=escaped_query,
end_user_id=end_user_id,
start_date=start_date,
end_date=end_date,
@@ -671,7 +676,7 @@ async def search_graph_by_dialog_id(
- Returns up to 'limit' dialogues
"""
if not dialog_id:
logger.warning(f"dialog_id不能为空")
logger.warning("dialog_id不能为空")
return {"dialogues": []}
dialogues = await connector.execute_query(
@@ -690,7 +695,7 @@ async def search_graph_by_chunk_id(
limit: int = 1,
) -> Dict[str, List[Dict[str, Any]]]:
if not chunk_id:
logger.warning(f"chunk_id不能为空")
logger.warning("chunk_id不能为空")
return {"chunks": []}
chunks = await connector.execute_query(
SEARCH_CHUNK_BY_CHUNK_ID,
@@ -968,7 +973,7 @@ async def search_graph_l_valid_at(
async def search_perceptual(
connector: Neo4jConnector,
q: str,
query: str,
end_user_id: Optional[str] = None,
limit: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
@@ -979,7 +984,7 @@ async def search_perceptual(
Args:
connector: Neo4j connector
q: Query text
query: Query text for full-text search
end_user_id: Optional user filter
limit: Max results
@@ -989,7 +994,7 @@ async def search_perceptual(
try:
perceptuals = await connector.execute_query(
SEARCH_PERCEPTUAL_BY_KEYWORD,
q=q,
query=escape_lucene_query(query),
end_user_id=end_user_id,
limit=limit,
)

View File

@@ -616,6 +616,7 @@ class AppChatRequest(BaseModel):
stream: bool = Field(default=False, description="是否流式返回")
thinking: bool = Field(default=False, description="是否启用深度思考需Agent配置支持")
files: List[FileInput] = Field(default_factory=list, description="附件列表(支持多文件)")
version: Optional[int] = Field(default=None, description="指定发布版本号,不传则使用当前发布版本")
class DraftRunRequest(BaseModel):

View File

@@ -620,6 +620,28 @@ class AppService:
self._validate_app_accessible(app, workspace_id)
return app
def get_release_by_version(self, app_id: uuid.UUID, version: int) -> AppRelease:
"""按版本号获取发布快照
Args:
app_id: 应用ID
version: 版本号(整数,按应用内递增)
Returns:
AppRelease: 发布快照
Raises:
BusinessException: 版本不存在或已下线
"""
from app.repositories.app_repository import get_release_by_version
release = get_release_by_version(self.db, app_id, version)
if not release:
raise BusinessException(
f"版本 {version} 不存在或已下线",
BizCode.RELEASE_NOT_FOUND,
)
return release
def create_app(
self,
*,

View File

@@ -803,7 +803,6 @@ def get_rag_content(
"page": {
"page": page,
"pagesize": pagesize,
"total": 0,
"hasnext": False,
},
"items": []
@@ -897,13 +896,12 @@ def get_rag_content(
"page": {
"page": page,
"pagesize": pagesize,
"total": global_total,
"hasnext": offset_end < global_total,
},
"items": conversations
}
business_logger.info(f"成功获取RAG内容: total={global_total}, page={page}, 返回={len(conversations)} 条对话")
business_logger.info(f"成功获取RAG内容: page={page}, 返回={len(conversations)} 条对话")
return result
except Exception as e:

View File

@@ -1,4 +1,5 @@
import asyncio
import json
import os
import re
import shutil
@@ -1001,7 +1002,7 @@ def sync_knowledge_for_kb(kb_id: uuid.UUID):
except Exception as e:
print(f"\n\nError during fetch feishu: {e}")
case _: # General
print(f"General: No synchronization needed\n")
print("General: No synchronization needed\n")
result = f"sync knowledge '{db_knowledge.name}' processed successfully."
return result
@@ -1510,6 +1511,7 @@ def write_all_workspaces_memory_task(self) -> Dict[str, Any]:
"status": "SUCCESS",
"total_num": total_num,
"end_user_count": len(end_users),
"end_user_details": end_user_details,
"memory_increment_id": str(memory_increment.id),
"created_at": memory_increment.created_at.isoformat(),
})
@@ -2602,35 +2604,34 @@ def init_interest_distribution_for_users(self, end_user_ids: List[str]) -> Dict[
service = MemoryAgentService()
with get_db_context() as db:
for end_user_id in end_user_ids:
# 存在性检查:缓存有数据则跳过
cached = await InterestMemoryCache.get_interest_distribution(
for end_user_id in end_user_ids:
# 存在性检查:缓存有数据则跳过
cached = await InterestMemoryCache.get_interest_distribution(
end_user_id=end_user_id,
language=language,
)
if cached is not None:
skipped += 1
continue
logger.info(f"用户 {end_user_id} 无兴趣分布缓存,开始生成")
try:
result = await service.get_interest_distribution_by_user(
end_user_id=end_user_id,
limit=5,
language=language,
)
if cached is not None:
skipped += 1
continue
logger.info(f"用户 {end_user_id} 无兴趣分布缓存,开始生成")
try:
result = await service.get_interest_distribution_by_user(
end_user_id=end_user_id,
limit=5,
language=language,
)
await InterestMemoryCache.set_interest_distribution(
end_user_id=end_user_id,
language=language,
data=result,
expire=INTEREST_CACHE_EXPIRE,
)
initialized += 1
logger.info(f"用户 {end_user_id} 兴趣分布缓存生成成功")
except Exception as e:
failed += 1
logger.error(f"用户 {end_user_id} 兴趣分布缓存生成失败: {e}")
await InterestMemoryCache.set_interest_distribution(
end_user_id=end_user_id,
language=language,
data=result,
expire=INTEREST_CACHE_EXPIRE,
)
initialized += 1
logger.info(f"用户 {end_user_id} 兴趣分布缓存生成成功")
except Exception as e:
failed += 1
logger.error(f"用户 {end_user_id} 兴趣分布缓存生成失败: {e}")
logger.info(f"兴趣分布按需初始化完成: 初始化={initialized}, 跳过={skipped}, 失败={failed}")
return {
@@ -2914,4 +2915,270 @@ def init_community_clustering_for_users(self, end_user_ids: List[str], workspace
}
# ─── User Metadata Extraction Task ───────────────────────────────────────────
def _update_timestamps(existing: dict, new: dict, updated_at: dict, now: str, prefix: str = "") -> None:
"""对比新旧元数据,更新变更字段的 _updated_at 时间戳。"""
for key, new_val in new.items():
if key == "_updated_at":
continue
path = f"{prefix}.{key}" if prefix else key
old_val = existing.get(key)
if isinstance(new_val, dict) and isinstance(old_val, dict):
_update_timestamps(old_val, new_val, updated_at, now, prefix=path)
elif old_val != new_val:
updated_at[path] = now
@celery_app.task(
bind=True,
name='app.tasks.extract_user_metadata',
ignore_result=False,
max_retries=0,
acks_late=True,
time_limit=300,
soft_time_limit=240,
)
def extract_user_metadata_task(
self,
end_user_id: str,
statements: List[str],
config_id: Optional[str] = None,
language: str = "zh",
) -> Dict[str, Any]:
"""异步提取用户元数据并写入数据库。
在去重消歧完成后由编排器触发,使用独立 LLM 调用提取元数据。
LLM 配置优先使用 config_id 对应的应用配置,失败时回退到工作空间默认配置。
Args:
end_user_id: 终端用户 ID
statements: 用户相关的 statement 文本列表
config_id: 应用配置 ID可选
language: 语言类型 ("zh" 中文, "en" 英文)
Returns:
包含任务执行结果的字典
"""
start_time = time.time()
logger.info(
f"[CELERY METADATA] Starting metadata extraction - end_user_id={end_user_id}, "
f"statements_count={len(statements)}, config_id={config_id}, language={language}"
)
async def _run() -> Dict[str, Any]:
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.metadata_extractor import MetadataExtractor
from app.repositories.end_user_info_repository import EndUserInfoRepository
from app.repositories.end_user_repository import EndUserRepository
from app.services.memory_config_service import MemoryConfigService
# 1. 获取 LLM 配置(应用配置 → 工作空间配置兜底)并创建 LLM client
with get_db_context() as db:
end_user_uuid = uuid.UUID(end_user_id)
# 获取 workspace_id from end_user
end_user = EndUserRepository(db).get_by_id(end_user_uuid)
if not end_user:
return {"status": "FAILURE", "error": f"End user not found: {end_user_id}"}
workspace_id = end_user.workspace_id
config_service = MemoryConfigService(db)
memory_config = config_service.get_config_with_fallback(
memory_config_id=uuid.UUID(config_id) if config_id else None,
workspace_id=workspace_id,
)
if not memory_config:
return {"status": "FAILURE", "error": "No LLM config available (app + workspace fallback failed)"}
# 2. 创建 LLM client
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
factory = MemoryClientFactory(db)
if not memory_config.llm_id:
return {"status": "FAILURE", "error": "Memory config has no LLM model configured"}
llm_client = factory.get_llm_client(memory_config.llm_id)
# 2.5 读取已有元数据和别名,传给 extractor 作为上下文
existing_metadata = None
existing_aliases = None
try:
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
if info:
if info.meta_data:
existing_metadata = info.meta_data
existing_aliases = info.aliases if info.aliases else []
logger.info(f"[CELERY METADATA] 已读取已有元数据和别名aliases={existing_aliases}")
except Exception as e:
logger.warning(f"[CELERY METADATA] 读取已有数据失败(继续无上下文提取): {e}")
# 3. 提取元数据和别名(传入已有数据作为上下文)
extractor = MetadataExtractor(llm_client=llm_client, language=language)
extract_result = await extractor.extract_metadata(
statements,
existing_metadata=existing_metadata,
existing_aliases=existing_aliases,
)
if not extract_result:
logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
return {"status": "SUCCESS", "result": "no_metadata_extracted"}
user_metadata, aliases_to_add, aliases_to_remove = extract_result
logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}")
# 4. 清洗元数据、覆盖写入元数据和别名
def clean_metadata(raw: dict) -> dict:
"""递归移除空字符串、空列表、空字典。"""
result = {}
for k, v in raw.items():
if v == "" or v == []:
continue
if isinstance(v, dict):
cleaned = clean_metadata(v)
if cleaned:
result[k] = cleaned
else:
result[k] = v
return result
raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {}
logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
cleaned = clean_metadata(raw_dict) if raw_dict else {}
logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")
from datetime import datetime as dt, timezone as tz
now = dt.now(tz.utc).isoformat()
# 过滤别名中的占位名称,执行增量增删
_PLACEHOLDER_NAMES = {"用户", "", "user", "i"}
def _filter_aliases(aliases_list):
seen = set()
result = []
for a in aliases_list:
a_stripped = a.strip()
if a_stripped and a_stripped.lower() not in _PLACEHOLDER_NAMES and a_stripped.lower() not in seen:
result.append(a_stripped)
seen.add(a_stripped.lower())
return result
filtered_add = _filter_aliases(aliases_to_add)
filtered_remove = _filter_aliases(aliases_to_remove)
remove_lower = {a.lower() for a in filtered_remove}
with get_db_context() as db:
end_user_uuid = uuid.UUID(end_user_id)
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
end_user = EndUserRepository(db).get_by_id(end_user_uuid)
if info:
# 元数据覆盖写入
if cleaned:
existing_meta = info.meta_data if info.meta_data else {}
updated_at = dict(existing_meta.get("_updated_at", {}))
_update_timestamps(existing_meta, cleaned, updated_at, now)
final = dict(cleaned)
final["_updated_at"] = updated_at
info.meta_data = final
logger.info("[CELERY METADATA] 覆盖写入元数据")
# 别名增量增删:(已有 - remove) + add
old_aliases = info.aliases if info.aliases else []
# 先移除
merged = [a for a in old_aliases if a.strip().lower() not in remove_lower]
# 再追加(去重)
existing_lower = {a.strip().lower() for a in merged}
for a in filtered_add:
if a.lower() not in existing_lower:
merged.append(a)
existing_lower.add(a.lower())
if merged != old_aliases:
info.aliases = merged
# other_name 更新逻辑
if merged and (
not info.other_name
or info.other_name.strip().lower() in _PLACEHOLDER_NAMES
or info.other_name.strip().lower() in remove_lower
):
info.other_name = merged[0]
if end_user and merged and (
not end_user.other_name
or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES
or end_user.other_name.strip().lower() in remove_lower
):
end_user.other_name = merged[0]
logger.info(
f"[CELERY METADATA] 别名增量更新: {old_aliases} - {filtered_remove} + {filtered_add}{merged}"
)
else:
# 没有 end_user_info 记录,创建一条
from app.models.end_user_info_model import EndUserInfo
initial_aliases = filtered_add # 新记录只有 add没有 remove
first_alias = initial_aliases[0] if initial_aliases else ""
if first_alias or cleaned:
new_info = EndUserInfo(
end_user_id=end_user_uuid,
other_name=first_alias or "",
aliases=initial_aliases,
meta_data=cleaned if cleaned else None,
)
db.add(new_info)
if end_user and first_alias and (
not end_user.other_name or end_user.other_name.strip().lower() in _PLACEHOLDER_NAMES
):
end_user.other_name = first_alias
logger.info(f"[CELERY METADATA] 创建 end_user_info: other_name={first_alias}, aliases={initial_aliases}")
else:
return {"status": "SUCCESS", "result": "no_data_to_write"}
db.commit()
# 同步 PgSQL aliases 到 Neo4j 用户实体PgSQL 为权威源)
final_aliases = info.aliases if info else initial_aliases
if final_aliases:
try:
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
neo4j_connector = Neo4jConnector()
cypher = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '', 'User', 'I']
SET e.aliases = $aliases
"""
await neo4j_connector.execute_query(
cypher, end_user_id=end_user_id, aliases=final_aliases
)
await neo4j_connector.close()
logger.info(f"[CELERY METADATA] Neo4j 用户实体 aliases 已同步: {final_aliases}")
except Exception as neo4j_err:
logger.warning(f"[CELERY METADATA] Neo4j aliases 同步失败(不影响主流程): {neo4j_err}")
return {"status": "SUCCESS", "result": "metadata_and_aliases_written"}
loop = None
try:
loop = set_asyncio_event_loop()
result = loop.run_until_complete(_run())
elapsed = time.time() - start_time
result["elapsed_time"] = elapsed
result["task_id"] = self.request.id
logger.info(f"[CELERY METADATA] Task completed - elapsed={elapsed:.2f}s, result={result.get('result')}")
return result
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"[CELERY METADATA] Task failed - elapsed={elapsed:.2f}s, error={e}", exc_info=True)
return {
"status": "FAILURE",
"error": str(e),
"elapsed_time": elapsed,
"task_id": self.request.id,
}
finally:
if loop:
_shutdown_loop_gracefully(loop)
# unused task