新增中翻英功能(记忆时间线)(用户摘要)(兴趣分布接口)(查询核心档案)(记忆洞察)-接口添加翻译字段

This commit is contained in:
lixinyue
2026-01-21 19:37:03 +08:00
parent afcf12ebc9
commit 4a4931bee2
84 changed files with 1193 additions and 1190 deletions

View File

@@ -35,10 +35,10 @@ async def Split_The_Problem(state: ReadState) -> ReadState:
"""问题分解节点"""
# 从状态中获取数据
content = state.get('data', '')
group_id = state.get('group_id', '')
end_user_id = state.get('end_user_id', '')
memory_config = state.get('memory_config', None)
history = await SessionService(store).get_history(group_id, group_id, group_id)
history = await SessionService(store).get_history(end_user_id, end_user_id, end_user_id)
# 生成 JSON schema 以指导 LLM 输出正确格式
json_schema = ProblemExtensionResponse.model_json_schema()
@@ -140,7 +140,7 @@ async def Problem_Extension(state: ReadState) -> ReadState:
start = time.time()
content = state.get('data', '')
data = state.get('spit_data', '')['context']
group_id = state.get('group_id', '')
end_user_id = state.get('end_user_id', '')
storage_type = state.get('storage_type', '')
user_rag_memory_id = state.get('user_rag_memory_id', '')
memory_config = state.get('memory_config', None)
@@ -156,7 +156,7 @@ async def Problem_Extension(state: ReadState) -> ReadState:
databasets = {}
data = []
history = await SessionService(store).get_history(group_id, group_id, group_id)
history = await SessionService(store).get_history(end_user_id, end_user_id, end_user_id)
# 生成 JSON schema 以指导 LLM 输出正确格式
json_schema = ProblemExtensionResponse.model_json_schema()

View File

@@ -52,9 +52,9 @@ async def rag_config(state):
return kb_config
async def rag_knowledge(state,question):
kb_config = await rag_config(state)
group_id = state.get('group_id', '')
end_user_id = state.get('end_user_id', '')
user_rag_memory_id=state.get("user_rag_memory_id",'')
retrieve_chunks_result = knowledge_retrieval(question, kb_config, [str(group_id)])
retrieve_chunks_result = knowledge_retrieval(question, kb_config, [str(end_user_id)])
try:
retrieval_knowledge = [i.page_content for i in retrieve_chunks_result]
clean_content = '\n\n'.join(retrieval_knowledge)
@@ -159,7 +159,7 @@ async def retrieve_nodes(state: ReadState) -> ReadState:
problem_extension=state.get('problem_extension', '')['context']
storage_type=state.get('storage_type', '')
user_rag_memory_id=state.get('user_rag_memory_id', '')
group_id=state.get('group_id', '')
end_user_id=state.get('end_user_id', '')
memory_config = state.get('memory_config', None)
original=state.get('data', '')
problem_list=[]
@@ -172,7 +172,7 @@ async def retrieve_nodes(state: ReadState) -> ReadState:
try:
# Prepare search parameters based on storage type
search_params = {
"group_id": group_id,
"end_user_id": end_user_id,
"question": question,
"return_raw_results": True
}
@@ -263,13 +263,13 @@ async def retrieve_nodes(state: ReadState) -> ReadState:
async def retrieve(state: ReadState) -> ReadState:
# 从state中获取group_id
# 从state中获取end_user_id
import time
start=time.time()
problem_extension = state.get('problem_extension', '')['context']
storage_type = state.get('storage_type', '')
user_rag_memory_id = state.get('user_rag_memory_id', '')
group_id = state.get('group_id', '')
end_user_id = state.get('end_user_id', '')
memory_config = state.get('memory_config', None)
original = state.get('data', '')
problem_list = []
@@ -295,13 +295,13 @@ async def retrieve(state: ReadState) -> ReadState:
temperature=0.2,
)
time_retrieval_tool = create_time_retrieval_tool(group_id)
search_params = { "group_id": group_id, "return_raw_results": True }
time_retrieval_tool = create_time_retrieval_tool(end_user_id)
search_params = { "end_user_id": end_user_id, "return_raw_results": True }
hybrid_retrieval=create_hybrid_retrieval_tool_sync(memory_config, **search_params)
agent = create_agent(
llm,
tools=[time_retrieval_tool,hybrid_retrieval],
system_prompt=f"我是检索专家,可以根据适合的工具进行检索。当前使用的group_id是: {group_id}"
system_prompt=f"我是检索专家,可以根据适合的工具进行检索。当前使用的end_user_id是: {end_user_id}"
)
# 创建异步任务处理单个问题

View File

@@ -34,8 +34,8 @@ class SummaryNodeService(LLMServiceMixin):
summary_service = SummaryNodeService()
async def summary_history(state: ReadState) -> ReadState:
group_id = state.get("group_id", '')
history = await SessionService(store).get_history(group_id, group_id, group_id)
end_user_id = state.get("end_user_id", '')
history = await SessionService(store).get_history(end_user_id, end_user_id, end_user_id)
return history
async def summary_llm(state: ReadState, history, retrieve_info, template_name, operation_name, response_model,search_mode) -> str:
@@ -122,12 +122,12 @@ async def summary_llm(state: ReadState, history, retrieve_info, template_name, o
async def summary_redis_save(state: ReadState,aimessages) -> ReadState:
data = state.get("data", '')
group_id = state.get("group_id", '')
end_user_id = state.get("end_user_id", '')
await SessionService(store).save_session(
user_id=group_id,
user_id=end_user_id,
query=data,
apply_id=group_id,
group_id=group_id,
apply_id=end_user_id,
end_user_id=end_user_id,
ai_response=aimessages
)
await SessionService(store).cleanup_duplicates()
@@ -175,11 +175,11 @@ async def Input_Summary(state: ReadState) -> ReadState:
memory_config = state.get('memory_config', None)
user_rag_memory_id=state.get("user_rag_memory_id",'')
data=state.get("data", '')
group_id=state.get("group_id", '')
end_user_id=state.get("end_user_id", '')
logger.info(f"Input_Summary: storage_type={storage_type}, user_rag_memory_id={user_rag_memory_id}")
history = await summary_history( state)
search_params = {
"group_id": group_id,
"end_user_id": end_user_id,
"question": data,
"return_raw_results": True,
"include": ["summaries"] # Only search summary nodes for faster performance

View File

@@ -62,12 +62,12 @@ async def Verify(state: ReadState):
logger.info("=== Verify 节点开始执行 ===")
try:
content = state.get('data', '')
group_id = state.get('group_id', '')
end_user_id = state.get('end_user_id', '')
memory_config = state.get('memory_config', None)
logger.info(f"Verify: content={content[:50] if content else 'empty'}..., group_id={group_id}")
logger.info(f"Verify: content={content[:50] if content else 'empty'}..., end_user_id={end_user_id}")
history = await SessionService(store).get_history(group_id, group_id, group_id)
history = await SessionService(store).get_history(end_user_id, end_user_id, end_user_id)
logger.info(f"Verify: 获取历史记录完成history length={len(history)}")
retrieve = state.get("retrieve", {})

View File

@@ -9,47 +9,36 @@ async def write_node(state: WriteState) -> WriteState:
Write data to the database/file system.
Args:
state: WriteState containing messages, group_id, and memory_config
content: Data content to write
end_user_id: End user identifier
memory_config: MemoryConfig object containing all configuration
Returns:
dict: Contains 'write_result' with status and data fields
dict: Contains 'write_result' with 'status', 'data', 'config_id', and 'config_name' on success, or 'status' and 'message' on error
"""
messages = state.get('messages', [])
group_id = state.get('group_id', '')
memory_config = state.get('memory_config', '')
# Convert LangChain messages to structured format expected by write()
structured_messages = []
for msg in messages:
if hasattr(msg, 'type') and hasattr(msg, 'content'):
# Map LangChain message types to role names
role = 'user' if msg.type == 'human' else 'assistant' if msg.type == 'ai' else msg.type
structured_messages.append({
"role": role,
"content": msg.content # content is now guaranteed to be a string
})
content=state.get('data','')
end_user_id=state.get('end_user_id','')
memory_config=state.get('memory_config', '')
try:
result = await write(
messages=structured_messages,
user_id=group_id,
apply_id=group_id,
group_id=group_id,
result=await write(
content=content,
end_user_id=end_user_id,
memory_config=memory_config,
)
logger.info(f"Write completed successfully! Config: {memory_config.config_name}")
write_result = {
write_result= {
"status": "success",
"data": structured_messages,
"data": content,
"config_id": memory_config.config_id,
"config_name": memory_config.config_name,
}
return {"write_result": write_result}
return {"write_result":write_result}
except Exception as e:
logger.error(f"Data_write failed: {e}", exc_info=True)
write_result = {
write_result= {
"status": "error",
"message": str(e),
}

View File

@@ -79,7 +79,7 @@ async def make_read_graph():
async def main():
"""主函数 - 运行工作流"""
message = "昨天有什么好看的电影"
group_id = '88a459f5_text09' # 组ID
end_user_id = '88a459f5_text09' # 组ID
storage_type = 'neo4j' # 存储类型
search_switch = '1' # 搜索开关
user_rag_memory_id = 'wwwwwwww' # 用户RAG记忆ID
@@ -95,9 +95,9 @@ async def main():
start=time.time()
try:
async with make_read_graph() as graph:
config = {"configurable": {"thread_id": group_id}}
config = {"configurable": {"thread_id": end_user_id}}
# 初始状态 - 包含所有必要字段
initial_state = {"messages": [HumanMessage(content=message)] ,"search_switch":search_switch,"group_id":group_id
initial_state = {"messages": [HumanMessage(content=message)] ,"search_switch":search_switch,"end_user_id":end_user_id
,"storage_type":storage_type,"user_rag_memory_id":user_rag_memory_id,"memory_config":memory_config}
# 获取节点更新信息
_intermediate_outputs = []

View File

@@ -48,11 +48,11 @@ def extract_tool_message_content(response):
class TimeRetrievalInput(BaseModel):
"""时间检索工具的输入模式"""
context: str = Field(description="用户输入的查询内容")
group_id: str = Field(default="88a459f5_text09", description="组ID用于过滤搜索结果")
end_user_id: str = Field(default="88a459f5_text09", description="组ID用于过滤搜索结果")
def create_time_retrieval_tool(group_id: str):
def create_time_retrieval_tool(end_user_id: str):
"""
创建一个带有特定group_id的TimeRetrieval工具同步版本用于按时间范围搜索语句(Statements)
创建一个带有特定end_user_id的TimeRetrieval工具同步版本用于按时间范围搜索语句(Statements)
"""
def clean_temporal_result_fields(data):
@@ -93,26 +93,26 @@ def create_time_retrieval_tool(group_id: str):
return data
@tool
def TimeRetrievalWithGroupId(context: str, start_date: str = None, end_date: str = None, group_id_param: str = None, clean_output: bool = True) -> str:
def TimeRetrievalWithGroupId(context: str, start_date: str = None, end_date: str = None, end_user_id_param: str = None, clean_output: bool = True) -> str:
"""
优化的时间检索工具,只结合时间范围搜索(同步版本),自动过滤不需要的元数据字段
显式接收参数:
- context: 查询上下文内容
- start_date: 开始时间可选格式YYYY-MM-DD
- end_date: 结束时间可选格式YYYY-MM-DD
- group_id_param: 组ID可选用于覆盖默认组ID
- end_user_id_param: 组ID可选用于覆盖默认组ID
- clean_output: 是否清理输出中的元数据字段
-end_date 需要根据用户的描述获取结束的时间输出格式用strftime("%Y-%m-%d")
"""
async def _async_search():
# 使用传入的参数或默认值
actual_group_id = group_id_param or group_id
actual_end_user_id = end_user_id_param or end_user_id
actual_end_date = end_date or datetime.now().strftime("%Y-%m-%d")
actual_start_date = start_date or (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
# 基本时间搜索
results = await search_by_temporal(
group_id=actual_group_id,
end_user_id=actual_end_user_id,
start_date=actual_start_date,
end_date=actual_end_date,
limit=10
@@ -147,7 +147,7 @@ def create_time_retrieval_tool(group_id: str):
# 关键词时间搜索
results = await search_by_keyword_temporal(
query_text=context,
group_id=group_id,
end_user_id=end_user_id,
start_date=actual_start_date,
end_date=actual_end_date,
limit=15
@@ -172,7 +172,7 @@ def create_hybrid_retrieval_tool_async(memory_config, **search_params):
Args:
memory_config: 内存配置对象
**search_params: 搜索参数,包含group_id, limit, include等
**search_params: 搜索参数,包含end_user_id, limit, include等
"""
def clean_result_fields(data):
@@ -211,7 +211,7 @@ def create_hybrid_retrieval_tool_async(memory_config, **search_params):
context: str,
search_type: str = "hybrid",
limit: int = 10,
group_id: str = None,
end_user_id: str = None,
rerank_alpha: float = 0.6,
use_forgetting_rerank: bool = False,
use_llm_rerank: bool = False,
@@ -224,7 +224,7 @@ def create_hybrid_retrieval_tool_async(memory_config, **search_params):
context: 查询内容
search_type: 搜索类型 ('keyword', 'embedding', 'hybrid')
limit: 结果数量限制
group_id: 组ID用于过滤搜索结果
end_user_id: 组ID用于过滤搜索结果
rerank_alpha: 重排序权重参数
use_forgetting_rerank: 是否使用遗忘重排序
use_llm_rerank: 是否使用LLM重排序
@@ -238,7 +238,7 @@ def create_hybrid_retrieval_tool_async(memory_config, **search_params):
final_params = {
"query_text": context,
"search_type": search_type,
"group_id": group_id or search_params.get("group_id"),
"end_user_id": end_user_id or search_params.get("end_user_id"),
"limit": limit or search_params.get("limit", 10),
"include": search_params.get("include", ["summaries", "statements", "chunks", "entities"]),
"output_path": None, # 不保存到文件
@@ -291,7 +291,7 @@ def create_hybrid_retrieval_tool_sync(memory_config, **search_params):
context: str,
search_type: str = "hybrid",
limit: int = 10,
group_id: str = None,
end_user_id: str = None,
clean_output: bool = True
) -> str:
"""
@@ -301,7 +301,7 @@ def create_hybrid_retrieval_tool_sync(memory_config, **search_params):
context: 查询内容
search_type: 搜索类型 ('keyword', 'embedding', 'hybrid')
limit: 结果数量限制
group_id: 组ID用于过滤搜索结果
end_user_id: 组ID用于过滤搜索结果
clean_output: 是否清理输出中的元数据字段
"""
async def _async_search():
@@ -311,7 +311,7 @@ def create_hybrid_retrieval_tool_sync(memory_config, **search_params):
"context": context,
"search_type": search_type,
"limit": limit,
"group_id": group_id,
"end_user_id": end_user_id,
"clean_output": clean_output
})

View File

@@ -14,6 +14,7 @@ from app.db import get_db
from app.core.logging_config import get_agent_logger
from app.core.memory.agent.utils.llm_tools import WriteState
from app.core.memory.agent.langgraph_graph.nodes.write_nodes import write_node
from app.core.memory.agent.langgraph_graph.nodes.data_nodes import content_input_write
from app.services.memory_config_service import MemoryConfigService
warnings.filterwarnings("ignore", category=RuntimeWarning)
@@ -26,12 +27,18 @@ async def make_write_graph():
"""
Create a write graph workflow for memory operations.
The workflow directly processes messages from the initial state
and saves them to Neo4j storage.
Args:
user_id: User identifier
tools: MCP tools loaded from session
apply_id: Application identifier
end_user_id: End user identifier
memory_config: MemoryConfig object containing all configuration
"""
workflow = StateGraph(WriteState)
workflow.add_node("content_input", content_input_write)
workflow.add_node("save_neo4j", write_node)
workflow.add_edge(START, "save_neo4j")
workflow.add_edge(START, "content_input")
workflow.add_edge("content_input", "save_neo4j")
workflow.add_edge("save_neo4j", END)
graph = workflow.compile()
@@ -42,7 +49,7 @@ async def make_write_graph():
async def main():
"""主函数 - 运行工作流"""
message = "今天周一"
group_id = 'new_2025test1103' # 组ID
end_user_id = 'new_2025test1103' # 组ID
# 获取数据库会话
@@ -54,9 +61,9 @@ async def main():
)
try:
async with make_write_graph() as graph:
config = {"configurable": {"thread_id": group_id}}
config = {"configurable": {"thread_id": end_user_id}}
# 初始状态 - 包含所有必要字段
initial_state = {"messages": [HumanMessage(content=message)], "group_id": group_id, "memory_config": memory_config}
initial_state = {"messages": [HumanMessage(content=message)], "end_user_id": end_user_id, "memory_config": memory_config}
# 获取节点更新信息
async for update_event in graph.astream(

View File

@@ -24,7 +24,7 @@ class ParameterBuilder:
tool_call_id: str,
search_switch: str,
apply_id: str,
group_id: str,
end_user_id: str,
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None
) -> Dict[str, Any]:
@@ -44,7 +44,7 @@ class ParameterBuilder:
tool_call_id: Extracted tool call identifier
search_switch: Search routing parameter
apply_id: Application identifier
group_id: Group identifier
end_user_id: End user identifier
storage_type: Storage type for the workspace (optional)
user_rag_memory_id: User RAG memory ID for knowledge base retrieval (optional)
@@ -55,7 +55,7 @@ class ParameterBuilder:
base_args = {
"usermessages": tool_call_id,
"apply_id": apply_id,
"group_id": group_id
"end_user_id": end_user_id
}
# Always add storage_type and user_rag_memory_id (with defaults if None)

View File

@@ -91,7 +91,7 @@ class SearchService:
async def execute_hybrid_search(
self,
group_id: str,
end_user_id: str,
question: str,
limit: int = 5,
search_type: str = "hybrid",
@@ -105,7 +105,7 @@ class SearchService:
Execute hybrid search and return clean content.
Args:
group_id: Group identifier for filtering results
end_user_id: End user identifier for filtering results
question: Search query text
limit: Maximum number of results to return (default: 5)
search_type: Type of search - "hybrid", "keyword", or "embedding" (default: "hybrid")
@@ -130,7 +130,7 @@ class SearchService:
answer = await run_hybrid_search(
query_text=cleaned_query,
search_type=search_type,
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include,
output_path=output_path,
@@ -186,7 +186,7 @@ class SearchService:
except Exception as e:
logger.error(
f"Search failed for query '{question}' in group '{group_id}': {e}",
f"Search failed for query '{question}' in group '{end_user_id}': {e}",
exc_info=True
)
# Return empty results on failure

View File

@@ -59,7 +59,7 @@ class SessionService:
self,
user_id: str,
apply_id: str,
group_id: str
end_user_id: str
) -> List[dict]:
"""
Retrieve conversation history from Redis.
@@ -67,20 +67,20 @@ class SessionService:
Args:
user_id: User identifier
apply_id: Application identifier
group_id: Group identifier
end_user_id: End user identifier
Returns:
List of conversation history items with Query and Answer keys
Returns empty list if no history found or on error
"""
try:
history = self.store.find_user_apply_group(user_id, apply_id, group_id)
history = self.store.find_user_apply_group(user_id, apply_id, end_user_id)
# Validate history structure
if not isinstance(history, list):
logger.warning(
f"Invalid history format for user {user_id}, "
f"apply {apply_id}, group {group_id}: expected list, got {type(history)}"
f"apply {apply_id}, group {end_user_id}: expected list, got {type(history)}"
)
return []
@@ -89,7 +89,7 @@ class SessionService:
except Exception as e:
logger.error(
f"Failed to retrieve history for user {user_id}, "
f"apply {apply_id}, group {group_id}: {e}",
f"apply {apply_id}, group {end_user_id}: {e}",
exc_info=True
)
# Return empty list on error to allow execution to continue
@@ -100,7 +100,7 @@ class SessionService:
user_id: str,
query: str,
apply_id: str,
group_id: str,
end_user_id: str,
ai_response: str
) -> Optional[str]:
"""
@@ -110,7 +110,7 @@ class SessionService:
user_id: User identifier
query: User query/message
apply_id: Application identifier
group_id: Group identifier
end_user_id: End user identifier
ai_response: AI response/answer
Returns:
@@ -131,7 +131,7 @@ class SessionService:
userid=user_id,
messages=query,
apply_id=apply_id,
group_id=group_id,
end_user_id=end_user_id,
aimessages=ai_response
)
@@ -152,7 +152,7 @@ class SessionService:
Duplicates are identified by matching:
- sessionid
- user_id (id field)
- group_id
- end_user_id
- messages
- aimessages

View File

@@ -9,65 +9,56 @@ from app.core.memory.models.message_models import DialogData, ConversationContex
async def get_chunked_dialogs(
chunker_strategy: str = "RecursiveChunker",
group_id: str = "group_1",
user_id: str = "user1",
apply_id: str = "applyid",
messages: list = None,
end_user_id: str = "group_1",
content: str = "这是用户的输入",
ref_id: str = "wyl_20251027",
config_id: str = None
) -> List[DialogData]:
"""Generate chunks from structured messages using the specified chunker strategy.
"""Generate chunks from all test data entries using the specified chunker strategy.
Args:
chunker_strategy: The chunking strategy to use (default: RecursiveChunker)
group_id: Group identifier
user_id: User identifier
apply_id: Application identifier
messages: Structured message list [{"role": "user", "content": "..."}, ...]
end_user_id: End user identifier
content: Dialog content
ref_id: Reference identifier
config_id: Configuration ID for processing
Returns:
List of DialogData objects with generated chunks
List of DialogData objects with generated chunks for each test entry
"""
from app.core.logging_config import get_agent_logger
logger = get_agent_logger(__name__)
if not messages or not isinstance(messages, list) or len(messages) == 0:
raise ValueError("messages parameter must be a non-empty list")
conversation_messages = []
for idx, msg in enumerate(messages):
if not isinstance(msg, dict) or 'role' not in msg or 'content' not in msg:
raise ValueError(f"Message {idx} format error: must contain 'role' and 'content' fields")
role = msg['role']
content = msg['content']
if role not in ['user', 'assistant']:
raise ValueError(f"Message {idx} role must be 'user' or 'assistant', got: {role}")
if content.strip():
conversation_messages.append(ConversationMessage(role=role, msg=content.strip()))
if not conversation_messages:
raise ValueError("Message list cannot be empty after filtering")
conversation_context = ConversationContext(msgs=conversation_messages)
dialog_data_list = []
messages = []
messages.append(ConversationMessage(role="用户", msg=content))
# Create DialogData
conversation_context = ConversationContext(msgs=messages)
# Create DialogData with end_user_id
dialog_data = DialogData(
context=conversation_context,
ref_id=ref_id,
group_id=group_id,
user_id=user_id,
apply_id=apply_id,
end_user_id=end_user_id,
config_id=config_id
)
# Create DialogueChunker and process the dialogue
chunker = DialogueChunker(chunker_strategy)
extracted_chunks = await chunker.process_dialogue(dialog_data)
dialog_data.chunks = extracted_chunks
logger.info(f"DialogData created with {len(extracted_chunks)} chunks")
return [dialog_data]
dialog_data_list.append(dialog_data)
# Convert to dict with datetime serialized
def serialize_datetime(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
combined_output = [dd.model_dump() for dd in dialog_data_list]
print(dialog_data_list)
# with open(os.path.join(os.path.dirname(__file__), "chunker_test_output.txt"), "w", encoding="utf-8") as f:
# json.dump(combined_output, f, ensure_ascii=False, indent=4, default=serialize_datetime)
return dialog_data_list

View File

@@ -12,13 +12,11 @@ class WriteState(TypedDict):
LangGraph writing TypedDict
'''
messages: Annotated[list[AnyMessage], add_messages]
user_id:str
apply_id:str
group_id:str
end_user_id: str
errors: list[dict] # Track errors: [{"tool": "tool_name", "error": "message"}]
memory_config: object
write_result: dict
data:str
data: str
class ReadState(TypedDict):
"""
@@ -28,7 +26,7 @@ class ReadState(TypedDict):
messages: 消息列表,支持自动追加
loop_count: 遍历次数
search_switch: 搜索类型开关
group_id: 组标识
end_user_id: 组标识
config_id: 配置ID用于过滤结果
data: 从content_input_node传递的内容数据
spit_data: 从Split_The_Problem传递的分解结果
@@ -39,7 +37,7 @@ class ReadState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages] # 消息追加模式
loop_count: int
search_switch: str
group_id: str
end_user_id: str
config_id: str
data: str # 新增字段用于传递内容
spit_data: dict # 新增字段用于传递问题分解结果

View File

@@ -28,7 +28,7 @@ class RedisSessionStore:
return text
# 修改后的 save_session 方法
def save_session(self, userid, messages, aimessages, apply_id, group_id):
def save_session(self, userid, messages, aimessages, apply_id, end_user_id):
"""
写入一条会话数据,返回 session_id
优化版本确保写入时间不超过1秒
@@ -46,7 +46,7 @@ class RedisSessionStore:
"id": self.uudi,
"sessionid": userid,
"apply_id": apply_id,
"group_id": group_id,
"end_user_id": end_user_id,
"messages": messages,
"aimessages": aimessages,
"starttime": starttime
@@ -67,7 +67,7 @@ class RedisSessionStore:
def save_sessions_batch(self, sessions_data):
"""
批量写入多条会话数据,返回 session_id 列表
sessions_data: list of dict, 每个 dict 包含 userid, messages, aimessages, apply_id, group_id
sessions_data: list of dict, 每个 dict 包含 userid, messages, aimessages, apply_id, end_user_id
优化版本:批量操作,大幅提升性能
"""
try:
@@ -83,7 +83,7 @@ class RedisSessionStore:
"id": self.uudi,
"sessionid": session.get('userid'),
"apply_id": session.get('apply_id'),
"group_id": session.get('group_id'),
"end_user_id": session.get('end_user_id'),
"messages": session.get('messages'),
"aimessages": session.get('aimessages'),
"starttime": starttime
@@ -108,9 +108,9 @@ class RedisSessionStore:
data = self.r.hgetall(key)
return data if data else None
def get_session_apply_group(self, sessionid, apply_id, group_id):
def get_session_apply_group(self, sessionid, apply_id, end_user_id):
"""
根据 sessionid、apply_id 和 group_id 三个条件查询会话数据
根据 sessionid、apply_id 和 end_user_id 三个条件查询会话数据
"""
result_items = []
@@ -124,7 +124,7 @@ class RedisSessionStore:
# 检查三个条件是否都匹配
if (data.get('sessionid') == sessionid and
data.get('apply_id') == apply_id and
data.get('group_id') == group_id):
data.get('end_user_id') == end_user_id):
result_items.append(data)
return result_items
@@ -172,7 +172,7 @@ class RedisSessionStore:
def delete_duplicate_sessions(self):
"""
删除重复会话数据,条件:
"sessionid""user_id""group_id""messages""aimessages" 五个字段都相同的只保留一个,其他删除
"sessionid""user_id""end_user_id""messages""aimessages" 五个字段都相同的只保留一个,其他删除
优化版本:使用 pipeline 批量操作确保在1秒内完成
"""
import time
@@ -202,12 +202,12 @@ class RedisSessionStore:
# 获取五个字段的值
sessionid = data.get('sessionid', '')
user_id = data.get('id', '')
group_id = data.get('group_id', '')
end_user_id = data.get('end_user_id', '')
messages = data.get('messages', '')
aimessages = data.get('aimessages', '')
# 用五元组作为唯一标识
identifier = (sessionid, user_id, group_id, messages, aimessages)
identifier = (sessionid, user_id, end_user_id, messages, aimessages)
if identifier in seen:
# 重复,标记为待删除
@@ -248,9 +248,9 @@ class RedisSessionStore:
result_items = []
return (result_items)
def find_user_apply_group(self, sessionid, apply_id, group_id):
def find_user_apply_group(self, sessionid, apply_id, end_user_id):
"""
根据 sessionid、apply_id 和 group_id 三个条件查询会话数据返回最新的6条
根据 sessionid、apply_id 和 end_user_id 三个条件查询会话数据返回最新的6条
"""
import time
start_time = time.time()
@@ -276,7 +276,7 @@ class RedisSessionStore:
# 检查是否符合三个条件
if (data.get('apply_id') == apply_id and
data.get('group_id') == group_id):
data.get('end_user_id') == end_user_id):
# 支持模糊匹配 sessionid 或者完全匹配
if sessionid in data.get('sessionid', '') or data.get('sessionid') == sessionid:
matched_items.append({

View File

@@ -59,7 +59,7 @@ class SessionService:
self,
user_id: str,
apply_id: str,
group_id: str
end_user_id: str
) -> List[dict]:
"""
Retrieve conversation history from Redis.
@@ -67,20 +67,20 @@ class SessionService:
Args:
user_id: User identifier
apply_id: Application identifier
group_id: Group identifier
end_user_id: End user identifier
Returns:
List of conversation history items with Query and Answer keys
Returns empty list if no history found or on error
"""
try:
history = self.store.find_user_apply_group(user_id, apply_id, group_id)
history = self.store.find_user_apply_group(user_id, apply_id, end_user_id)
# Validate history structure
if not isinstance(history, list):
logger.warning(
f"Invalid history format for user {user_id}, "
f"apply {apply_id}, group {group_id}: expected list, got {type(history)}"
f"apply {apply_id}, group {end_user_id}: expected list, got {type(history)}"
)
return []
@@ -89,7 +89,7 @@ class SessionService:
except Exception as e:
logger.error(
f"Failed to retrieve history for user {user_id}, "
f"apply {apply_id}, group {group_id}: {e}",
f"apply {apply_id}, group {end_user_id}: {e}",
exc_info=True
)
# Return empty list on error to allow execution to continue
@@ -100,7 +100,7 @@ class SessionService:
user_id: str,
query: str,
apply_id: str,
group_id: str,
end_user_id: str,
ai_response: str
) -> Optional[str]:
"""
@@ -110,7 +110,7 @@ class SessionService:
user_id: User identifier
query: User query/message
apply_id: Application identifier
group_id: Group identifier
end_user_id: End user identifier
ai_response: AI response/answer
Returns:
@@ -131,7 +131,7 @@ class SessionService:
userid=user_id,
messages=query,
apply_id=apply_id,
group_id=group_id,
end_user_id=end_user_id,
aimessages=ai_response
)
@@ -152,7 +152,7 @@ class SessionService:
Duplicates are identified by matching:
- sessionid
- user_id (id field)
- group_id
- end_user_id
- messages
- aimessages

View File

@@ -29,9 +29,7 @@ logger = get_agent_logger(__name__)
async def write(
user_id: str,
apply_id: str,
group_id: str,
end_user_id: str,
memory_config: MemoryConfig,
messages: list,
ref_id: str = "wyl20251027",
@@ -40,9 +38,7 @@ async def write(
Execute the complete knowledge extraction pipeline.
Args:
user_id: User identifier
apply_id: Application identifier
group_id: Group identifier
end_user_id: End user identifier
memory_config: MemoryConfig object containing all configuration
messages: Structured message list [{"role": "user", "content": "..."}, ...]
ref_id: Reference ID, defaults to "wyl20251027"
@@ -58,7 +54,7 @@ async def write(
logger.info(f"LLM model: {memory_config.llm_model_name}")
logger.info(f"Embedding model: {memory_config.embedding_model_name}")
logger.info(f"Chunker strategy: {chunker_strategy}")
logger.info(f"Group ID: {group_id}")
logger.info(f"End User ID: {end_user_id}")
# Construct clients from memory_config using factory pattern with db session
with get_db_context() as db:
@@ -83,9 +79,7 @@ async def write(
step_start = time.time()
chunked_dialogs = await get_chunked_dialogs(
chunker_strategy=chunker_strategy,
group_id=group_id,
user_id=user_id,
apply_id=apply_id,
end_user_id=end_user_id,
messages=messages,
ref_id=ref_id,
config_id=config_id,

View File

@@ -16,13 +16,13 @@ class FilteredTags(BaseModel):
"""用于接收LLM筛选后的核心标签列表的模型。"""
meaningful_tags: List[str] = Field(..., description="从原始列表中筛选出的具有核心代表意义的名词列表。")
async def filter_tags_with_llm(tags: List[str], group_id: str) -> List[str]:
async def filter_tags_with_llm(tags: List[str], end_user_id: str) -> List[str]:
"""
使用LLM筛选标签列表仅保留具有代表性的核心名词。
Args:
tags: 原始标签列表
group_id: 用户组ID用于获取配置
end_user_id: 用户组ID用于获取配置
Returns:
筛选后的标签列表
@@ -37,12 +37,12 @@ async def filter_tags_with_llm(tags: List[str], group_id: str) -> List[str]:
get_end_user_connected_config,
)
connected_config = get_end_user_connected_config(group_id, db)
connected_config = get_end_user_connected_config(end_user_id, db)
config_id = connected_config.get("memory_config_id")
if not config_id:
raise ValueError(
f"No memory_config_id found for group_id: {group_id}. "
f"No memory_config_id found for end_user_id: {end_user_id}. "
"Please ensure the user has a valid memory configuration."
)
@@ -87,7 +87,7 @@ async def filter_tags_with_llm(tags: List[str], group_id: str) -> List[str]:
async def get_raw_tags_from_db(
connector: Neo4jConnector,
group_id: str,
end_user_id: str,
limit: int,
by_user: bool = False
) -> List[Tuple[str, int]]:
@@ -99,9 +99,9 @@ async def get_raw_tags_from_db(
Args:
connector: Neo4j连接器实例
group_id: 如果by_user=False则为group_id如果by_user=True则为user_id
end_user_id: 如果by_user=False则为end_user_id如果by_user=True则为user_id
limit: 返回的标签数量限制
by_user: 是否按user_id查询默认Falsegroup_id查询
by_user: 是否按user_id查询默认Falseend_user_id查询
Returns:
List[Tuple[str, int]]: 标签名称和频率的元组列表
@@ -119,7 +119,7 @@ async def get_raw_tags_from_db(
else:
query = (
"MATCH (e:ExtractedEntity) "
"WHERE e.group_id = $id AND e.entity_type <> '人物' AND e.name IS NOT NULL AND NOT e.name IN $names_to_exclude "
"WHERE e.end_user_id = $id AND e.entity_type <> '人物' AND e.name IS NOT NULL AND NOT e.name IN $names_to_exclude "
"RETURN e.name AS name, count(e) AS frequency "
"ORDER BY frequency DESC "
"LIMIT $limit"
@@ -128,44 +128,44 @@ async def get_raw_tags_from_db(
# 使用项目的Neo4jConnector执行查询
results = await connector.execute_query(
query,
id=group_id,
id=end_user_id,
limit=limit,
names_to_exclude=names_to_exclude
)
return [(record["name"], record["frequency"]) for record in results]
async def get_hot_memory_tags(group_id: str, limit: int = 40, by_user: bool = False) -> List[Tuple[str, int]]:
async def get_hot_memory_tags(end_user_id: str, limit: int = 40, by_user: bool = False) -> List[Tuple[str, int]]:
"""
获取原始标签然后使用LLM进行筛选返回最终的热门标签列表。
查询更多的标签(limit=40)给LLM提供更丰富的上下文进行筛选。
Args:
group_id: 必需参数。如果by_user=False则为group_id如果by_user=True则为user_id
end_user_id: 必需参数。如果by_user=False,则为end_user_id;如果by_user=True,则为user_id
limit: 返回的标签数量限制
by_user: 是否按user_id查询默认Falsegroup_id查询
by_user: 是否按user_id查询(默认False,即按end_user_id查询)
Raises:
ValueError: 如果group_id未提供或为空
ValueError: 如果end_user_id未提供或为空
"""
# 验证group_id必须提供且不为空
if not group_id or not group_id.strip():
# 验证end_user_id必须提供且不为空
if not end_user_id or not end_user_id.strip():
raise ValueError(
"group_id is required. Please provide a valid group_id or user_id."
"end_user_id is required. Please provide a valid end_user_id or user_id."
)
# 使用项目的Neo4jConnector
connector = Neo4jConnector()
try:
# 1. 从数据库获取原始排名靠前的标签
raw_tags_with_freq = await get_raw_tags_from_db(connector, group_id, limit, by_user=by_user)
raw_tags_with_freq = await get_raw_tags_from_db(connector, end_user_id, limit, by_user=by_user)
if not raw_tags_with_freq:
return []
raw_tag_names = [tag for tag, freq in raw_tags_with_freq]
# 2. 初始化LLM客户端并使用LLM筛选出有意义的标签
meaningful_tag_names = await filter_tags_with_llm(raw_tag_names, group_id)
meaningful_tag_names = await filter_tags_with_llm(raw_tag_names, end_user_id)
# 3. 根据LLM的筛选结果构建最终的标签列表保留原始频率和顺序
final_tags = []

View File

@@ -75,8 +75,8 @@ class MemoryDataSource:
start_date = time_range.start_date if time_range else None
end_date = time_range.end_date if time_range else None
summary_dicts = await self.memory_summary_repo.find_by_group_id(
group_id=user_id,
summary_dicts = await self.memory_summary_repo.find_by_end_user_id(
end_user_id=user_id,
limit=limit,
start_date=start_date,
end_date=end_date

View File

@@ -41,7 +41,7 @@ DIALOGUE_EMBEDDING_SEARCH = """
WITH $embedding AS q
MATCH (d:Dialogue)
WHERE d.dialog_embedding IS NOT NULL
AND ($group_id IS NULL OR d.group_id = $group_id)
AND ($end_user_id IS NULL OR d.end_user_id = $end_user_id)
WITH d, q, d.dialog_embedding AS v
WITH d,
reduce(dot = 0.0, i IN range(0, size(q)-1) | dot + toFloat(q[i]) * toFloat(v[i])) AS dot,
@@ -50,7 +50,7 @@ WITH d,
WITH d, CASE WHEN qnorm = 0 OR vnorm = 0 THEN 0.0 ELSE dot / (qnorm * vnorm) END AS score
WHERE score > $threshold
RETURN d.id AS dialog_id,
d.group_id AS group_id,
d.end_user_id AS end_user_id,
d.content AS content,
d.created_at AS created_at,
d.expired_at AS expired_at,

View File

@@ -36,7 +36,7 @@ from app.repositories.neo4j.neo4j_connector import Neo4jConnector
async def ingest_contexts_via_full_pipeline(
contexts: List[str],
group_id: str,
end_user_id: str,
chunker_strategy: str | None = None,
embedding_name: str | None = None,
save_chunk_output: bool = False,
@@ -48,7 +48,7 @@ async def ingest_contexts_via_full_pipeline(
This function mirrors the steps in main(), but starts from raw text contexts.
Args:
contexts: List of dialogue texts, each containing lines like "role: message".
group_id: Group ID to assign to generated DialogData and graph nodes.
end_user_id: End-user ID to assign to generated DialogData and graph nodes.
chunker_strategy: Optional chunker strategy; defaults to SELECTED_CHUNKER_STRATEGY.
embedding_name: Optional embedding model ID; defaults to SELECTED_EMBEDDING_ID.
save_chunk_output: If True, write chunked DialogData list to a JSON file for debugging.
@@ -109,7 +109,7 @@ async def ingest_contexts_via_full_pipeline(
dialog = DialogData(
context=context_model,
ref_id=f"pipeline_item_{idx}",
group_id=group_id,
end_user_id=end_user_id,
user_id="default_user",
apply_id="default_application",
)
@@ -318,16 +318,16 @@ async def handle_context_processing(args):
print("No contexts provided for processing.")
return False
return await main_from_contexts(contexts, args.context_group_id)
return await main_from_contexts(contexts, args.context_end_user_id)
async def main_from_contexts(contexts: List[str], group_id: str):
async def main_from_contexts(contexts: List[str], end_user_id: str):
"""Run the pipeline from provided dialogue contexts instead of test data."""
print("=== Running pipeline from provided contexts ===")
success = await ingest_contexts_via_full_pipeline(
contexts=contexts,
group_id=group_id,
end_user_id=end_user_id,
chunker_strategy=SELECTED_CHUNKER_STRATEGY,
embedding_name=SELECTED_EMBEDDING_ID,
save_chunk_output=True

View File

@@ -47,7 +47,7 @@ from app.core.memory.llm_tools.openai_embedder import OpenAIEmbedderClient
from app.core.memory.utils.definitions import (
PROJECT_ROOT,
SELECTED_EMBEDDING_ID,
SELECTED_GROUP_ID,
SELECTED_end_user_id,
SELECTED_LLM_ID,
)
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -59,7 +59,7 @@ from app.services.memory_config_service import MemoryConfigService
async def run_locomo_benchmark(
sample_size: int = 20,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
search_type: str = "hybrid",
search_limit: int = 12,
context_char_budget: int = 8000,
@@ -85,7 +85,7 @@ async def run_locomo_benchmark(
Args:
sample_size: Number of QA pairs to evaluate (from first conversation)
group_id: Database group ID for retrieval (uses default if None)
end_user_id: End-user ID that scopes retrieval in the database (uses default if None)
search_type: "keyword", "embedding", or "hybrid"
search_limit: Max documents to retrieve per query
context_char_budget: Max characters for context
@@ -96,8 +96,8 @@ async def run_locomo_benchmark(
Returns:
Dictionary with evaluation results including metrics, timing, and samples
"""
# Use default group_id if not provided
group_id = group_id or SELECTED_GROUP_ID
# Use default end_user_id if not provided
end_user_id = end_user_id or SELECTED_end_user_id
# Determine data path
data_path = os.path.join(PROJECT_ROOT, "data", "locomo10.json")
@@ -110,7 +110,7 @@ async def run_locomo_benchmark(
print(f"{'='*60}")
print("📊 Configuration:")
print(f" Sample size: {sample_size}")
print(f" Group ID: {group_id}")
print(f" Group ID: {end_user_id}")
print(f" Search type: {search_type}")
print(f" Search limit: {search_limit}")
print(f" Context budget: {context_char_budget} chars")
@@ -134,7 +134,7 @@ async def run_locomo_benchmark(
# Step 2: Extract conversations and ingest if needed
if skip_ingest:
print("⏭️ Skipping data ingestion (using existing data in Neo4j)")
print(f" Group ID: {group_id}\n")
print(f" Group ID: {end_user_id}\n")
else:
print("💾 Checking database ingestion...")
try:
@@ -142,10 +142,10 @@ async def run_locomo_benchmark(
print(f"📝 Extracted {len(conversations)} conversations")
# Always ingest for now (ingestion check not implemented)
print(f"🔄 Ingesting conversations into group '{group_id}'...")
print(f"🔄 Ingesting conversations into group '{end_user_id}'...")
success = await ingest_conversations_if_needed(
conversations=conversations,
group_id=group_id,
end_user_id=end_user_id,
reset=reset_group
)
@@ -224,7 +224,7 @@ async def run_locomo_benchmark(
try:
retrieved_info = await retrieve_relevant_information(
question=question,
group_id=group_id,
end_user_id=end_user_id,
search_type=search_type,
search_limit=search_limit,
connector=connector,
@@ -409,7 +409,7 @@ async def run_locomo_benchmark(
"sample_size": len(qa_items),
"timestamp": datetime.now().isoformat(),
"params": {
"group_id": group_id,
"end_user_id": end_user_id,
"search_type": search_type,
"search_limit": search_limit,
"context_char_budget": context_char_budget,
@@ -467,7 +467,7 @@ def main():
help="Number of QA pairs to evaluate"
)
parser.add_argument(
"--group_id",
"--end_user_id",
type=str,
default=None,
help="Database group ID for retrieval (uses default if not specified)"
@@ -516,7 +516,7 @@ def main():
# Run benchmark
result = asyncio.run(run_locomo_benchmark(
sample_size=args.sample_size,
group_id=args.group_id,
end_user_id=args.end_user_id,
search_type=args.search_type,
search_limit=args.search_limit,
context_char_budget=args.context_char_budget,

View File

@@ -555,7 +555,7 @@ async def run_enhanced_evaluation():
search_results = await run_hybrid_search(
query_text=q,
search_type="hybrid",
group_id="locomo_sk",
end_user_id="locomo_sk",
limit=20,
include=["statements", "chunks", "entities", "summaries"],
alpha=0.6, # BM25权重

View File

@@ -348,7 +348,7 @@ def select_and_format_information(
async def retrieve_relevant_information(
question: str,
group_id: str,
end_user_id: str,
search_type: str,
search_limit: int,
connector: Any,
@@ -368,7 +368,7 @@ async def retrieve_relevant_information(
Args:
question: Question to search for
group_id: Database group ID (identifies which conversation memory to search)
end_user_id: End-user ID in the database (identifies which conversation memory to search)
search_type: "keyword", "embedding", or "hybrid"
search_limit: Max memory pieces to retrieve
connector: Neo4j connector instance
@@ -396,7 +396,7 @@ async def retrieve_relevant_information(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"],
)
@@ -455,7 +455,7 @@ async def retrieve_relevant_information(
search_results = await search_graph(
connector=connector,
q=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit
)
@@ -491,7 +491,7 @@ async def retrieve_relevant_information(
search_results = await run_hybrid_search(
query_text=question,
search_type=search_type,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"],
output_path=None,
@@ -524,7 +524,7 @@ async def retrieve_relevant_information(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"],
)
@@ -584,7 +584,7 @@ async def retrieve_relevant_information(
async def ingest_conversations_if_needed(
conversations: List[str],
group_id: str,
end_user_id: str,
reset: bool = False
) -> bool:
"""
@@ -603,7 +603,7 @@ async def ingest_conversations_if_needed(
Args:
conversations: List of raw conversation texts from LoCoMo dataset
Example: ["User: I went to Paris. AI: When was that?", ...]
group_id: Target group ID for database storage
end_user_id: Target end-user ID for database storage
reset: Whether to clear existing data first (not implemented in wrapper)
Returns:
@@ -617,7 +617,7 @@ async def ingest_conversations_if_needed(
try:
success = await ingest_contexts_via_full_pipeline(
contexts=conversations,
group_id=group_id,
end_user_id=end_user_id,
save_chunk_output=True
)
return success

View File

@@ -30,7 +30,7 @@ from app.core.memory.storage_services.search import run_hybrid_search
from app.core.memory.utils.config.definitions import (
PROJECT_ROOT,
SELECTED_EMBEDDING_ID,
SELECTED_GROUP_ID,
SELECTED_end_user_id,
SELECTED_LLM_ID,
)
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -249,7 +249,7 @@ def get_search_params_by_category(category: str):
async def run_locomo_eval(
sample_size: int = 1,
group_id: str | None = None,
end_user_id: str | None = None,
search_limit: int = 8,
context_char_budget: int = 4000, # 保持默认值不变
llm_temperature: float = 0.0,
@@ -262,7 +262,7 @@ async def run_locomo_eval(
) -> Dict[str, Any]:
# 函数内部使用三路检索逻辑,但保持参数签名不变
group_id = group_id or SELECTED_GROUP_ID
end_user_id = end_user_id or SELECTED_end_user_id
data_path = os.path.join(PROJECT_ROOT, "data", "locomo10.json")
if not os.path.exists(data_path):
data_path = os.path.join(os.getcwd(), "data", "locomo10.json")
@@ -340,7 +340,7 @@ async def run_locomo_eval(
# 关键修复:强制重新摄入纯净的对话数据
print("🔄 强制重新摄入纯净的对话数据...")
await ingest_contexts_via_full_pipeline(contents, group_id, save_chunk_output=True)
await ingest_contexts_via_full_pipeline(contents, end_user_id, save_chunk_output=True)
# 使用异步LLM客户端
with get_db_context() as db:
@@ -405,7 +405,7 @@ async def run_locomo_eval(
connector=connector,
embedder_client=embedder,
query_text=q,
group_id=group_id,
end_user_id=end_user_id,
limit=adjusted_limit,
include=["chunks", "statements", "entities", "summaries"], # 修复:使用正确的类型
)
@@ -456,7 +456,7 @@ async def run_locomo_eval(
search_results = await search_graph(
connector=connector,
q=q,
group_id=group_id,
end_user_id=end_user_id,
limit=adjusted_limit
)
dialogs = search_results.get("dialogues", [])
@@ -486,7 +486,7 @@ async def run_locomo_eval(
search_results = await run_hybrid_search(
query_text=q,
search_type=search_type,
group_id=group_id,
end_user_id=end_user_id,
limit=adjusted_limit,
include=["chunks", "statements", "entities", "summaries"],
output_path=None,
@@ -524,7 +524,7 @@ async def run_locomo_eval(
connector=connector,
embedder_client=embedder,
query_text=q,
group_id=group_id,
end_user_id=end_user_id,
limit=adjusted_limit,
include=["chunks", "statements", "entities", "summaries"],
)
@@ -597,7 +597,7 @@ async def run_locomo_eval(
"dialogues": [
{
"uuid": d.get("uuid", ""),
"group_id": d.get("group_id", ""),
"end_user_id": d.get("end_user_id", ""),
"content": d.get("content", "")[:200] + "..." if len(d.get("content", "")) > 200 else d.get("content", ""),
"score": d.get("score", 0.0)
}
@@ -795,7 +795,7 @@ async def run_locomo_eval(
},
"samples": samples,
"params": {
"group_id": group_id,
"end_user_id": end_user_id,
"search_limit": search_limit,
"context_char_budget": context_char_budget,
"search_type": search_type,
@@ -825,7 +825,7 @@ async def run_locomo_eval(
def main():
parser = argparse.ArgumentParser(description="Run LoCoMo evaluation with Qwen search")
parser.add_argument("--sample_size", type=int, default=1, help="Number of samples to evaluate")
parser.add_argument("--group_id", type=str, default=None, help="Group ID for retrieval")
parser.add_argument("--end_user_id", type=str, default=None, help="Group ID for retrieval")
parser.add_argument("--search_limit", type=int, default=8, help="Search limit per query")
parser.add_argument("--context_char_budget", type=int, default=12000, help="Max characters for context")
parser.add_argument("--llm_temperature", type=float, default=0.0, help="LLM temperature")
@@ -841,7 +841,7 @@ def main():
result = asyncio.run(run_locomo_eval(
sample_size=args.sample_size,
group_id=args.group_id,
end_user_id=args.end_user_id,
search_limit=args.search_limit,
context_char_budget=args.context_char_budget,
llm_temperature=args.llm_temperature,

View File

@@ -523,11 +523,11 @@ def generate_query_keywords_cn(question: str) -> List[str]:
# 通过别名匹配进行实体关键词检索多token合并
async def _search_entities_by_aliases(connector: Neo4jConnector, tokens: List[str], group_id: str | None, limit: int) -> List[Dict[str, Any]]:
async def _search_entities_by_aliases(connector: Neo4jConnector, tokens: List[str], end_user_id: str | None, limit: int) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
try:
for tok in tokens:
rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q=tok, group_id=group_id, limit=limit)
rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q=tok, end_user_id=end_user_id, limit=limit)
if rows:
results.extend(rows)
except Exception:
@@ -547,15 +547,15 @@ async def _search_entities_by_aliases(connector: Neo4jConnector, tokens: List[st
# 通过对话/陈述中的entity_ids反查实体名称
_FETCH_ENTITIES_BY_IDS = """
MATCH (e:ExtractedEntity)
WHERE e.id IN $ids AND ($group_id IS NULL OR e.group_id = $group_id)
RETURN e.id AS id, e.name AS name, e.group_id AS group_id, e.entity_type AS entity_type
WHERE e.id IN $ids AND ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type
"""
async def _fetch_entities_by_ids(connector: Neo4jConnector, ids: List[str], group_id: str | None) -> List[Dict[str, Any]]:
async def _fetch_entities_by_ids(connector: Neo4jConnector, ids: List[str], end_user_id: str | None) -> List[Dict[str, Any]]:
if not ids:
return []
try:
rows = await connector.execute_query(_FETCH_ENTITIES_BY_IDS, ids=list({i for i in ids if i}), group_id=group_id)
rows = await connector.execute_query(_FETCH_ENTITIES_BY_IDS, ids=list({i for i in ids if i}), end_user_id=end_user_id)
return rows or []
except Exception:
return []
@@ -565,18 +565,18 @@ async def _fetch_entities_by_ids(connector: Neo4jConnector, ids: List[str], grou
_TIME_ENTITY_SEARCH = """
MATCH (e:ExtractedEntity)
WHERE (e.entity_type CONTAINS "TIME" OR e.entity_type CONTAINS "DATE" OR e.name =~ $date_pattern)
AND ($group_id IS NULL OR e.group_id = $group_id)
RETURN e.id AS id, e.name AS name, e.group_id AS group_id, e.entity_type AS entity_type
AND ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type
LIMIT $limit
"""
async def _search_time_entities(connector: Neo4jConnector, group_id: str | None, limit: int = 5) -> List[Dict[str, Any]]:
async def _search_time_entities(connector: Neo4jConnector, end_user_id: str | None, limit: int = 5) -> List[Dict[str, Any]]:
"""专门搜索时间相关的实体"""
try:
date_pattern = r".*\d{4}.*|.*\d{1,2}月\d{1,2}日.*"
rows = await connector.execute_query(_TIME_ENTITY_SEARCH,
date_pattern=date_pattern,
group_id=group_id,
end_user_id=end_user_id,
limit=limit)
return rows or []
except Exception:
@@ -623,7 +623,7 @@ def _resolve_relative_times_cn_en(text: str, anchor: datetime) -> str:
async def run_longmemeval_test(
sample_size: int = 3,
group_id: str = "longmemeval_zh_bak_3",
end_user_id: str = "longmemeval_zh_bak_3",
search_limit: int = 8,
context_char_budget: int = 4000,
llm_temperature: float = 0.0,
@@ -677,13 +677,13 @@ async def run_longmemeval_test(
contexts.extend(selected)
print(f"📥 摄入 {len(contexts)} 个上下文到数据库")
if reset_group_before_ingest and group_id:
if reset_group_before_ingest and end_user_id:
try:
_tmp_conn = Neo4jConnector()
await _tmp_conn.delete_group(group_id)
print(f"🧹 已清空组 {group_id} 的历史图数据")
await _tmp_conn.delete_group(end_user_id)
print(f"🧹 已清空组 {end_user_id} 的历史图数据")
except Exception as _e:
print(f"⚠️ 清空组数据失败(忽略继续): {group_id} - {_e}")
print(f"⚠️ 清空组数据失败(忽略继续): {end_user_id} - {_e}")
finally:
try:
await _tmp_conn.close()
@@ -695,7 +695,7 @@ async def run_longmemeval_test(
else:
await _ingest_fn(
contexts,
group_id,
end_user_id,
save_chunk_output=save_chunk_output,
save_chunk_output_path=save_chunk_output_path,
)
@@ -750,7 +750,7 @@ async def run_longmemeval_test(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"],
)
@@ -795,7 +795,7 @@ async def run_longmemeval_test(
search_results = await search_graph(
connector=connector,
q=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
)
chunks = search_results.get("chunks", [])
@@ -830,7 +830,7 @@ async def run_longmemeval_test(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"],
)
@@ -848,7 +848,7 @@ async def run_longmemeval_test(
kw_res = await search_graph(
connector=connector,
q=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
)
if isinstance(kw_res, dict):
@@ -859,7 +859,7 @@ async def run_longmemeval_test(
# 时间推理问题的特殊处理
if is_temporal:
# 专门搜索时间实体
time_entities = await _search_time_entities(connector, group_id, search_limit//2)
time_entities = await _search_time_entities(connector, end_user_id, search_limit//2)
if time_entities:
kw_entities.extend(time_entities)
# 添加时间相关关键词检索
@@ -869,7 +869,7 @@ async def run_longmemeval_test(
time_res = await search_graph(
connector=connector,
q=tk,
group_id=group_id,
end_user_id=end_user_id,
limit=2,
)
if isinstance(time_res, dict):
@@ -880,7 +880,7 @@ async def run_longmemeval_test(
# 中文关键词拆分后做别名匹配
cn_tokens = _extract_cn_tokens(question)
alias_entities = await _search_entities_by_aliases(connector, cn_tokens, group_id, search_limit)
alias_entities = await _search_entities_by_aliases(connector, cn_tokens, end_user_id, search_limit)
if alias_entities:
kw_entities.extend(alias_entities)
@@ -894,7 +894,7 @@ async def run_longmemeval_test(
except Exception:
pass
if ids:
id_entities = await _fetch_entities_by_ids(connector, ids, group_id)
id_entities = await _fetch_entities_by_ids(connector, ids, end_user_id)
if id_entities:
kw_entities.extend(id_entities)
@@ -908,7 +908,7 @@ async def run_longmemeval_test(
sub_res = await search_graph(
connector=connector,
q=str(kw),
group_id=group_id,
end_user_id=end_user_id,
limit=max(3, search_limit // 2),
)
if isinstance(sub_res, dict):
@@ -927,7 +927,7 @@ async def run_longmemeval_test(
opt_res = await search_graph(
connector=connector,
q=str(opt),
group_id=group_id,
end_user_id=end_user_id,
limit=max(3, search_limit // 2),
)
if isinstance(opt_res, dict):

View File

@@ -498,11 +498,11 @@ def smart_context_selection(contexts: List[str], question: str, max_chars: int =
# 通过别名匹配进行实体关键词检索多token合并
async def _search_entities_by_aliases(connector: Neo4jConnector, tokens: List[str], group_id: str | None, limit: int) -> List[Dict[str, Any]]:
async def _search_entities_by_aliases(connector: Neo4jConnector, tokens: List[str], end_user_id: str | None, limit: int) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
try:
for tok in tokens:
rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q=tok, group_id=group_id, limit=limit)
rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q=tok, end_user_id=end_user_id, limit=limit)
if rows:
results.extend(rows)
except Exception:
@@ -522,15 +522,15 @@ async def _search_entities_by_aliases(connector: Neo4jConnector, tokens: List[st
# 通过对话/陈述中的entity_ids反查实体名称
_FETCH_ENTITIES_BY_IDS = """
MATCH (e:ExtractedEntity)
WHERE e.id IN $ids AND ($group_id IS NULL OR e.group_id = $group_id)
RETURN e.id AS id, e.name AS name, e.group_id AS group_id, e.entity_type AS entity_type
WHERE e.id IN $ids AND ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type
"""
async def _fetch_entities_by_ids(connector: Neo4jConnector, ids: List[str], group_id: str | None) -> List[Dict[str, Any]]:
async def _fetch_entities_by_ids(connector: Neo4jConnector, ids: List[str], end_user_id: str | None) -> List[Dict[str, Any]]:
if not ids:
return []
try:
rows = await connector.execute_query(_FETCH_ENTITIES_BY_IDS, ids=list({i for i in ids if i}), group_id=group_id)
rows = await connector.execute_query(_FETCH_ENTITIES_BY_IDS, ids=list({i for i in ids if i}), end_user_id=end_user_id)
return rows or []
except Exception:
return []
@@ -540,18 +540,18 @@ async def _fetch_entities_by_ids(connector: Neo4jConnector, ids: List[str], grou
_TIME_ENTITY_SEARCH = """
MATCH (e:ExtractedEntity)
WHERE (e.entity_type CONTAINS "TIME" OR e.entity_type CONTAINS "DATE" OR e.name =~ $date_pattern)
AND ($group_id IS NULL OR e.group_id = $group_id)
RETURN e.id AS id, e.name AS name, e.group_id AS group_id, e.entity_type AS entity_type
AND ($end_user_id IS NULL OR e.end_user_id = $end_user_id)
RETURN e.id AS id, e.name AS name, e.end_user_id AS end_user_id, e.entity_type AS entity_type
LIMIT $limit
"""
async def _search_time_entities(connector: Neo4jConnector, group_id: str | None, limit: int = 5) -> List[Dict[str, Any]]:
async def _search_time_entities(connector: Neo4jConnector, end_user_id: str | None, limit: int = 5) -> List[Dict[str, Any]]:
"""专门搜索时间相关的实体"""
try:
date_pattern = r".*\d{4}.*|.*\d{1,2}月\d{1,2}日.*"
rows = await connector.execute_query(_TIME_ENTITY_SEARCH,
date_pattern=date_pattern,
group_id=group_id,
end_user_id=end_user_id,
limit=limit)
return rows or []
except Exception:
@@ -559,25 +559,25 @@ async def _search_time_entities(connector: Neo4jConnector, group_id: str | None,
# 技术术语专门检索
async def _search_tech_terms(connector: Neo4jConnector, question: str, group_id: str | None, limit: int = 3) -> List[Dict[str, Any]]:
async def _search_tech_terms(connector: Neo4jConnector, question: str, end_user_id: str | None, limit: int = 3) -> List[Dict[str, Any]]:
"""专门搜索技术术语相关的实体"""
tech_entities = []
try:
# GPS相关
if any(term in question for term in ["GPS", "导航", "定位系统"]):
gps_rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q="GPS", group_id=group_id, limit=limit)
gps_rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q="GPS", end_user_id=end_user_id, limit=limit)
if gps_rows:
tech_entities.extend(gps_rows)
# 活动相关
if any(term in question for term in ["工作坊", "研讨会", "网络研讨会"]):
workshop_rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q="工作坊", group_id=group_id, limit=limit)
workshop_rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q="工作坊", end_user_id=end_user_id, limit=limit)
if workshop_rows:
tech_entities.extend(workshop_rows)
# 时间顺序相关
if any(term in question for term in ["", "", "第一个"]):
time_rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q="第一次", group_id=group_id, limit=limit)
time_rows = await connector.execute_query(SEARCH_ENTITIES_BY_NAME, q="第一次", end_user_id=end_user_id, limit=limit)
if time_rows:
tech_entities.extend(time_rows)
@@ -627,7 +627,7 @@ def _resolve_relative_times_cn_en(text: str, anchor: datetime) -> str:
async def run_longmemeval_test(
sample_size: int = 3,
group_id: str = "longmemeval_zh_bak_2",
end_user_id: str = "longmemeval_zh_bak_2",
search_limit: int = 8,
context_char_budget: int = 4000,
llm_temperature: float = 0.0,
@@ -707,7 +707,7 @@ async def run_longmemeval_test(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["dialogues", "statements", "entities"],
)
@@ -746,7 +746,7 @@ async def run_longmemeval_test(
search_results = await search_graph(
connector=connector,
q=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
)
dialogs = search_results.get("dialogues", [])
@@ -776,7 +776,7 @@ async def run_longmemeval_test(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["dialogues", "statements", "entities"],
)
@@ -792,7 +792,7 @@ async def run_longmemeval_test(
kw_res = await search_graph(
connector=connector,
q=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
)
if isinstance(kw_res, dict):
@@ -801,14 +801,14 @@ async def run_longmemeval_test(
kw_entities = kw_res.get("entities", []) or []
# 技术术语专门检索
tech_entities = await _search_tech_terms(connector, question, group_id, search_limit//2)
tech_entities = await _search_tech_terms(connector, question, end_user_id, search_limit//2)
if tech_entities:
kw_entities.extend(tech_entities)
# 时间推理问题的特殊处理
if is_temporal:
# 专门搜索时间实体
time_entities = await _search_time_entities(connector, group_id, search_limit//2)
time_entities = await _search_time_entities(connector, end_user_id, search_limit//2)
if time_entities:
kw_entities.extend(time_entities)
# 添加时间相关关键词检索
@@ -818,7 +818,7 @@ async def run_longmemeval_test(
time_res = await search_graph(
connector=connector,
q=tk,
group_id=group_id,
end_user_id=end_user_id,
limit=2,
)
if isinstance(time_res, dict):
@@ -829,7 +829,7 @@ async def run_longmemeval_test(
# 中文关键词拆分后做别名匹配
cn_tokens = generate_query_keywords_cn(question) # 使用增强版关键词提取
alias_entities = await _search_entities_by_aliases(connector, cn_tokens, group_id, search_limit)
alias_entities = await _search_entities_by_aliases(connector, cn_tokens, end_user_id, search_limit)
if alias_entities:
kw_entities.extend(alias_entities)
@@ -843,7 +843,7 @@ async def run_longmemeval_test(
except Exception:
pass
if ids:
id_entities = await _fetch_entities_by_ids(connector, ids, group_id)
id_entities = await _fetch_entities_by_ids(connector, ids, end_user_id)
if id_entities:
kw_entities.extend(id_entities)
@@ -857,7 +857,7 @@ async def run_longmemeval_test(
sub_res = await search_graph(
connector=connector,
q=str(kw),
group_id=group_id,
end_user_id=end_user_id,
limit=max(3, search_limit // 2),
)
if isinstance(sub_res, dict):
@@ -876,7 +876,7 @@ async def run_longmemeval_test(
opt_res = await search_graph(
connector=connector,
q=str(opt),
group_id=group_id,
end_user_id=end_user_id,
limit=max(3, search_limit // 2),
)
if isinstance(opt_res, dict):

View File

@@ -27,7 +27,7 @@ from app.core.memory.storage_services.search import run_hybrid_search
from app.core.memory.utils.config.definitions import (
PROJECT_ROOT,
SELECTED_EMBEDDING_ID,
SELECTED_GROUP_ID,
SELECTED_end_user_id,
SELECTED_LLM_ID,
)
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -135,8 +135,8 @@ def _combine_dialogues_for_hybrid(results: Dict[str, Any]) -> List[Dict[str, Any
return merged
async def run_memsciqa_eval(sample_size: int = 1, group_id: str | None = None, search_limit: int = 8, context_char_budget: int = 4000, llm_temperature: float = 0.0, llm_max_tokens: int = 64, search_type: str = "hybrid", memory_config: "MemoryConfig" = None) -> Dict[str, Any]:
group_id = group_id or SELECTED_GROUP_ID
async def run_memsciqa_eval(sample_size: int = 1, end_user_id: str | None = None, search_limit: int = 8, context_char_budget: int = 4000, llm_temperature: float = 0.0, llm_max_tokens: int = 64, search_type: str = "hybrid", memory_config: "MemoryConfig" = None) -> Dict[str, Any]:
end_user_id = end_user_id or SELECTED_end_user_id
# Load data
data_path = os.path.join(PROJECT_ROOT, "data", "msc_self_instruct.jsonl")
if not os.path.exists(data_path):
@@ -147,7 +147,7 @@ async def run_memsciqa_eval(sample_size: int = 1, group_id: str | None = None, s
# 改为:每条样本仅摄入一个上下文(完整对话转录),避免多上下文摄入
# 说明memsciqa 数据集的每个样本天然只有一个对话,保持按样本一上下文的策略
contexts: List[str] = [build_context_from_dialog(item) for item in items]
await ingest_contexts_via_full_pipeline(contexts, group_id)
await ingest_contexts_via_full_pipeline(contexts, end_user_id)
# LLM client (使用异步调用)
with get_db_context() as db:
@@ -173,7 +173,7 @@ async def run_memsciqa_eval(sample_size: int = 1, group_id: str | None = None, s
results = await run_hybrid_search(
query_text=question,
search_type=search_type,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["dialogues", "statements", "entities"],
output_path=None,
@@ -298,7 +298,7 @@ def main():
load_dotenv()
parser = argparse.ArgumentParser(description="Evaluate DMR (memsciqa) with graph search and Qwen")
parser.add_argument("--sample-size", type=int, default=1, help="评测样本数量")
parser.add_argument("--group-id", type=str, default=None, help="可选 group_id默认取 runtime.json")
parser.add_argument("--end-user-id", "--group-id", dest="end_user_id", type=str, default=None, help="可选 end_user_id,默认取 runtime.json")
parser.add_argument("--search-limit", type=int, default=8, help="每类检索最大返回数")
parser.add_argument("--context-char-budget", type=int, default=4000, help="上下文字符预算")
parser.add_argument("--llm-temperature", type=float, default=0.0, help="LLM 温度")
@@ -309,7 +309,7 @@ def main():
result = asyncio.run(
run_memsciqa_eval(
sample_size=args.sample_size,
group_id=args.group_id,
end_user_id=args.end_user_id,
search_limit=args.search_limit,
context_char_budget=args.context_char_budget,
llm_temperature=args.llm_temperature,

View File

@@ -33,7 +33,7 @@ from app.core.memory.llm_tools.openai_embedder import OpenAIEmbedderClient
from app.core.memory.utils.config.definitions import (
PROJECT_ROOT,
SELECTED_EMBEDDING_ID,
SELECTED_GROUP_ID,
SELECTED_end_user_id,
SELECTED_LLM_ID,
)
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
@@ -198,7 +198,7 @@ def load_dataset_memsciqa(data_path: str) -> List[Dict[str, Any]]:
async def run_memsciqa_test(
sample_size: int = 3,
group_id: str | None = None,
end_user_id: str | None = None,
search_limit: int = 8,
context_char_budget: int = 4000,
llm_temperature: float = 0.0,
@@ -216,7 +216,7 @@ async def run_memsciqa_test(
"""
# 默认使用指定的 memsci 组 ID
group_id = group_id or "group_memsci"
end_user_id = end_user_id or "group_memsci"
# 数据路径解析(项目根与当前工作目录兜底)
if not data_path:
@@ -282,7 +282,7 @@ async def run_memsciqa_test(
connector=connector,
embedder_client=embedder,
query_text=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"], # 使用 chunks 而不是 dialogues
)
@@ -291,7 +291,7 @@ async def run_memsciqa_test(
results = await search_graph(
connector=connector,
q=question,
group_id=group_id,
end_user_id=end_user_id,
limit=search_limit,
include=["chunks", "statements", "entities", "summaries"], # 使用 chunks 而不是 dialogues
)
@@ -499,7 +499,7 @@ async def run_memsciqa_test(
},
"samples": samples,
"params": {
"group_id": group_id,
"end_user_id": end_user_id,
"search_limit": search_limit,
"context_char_budget": context_char_budget,
"llm_temperature": llm_temperature,
@@ -542,7 +542,7 @@ def main():
result = asyncio.run(
run_memsciqa_test(
sample_size=sample_size,
group_id=args.group_id,
end_user_id=args.end_user_id,
search_limit=args.search_limit,
context_char_budget=args.context_char_budget,
llm_temperature=args.llm_temperature,

View File

@@ -15,7 +15,7 @@ except Exception:
return None
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
from app.core.memory.utils.config.definitions import SELECTED_GROUP_ID, PROJECT_ROOT
from app.core.memory.utils.config.definitions import SELECTED_end_user_id, PROJECT_ROOT
from app.core.memory.evaluation.memsciqa.evaluate_qa import run_memsciqa_eval
from app.core.memory.evaluation.longmemeval.qwen_search_eval import run_longmemeval_test
@@ -26,7 +26,7 @@ async def run(
dataset: str,
sample_size: int,
reset_group: bool,
group_id: str | None,
end_user_id: str | None,
judge_model: str | None = None,
search_limit: int | None = None,
context_char_budget: int | None = None,
@@ -37,17 +37,17 @@ async def run(
max_contexts_per_item: int | None = None,
) -> Dict[str, Any]:
# 恢复原始风格:统一入口做路由,并沿用各数据集既有默认
group_id = group_id or SELECTED_GROUP_ID
end_user_id = end_user_id or SELECTED_end_user_id
if reset_group:
connector = Neo4jConnector()
try:
await connector.delete_group(group_id)
await connector.delete_group(end_user_id)
finally:
await connector.close()
if dataset == "locomo":
kwargs: Dict[str, Any] = {"sample_size": sample_size, "group_id": group_id}
kwargs: Dict[str, Any] = {"sample_size": sample_size, "end_user_id": end_user_id}
if search_limit is not None:
kwargs["search_limit"] = search_limit
if context_char_budget is not None:
@@ -61,7 +61,7 @@ async def run(
return await run_locomo_eval(**kwargs)
if dataset == "memsciqa":
kwargs: Dict[str, Any] = {"sample_size": sample_size, "group_id": group_id}
kwargs: Dict[str, Any] = {"sample_size": sample_size, "end_user_id": end_user_id}
if search_limit is not None:
kwargs["search_limit"] = search_limit
if context_char_budget is not None:
@@ -75,7 +75,7 @@ async def run(
return await run_memsciqa_eval(**kwargs)
if dataset == "longmemeval":
kwargs: Dict[str, Any] = {"sample_size": sample_size, "group_id": group_id}
kwargs: Dict[str, Any] = {"sample_size": sample_size, "end_user_id": end_user_id}
if search_limit is not None:
kwargs["search_limit"] = search_limit
if context_char_budget is not None:
@@ -99,8 +99,8 @@ def main():
parser = argparse.ArgumentParser(description="统一评估入口memsciqa / longmemeval / locomo")
parser.add_argument("--dataset", choices=["memsciqa", "longmemeval", "locomo"], required=True)
parser.add_argument("--sample-size", type=int, default=1, help="先用一条数据跑通")
parser.add_argument("--reset-group", action="store_true", help="运行前清空当前 group_id 的图数据")
parser.add_argument("--group-id", type=str, default=None, help="可选 group_id默认取 runtime.json")
parser.add_argument("--reset-group", action="store_true", help="运行前清空当前 end_user_id 的图数据")
# The legacy --group-id flag is kept, but the value must land on args.end_user_id,
# which is what the run(...) call below reads.
parser.add_argument("--group-id", "--end-user-id", dest="end_user_id", type=str, default=None, help="可选 end_user_id默认取 runtime.json")
parser.add_argument("--judge-model", type=str, default=None, help="可选longmemeval 判别式评测模型名")
parser.add_argument("--search-limit", type=int, default=None, help="检索返回的对话节点数量上限(不提供则使用各脚本默认)")
parser.add_argument("--context-char-budget", type=int, default=None, help="上下文字符预算(不提供则使用各脚本默认)")
@@ -117,7 +117,7 @@ def main():
args.dataset,
args.sample_size,
args.reset_group,
args.group_id,
args.end_user_id,
args.judge_model,
args.search_limit,
args.context_char_budget,

View File

@@ -72,7 +72,7 @@ class TemporalSearchParams(BaseModel):
"""Parameters for temporal search queries in the knowledge graph.
Attributes:
group_id: Group ID to filter search results (default: 'test')
end_user_id: End user ID to filter search results (default: 'test')
apply_id: Application ID to filter search results
user_id: User ID to filter search results
start_date: Start date for temporal filtering (format: 'YYYY-MM-DD')
@@ -81,7 +81,7 @@ class TemporalSearchParams(BaseModel):
invalid_date: Date when memory should be invalid (format: 'YYYY-MM-DD')
limit: Maximum number of results to return (default: 3)
"""
group_id: Optional[str] = Field("test", description="The group ID to filter the search.")
end_user_id: Optional[str] = Field("test", description="The group ID to filter the search.")
apply_id: Optional[str] = Field(None, description="The apply ID to filter the search.")
user_id: Optional[str] = Field(None, description="The user ID to filter the search.")
start_date: Optional[str] = Field(None, description="The start date for the search.")

View File

@@ -103,9 +103,7 @@ class Edge(BaseModel):
id: Unique identifier for the edge
source: ID of the source node
target: ID of the target node
group_id: Group ID for multi-tenancy
user_id: User ID for user-specific data
apply_id: Application ID for application-specific data
end_user_id: End user ID for multi-tenancy
run_id: Unique identifier for the pipeline run that created this edge
created_at: Timestamp when the edge was created (system perspective)
expired_at: Optional timestamp when the edge expires (system perspective)
@@ -113,9 +111,7 @@ class Edge(BaseModel):
id: str = Field(default_factory=lambda: uuid4().hex, description="A unique identifier for the edge.")
source: str = Field(..., description="The ID of the source node.")
target: str = Field(..., description="The ID of the target node.")
group_id: str = Field(..., description="The group ID of the edge.")
user_id: str = Field(..., description="The user ID of the edge.")
apply_id: str = Field(..., description="The apply ID of the edge.")
end_user_id: str = Field(..., description="The end user ID of the edge.")
run_id: str = Field(default_factory=lambda: uuid4().hex, description="Unique identifier for this pipeline run.")
created_at: datetime = Field(..., description="The valid time of the edge from system perspective.")
expired_at: Optional[datetime] = Field(None, description="The expired time of the edge from system perspective.")
@@ -185,18 +181,14 @@ class Node(BaseModel):
Attributes:
id: Unique identifier for the node
name: Name of the node
group_id: Group ID for multi-tenancy
user_id: User ID for user-specific data
apply_id: Application ID for application-specific data
end_user_id: End user ID for multi-tenancy
run_id: Unique identifier for the pipeline run that created this node
created_at: Timestamp when the node was created (system perspective)
expired_at: Optional timestamp when the node expires (system perspective)
"""
id: str = Field(..., description="The unique identifier for the node.")
name: str = Field(..., description="The name of the node.")
group_id: str = Field(..., description="The group ID of the node.")
user_id: str = Field(..., description="The user ID of the edge.")
apply_id: str = Field(..., description="The apply ID of the edge.")
end_user_id: str = Field(..., description="The end user ID of the node.")
run_id: str = Field(default_factory=lambda: uuid4().hex, description="Unique identifier for this pipeline run.")
created_at: datetime = Field(..., description="The valid time of the node from system perspective.")
expired_at: Optional[datetime] = Field(None, description="The expired time of the node from system perspective.")

View File

@@ -55,7 +55,7 @@ class Statement(BaseModel):
Attributes:
id: Unique identifier for the statement
chunk_id: ID of the parent chunk this statement belongs to
group_id: Optional group ID for multi-tenancy
end_user_id: Optional end user ID for multi-tenancy
statement: The actual statement text content
speaker: Optional speaker identifier ('用户' for user, 'AI' for AI responses)
statement_embedding: Optional embedding vector for the statement
@@ -73,7 +73,7 @@ class Statement(BaseModel):
"""
id: str = Field(default_factory=lambda: uuid4().hex, description="A unique identifier for the statement.")
chunk_id: str = Field(..., description="ID of the parent chunk this statement belongs to.")
group_id: Optional[str] = Field(None, description="ID of the group this statement belongs to.")
end_user_id: Optional[str] = Field(None, description="ID of the group this statement belongs to.")
statement: str = Field(..., description="The text content of the statement.")
speaker: Optional[str] = Field(None, description="Speaker identifier: 'user' for user messages, 'assistant' for AI responses")
statement_embedding: Optional[List[float]] = Field(None, description="The embedding vector of the statement.")
@@ -159,9 +159,7 @@ class DialogData(BaseModel):
context: Full conversation context
dialog_embedding: Optional embedding vector for the entire dialog
ref_id: Reference ID linking to external dialog system
group_id: Group ID for multi-tenancy
user_id: User ID for user-specific data
apply_id: Application ID for application-specific data
end_user_id: End user ID for multi-tenancy
created_at: Timestamp when the dialog was created
expired_at: Timestamp when the dialog expires (default: far future)
metadata: Additional metadata as key-value pairs
@@ -175,9 +173,7 @@ class DialogData(BaseModel):
context: ConversationContext = Field(..., description="The full conversation context as a single string.")
dialog_embedding: Optional[List[float]] = Field(None, description="The embedding vector of the dialog.")
ref_id: str = Field(..., description="Refer to external dialog id. This is used to link to the original dialog.")
group_id: str = Field(default=..., description="Group ID of dialogue data")
user_id: str = Field(..., description="USER ID of dialogue data")
apply_id: str = Field(..., description="APPLY ID of dialogue data")
end_user_id: str = Field(default=..., description="End user ID of dialogue data")
run_id: str = Field(default_factory=lambda: uuid4().hex, description="Unique identifier for this pipeline run.")
created_at: datetime = Field(default_factory=datetime.now, description="The timestamp when the dialog was created.")
expired_at: datetime = Field(default_factory=lambda: datetime(9999, 12, 31), description="The timestamp when the dialog expires.")
@@ -256,5 +252,5 @@ class DialogData(BaseModel):
"""
for chunk in self.chunks:
for statement in chunk.statements:
if statement.group_id is None:
statement.group_id = self.group_id
if statement.end_user_id is None:
statement.end_user_id = self.end_user_id

View File

@@ -6,6 +6,7 @@ import os
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import UUID
if TYPE_CHECKING:
from app.schemas.memory_config_schema import MemoryConfig
@@ -396,13 +397,13 @@ def rerank_with_activation(
return reranked
def log_search_query(query_text: str, search_type: str, group_id: str | None, limit: int, include: List[str], log_file: str = None):
def log_search_query(query_text: str, search_type: str, end_user_id: str | None, limit: int, include: List[str], log_file: str = None):
"""Log search query information using the logger.
Args:
query_text: The search query text
search_type: Type of search (keyword, embedding, hybrid)
group_id: Group identifier for filtering
end_user_id: End user identifier for filtering
limit: Maximum number of results
include: List of result types to include
log_file: Deprecated parameter, kept for backward compatibility
@@ -413,7 +414,7 @@ def log_search_query(query_text: str, search_type: str, group_id: str | None, li
# Log using the standard logger
logger.info(
f"Search query: query='{cleaned_query}', type={search_type}, "
f"group_id={group_id}, limit={limit}, include={include}"
f"end_user_id={end_user_id}, limit={limit}, include={include}"
)
@@ -672,7 +673,7 @@ def apply_reranker_placeholder(
async def run_hybrid_search(
query_text: str,
search_type: str,
group_id: str | None,
end_user_id: str | None,
limit: int,
include: List[str],
output_path: str | None,
@@ -692,6 +693,9 @@ async def run_hybrid_search(
# Start overall timing
search_start_time = time.time()
latency_metrics = {}
# Emit the active configuration through the module logger at debug level
# instead of printing the whole object to stdout on every search.
logger.debug("memory_config: %s", memory_config)
logger.info(f"using embedding_id:{memory_config.embedding_model_id}...")
# Clean and normalize the incoming query before use/logging
@@ -715,7 +719,7 @@ async def run_hybrid_search(
}
# Log the search query
log_search_query(query_text, search_type, group_id, limit, include)
log_search_query(query_text, search_type, end_user_id, limit, include)
connector = Neo4jConnector()
results = {}
@@ -732,7 +736,7 @@ async def run_hybrid_search(
search_graph(
connector=connector,
q=query_text,
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include
)
@@ -769,7 +773,7 @@ async def run_hybrid_search(
connector=connector,
embedder_client=embedder,
query_text=query_text,
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include,
)
@@ -916,9 +920,7 @@ async def run_hybrid_search(
async def search_by_temporal(
group_id: Optional[str] = "test",
apply_id: Optional[str] = None,
user_id: Optional[str] = None,
end_user_id: Optional[str] = "test",
start_date: Optional[str] = None,
end_date: Optional[str] = None,
valid_date: Optional[str] = None,
@@ -929,7 +931,7 @@ async def search_by_temporal(
Temporal search across Statements.
- Matches statements created between start_date and end_date
- Optionally filters by group_id
- Optionally filters by end_user_id
- Returns up to 'limit' statements
"""
connector = Neo4jConnector()
@@ -939,9 +941,7 @@ async def search_by_temporal(
end_date = normalize_date_safe(end_date)
params = TemporalSearchParams.model_validate({
"group_id": group_id,
"apply_id": apply_id,
"user_id": user_id,
"end_user_id": end_user_id,
"start_date": start_date,
"end_date": end_date,
"valid_date": valid_date,
@@ -950,9 +950,7 @@ async def search_by_temporal(
})
statements = await search_graph_by_temporal(
connector=connector,
group_id=params.group_id,
apply_id=params.apply_id,
user_id=params.user_id,
end_user_id=params.end_user_id,
start_date=params.start_date,
end_date=params.end_date,
valid_date=params.valid_date,
@@ -964,9 +962,7 @@ async def search_by_temporal(
async def search_by_keyword_temporal(
query_text: str,
group_id: Optional[str] = "test",
apply_id: Optional[str] = None,
user_id: Optional[str] = None,
end_user_id: Optional[str] = "test",
start_date: Optional[str] = None,
end_date: Optional[str] = None,
valid_date: Optional[str] = None,
@@ -987,9 +983,7 @@ async def search_by_keyword_temporal(
invalid_date = normalize_date_safe(invalid_date)
params = TemporalSearchParams.model_validate({
"group_id": group_id,
"apply_id": apply_id,
"user_id": user_id,
"end_user_id": end_user_id,
"start_date": start_date,
"end_date": end_date,
"valid_date": valid_date,
@@ -999,9 +993,7 @@ async def search_by_keyword_temporal(
statements = await search_graph_by_keyword_temporal(
connector=connector,
query_text=query_text,
group_id=params.group_id,
apply_id=params.apply_id,
user_id=params.user_id,
end_user_id=params.end_user_id,
start_date=params.start_date,
end_date=params.end_date,
valid_date=params.valid_date,
@@ -1013,7 +1005,7 @@ async def search_by_keyword_temporal(
async def search_chunk_by_chunk_id(
chunk_id: str,
group_id: Optional[str] = "test",
end_user_id: Optional[str] = "test",
limit: int = 1,
):
"""
@@ -1023,8 +1015,68 @@ async def search_chunk_by_chunk_id(
chunks = await search_graph_by_chunk_id(
connector=connector,
chunk_id=chunk_id,
group_id=group_id,
end_user_id=end_user_id,
limit=limit
)
return {"chunks": chunks}
if __name__ == '__main__':
    # Manual smoke test for hybrid retrieval. It loads a configuration through
    # MemoryConfigService and runs one real search, so it is meant to be
    # started by hand only.
    # Imported locally so this block does not rely on module-level imports.
    import asyncio
    import traceback

    from app.db import get_db
    from app.services.memory_config_service import MemoryConfigService

    # Adjust these values to test a different configuration, user or query.
    test_config_id = 17
    test_end_user_id = "021886bc-fab9-4fd5-b607-497b262e0381"
    test_query = "小明擅长什么?"
    test_search_type = "hybrid"  # one of: "keyword", "embedding", "hybrid"

    # Load the real configuration from the database.
    db = next(get_db())
    try:
        config_service = MemoryConfigService(db)
        memory_config = config_service.load_memory_config(config_id=test_config_id)

        if not memory_config:
            print(f"错误:找不到 config_id={test_config_id} 的配置")
            print("请先在数据库中创建配置,或修改 config_id")
            # Raise SystemExit directly: the exit() helper is injected by the
            # site module for interactive use. SystemExit is not an Exception
            # subclass, so it bypasses the handler below while the finally
            # clause still closes the session.
            raise SystemExit(1)

        print(f"✓ 成功加载配置: {memory_config.config_name}")
        print(f" - Workspace: {memory_config.workspace_name}")
        print(f" - LLM Model: {memory_config.llm_model_name}")
        print(f" - Embedding Model: {memory_config.embedding_model_name}")
        print(f" - Storage Type: {memory_config.storage_type}")
        print()

        print("开始测试检索...")
        print(f" - Query: {test_query}")
        print(f" - End User ID: {test_end_user_id}")
        print(f" - Search Type: {test_search_type}")
        print()

        results = asyncio.run(run_hybrid_search(
            query_text=test_query,
            search_type=test_search_type,
            end_user_id=test_end_user_id,
            limit=10,
            include=["statements", "entities", "chunks", "summaries"],
            output_path=None,
            memory_config=memory_config,
            rerank_alpha=0.6,
            use_forgetting_rerank=False,
            use_llm_rerank=False,
        ))

        print("=" * 80)
        print("检索结果:")
        print("=" * 80)
        print(results)

    except Exception as e:
        # Best-effort diagnostics for a manual run: report the error and the
        # full traceback instead of propagating.
        print(f"错误: {e}")
        traceback.print_exc()
    finally:
        db.close()

View File

@@ -555,8 +555,8 @@ class DataPreprocessor:
dialog_id = item.get('dialog_id', item.get('ref_id', item.get('id', f'dialog_{i}')))
# 获取group_id如果不存在则生成默认值
group_id = item.get('group_id', f'group_default_{i}')
# 获取end_user_id如果不存在则生成默认值
end_user_id = item.get('end_user_id', f'group_default_{i}')
user_id = item.get('user_id', f'user_default_{i}')
apply_id = item.get('apply_id', f'apply_default_{i}')
@@ -574,7 +574,7 @@ class DataPreprocessor:
dialog_data = DialogData(
context=context,
ref_id=dialog_id,
group_id=group_id,
end_user_id=end_user_id,
user_id=user_id,
apply_id=apply_id,
metadata=metadata
@@ -644,7 +644,7 @@ class DataPreprocessor:
context = ConversationContext(msgs=messages)
dialog_id = item.get('dialog_id', item.get('ref_id', item.get('id', f'dialog_{i}')))
group_id = item.get('group_id', f'group_default_{i}')
end_user_id = item.get('end_user_id', f'group_default_{i}')
user_id = item.get('user_id', f'user_default_{i}')
apply_id = item.get('apply_id', f'apply_default_{i}')
@@ -657,7 +657,7 @@ class DataPreprocessor:
dialog_data = DialogData(
context=context,
ref_id=dialog_id,
group_id=group_id,
end_user_id=end_user_id,
user_id=user_id,
apply_id=apply_id,
metadata=metadata

View File

@@ -199,7 +199,7 @@ def accurate_match(
entity_nodes: List[ExtractedEntityNode]
) -> Tuple[List[ExtractedEntityNode], Dict[str, str], Dict[str, Dict]]:
"""
精确匹配:按 (group_id, name, entity_type) 合并实体并建立重定向与合并记录。
精确匹配:按 (end_user_id, name, entity_type) 合并实体并建立重定向与合并记录。
返回: (deduped_entities, id_redirect, exact_merge_map)
"""
exact_merge_map: Dict[str, Dict] = {}
@@ -210,8 +210,8 @@ def accurate_match(
for ent in entity_nodes:
name_norm = (getattr(ent, "name", "") or "").strip()
type_norm = (getattr(ent, "entity_type", "") or "").strip()
key = f"{getattr(ent, 'group_id', None)}|{name_norm}|{type_norm}"
# 为避免跨业务组误并,明确以 group_id 为范围边界
key = f"{getattr(ent, 'end_user_id', None)}|{name_norm}|{type_norm}"
# 为避免跨业务组误并,明确以 end_user_id 为范围边界
if key not in canonical_map:
canonical_map[key] = ent
id_redirect[ent.id] = ent.id
@@ -223,11 +223,11 @@ def accurate_match(
id_redirect[ent.id] = canonical.id
# 记录精确匹配的合并项(使用规范化键,避免外层变量误用)
try:
k = f"{canonical.group_id}|{(canonical.name or '').strip()}|{(canonical.entity_type or '').strip()}"
k = f"{canonical.end_user_id}|{(canonical.name or '').strip()}|{(canonical.entity_type or '').strip()}"
if k not in exact_merge_map:
exact_merge_map[k] = {
"canonical_id": canonical.id,
"group_id": canonical.group_id,
"end_user_id": canonical.end_user_id,
"name": canonical.name,
"entity_type": canonical.entity_type,
"merged_ids": set(),
@@ -596,7 +596,7 @@ def fuzzy_match(
b = deduped_entities[j]
# 跳过不同业务组的实体
if getattr(a, "group_id", None) != getattr(b, "group_id", None):
if getattr(a, "end_user_id", None) != getattr(b, "end_user_id", None):
j += 1
continue
@@ -671,7 +671,7 @@ def fuzzy_match(
merge_reason = "[别名匹配]" if alias_match_merge else "[模糊]"
merge_reason = "[别名匹配]" if alias_match_merge else "[模糊]"
fuzzy_merge_records.append(
f"{merge_reason} 规范实体 {a.id} ({a.group_id}|{a.name}|{a.entity_type}) <- 合并实体 {b.id} ({b.group_id}|{b.name}|{b.entity_type}) | "
f"{merge_reason} 规范实体 {a.id} ({a.end_user_id}|{a.name}|{a.entity_type}) <- 合并实体 {b.id} ({b.end_user_id}|{b.name}|{b.entity_type}) | "
f"s_name={s_name:.3f}, s_type={s_type:.3f}, overall={overall:.3f}, exact_alias={has_exact_match}"
)
except Exception:
@@ -779,7 +779,7 @@ async def LLM_decision( # 决策中包含去重和消歧的功能
# 记录 LLM 融合日志
try:
llm_records.append(
f"[LLM融合] 规范实体 {a.id} ({a.group_id}|{a.name}|{a.entity_type}) <- 合并实体 {b.id} ({b.group_id}|{b.name}|{b.entity_type})"
f"[LLM融合] 规范实体 {a.id} ({a.end_user_id}|{a.name}|{a.entity_type}) <- 合并实体 {b.id} ({b.end_user_id}|{b.name}|{b.entity_type})"
)
# 详细的“同类名称相似”记录改由 LLM 去重模块统一生成以携带 conf/reason
except Exception:
@@ -847,7 +847,7 @@ async def LLM_disamb_decision(
id_redirect[k] = a.id
try:
disamb_records.append(
f"[DISAMB合并应用] 规范实体 {a.id} ({a.group_id}|{a.name}|{a.entity_type}) <- 合并实体 {b.id} ({b.group_id}|{b.name}|{b.entity_type})"
f"[DISAMB合并应用] 规范实体 {a.id} ({a.end_user_id}|{a.name}|{a.entity_type}) <- 合并实体 {b.id} ({b.end_user_id}|{b.name}|{b.entity_type})"
)
except Exception:
pass

View File

@@ -174,7 +174,7 @@ async def _judge_pair(
pass
# 3. 构建LLM判断的“上下文信息”规则层计算的所有特征 判断上下文特征有助于实体消歧首先判断的类型关系
ctx = {
"same_group": getattr(a, "group_id", None) == getattr(b, "group_id", None),
"same_group": getattr(a, "end_user_id", None) == getattr(b, "end_user_id", None),
"type_ok": _simple_type_ok(getattr(a, "entity_type", None), getattr(b, "entity_type", None)),
"type_similarity": _type_similarity(getattr(a, "entity_type", None), getattr(b, "entity_type", None)),
"name_text_sim": name_text_sim,
@@ -235,7 +235,7 @@ async def _judge_pair_disamb(
except Exception:
pass
ctx = {
"same_group": getattr(a, "group_id", None) == getattr(b, "group_id", None),
"same_group": getattr(a, "end_user_id", None) == getattr(b, "end_user_id", None),
"type_ok": _simple_type_ok(getattr(a, "entity_type", None), getattr(b, "entity_type", None)),
"name_text_sim": name_text_sim,
"name_embed_sim": name_embed_sim,
@@ -317,8 +317,8 @@ async def llm_dedup_entities( # 保留对偶判断作为子流程,是为了
a = entity_nodes[i]
for j in range(i + 1, len(entity_nodes)):
b = entity_nodes[j]
# 规则1必须属于同一组group_id相同不同组的实体不重复
if getattr(a, "group_id", None) != getattr(b, "group_id", None):
# 规则1必须属于同一组end_user_id相同不同组的实体不重复
if getattr(a, "end_user_id", None) != getattr(b, "end_user_id", None):
continue
# 规则2类型必须兼容调用_simple_type_ok判断
if not _simple_type_ok(getattr(a, "entity_type", None), getattr(b, "entity_type", None)):
@@ -474,7 +474,7 @@ async def llm_dedup_entities_iterative_blocks( # 迭代分块并发 LLM 去重
- max_rounds: upper bound for iterative passes (default 3)
- auto_merge_threshold: decision confidence for auto-merge when no co-occurrence (default 0.90)
- co_ctx_threshold: lower threshold when co-occurrence is detected (default 0.83)
- shuffle_each_round: whether to shuffle entities within group_id each round to vary block composition
- shuffle_each_round: whether to shuffle entities within end_user_id each round to vary block composition
Returns:
- global_redirect: dict losing_id -> canonical_id accumulated across rounds
@@ -509,7 +509,7 @@ async def llm_dedup_entities_iterative_blocks( # 迭代分块并发 LLM 去重
def _partition_blocks(nodes: List[ExtractedEntityNode]) -> List[List[ExtractedEntityNode]]:
"""
group_id 分块,避免跨组实体在同一块,减少无效候选对
end_user_id 分块,避免跨组实体在同一块,减少无效候选对
Args:
nodes: 实体节点列表
@@ -519,7 +519,7 @@ async def llm_dedup_entities_iterative_blocks( # 迭代分块并发 LLM 去重
"""
groups: Dict[str, List[ExtractedEntityNode]] = {}
for e in nodes:
gid = getattr(e, "group_id", None)
gid = getattr(e, "end_user_id", None)
groups.setdefault(str(gid), []).append(e)
blocks: List[List[ExtractedEntityNode]] = []
for gid, arr in groups.items():
@@ -559,7 +559,7 @@ async def llm_dedup_entities_iterative_blocks( # 迭代分块并发 LLM 去重
# Collapse nodes to canonical reps before each round to avoid redundant comparisons
# 步骤1折叠实体合并已确定的重复实体减少后续计算量
current_nodes = _collapse_nodes(current_nodes)
# 步骤2分块group_id分块避免跨组处理
# 步骤2分块end_user_id分块避免跨组处理
blocks = _partition_blocks(current_nodes)
if not blocks: # 无块可处理(实体已全部折叠),退出循环
break
@@ -645,7 +645,7 @@ async def llm_disambiguate_pairs_iterative(
a = entity_nodes[i]
b = entity_nodes[j]
# 必须同组
if getattr(a, "group_id", None) != getattr(b, "group_id", None):
if getattr(a, "end_user_id", None) != getattr(b, "end_user_id", None):
continue
ta = getattr(a, "entity_type", None)
tb = getattr(b, "entity_type", None)

View File

@@ -61,7 +61,7 @@ def _row_to_entity(row: Dict[str, Any]) -> ExtractedEntityNode:
return ExtractedEntityNode(
id=row.get("id"),
name=row.get("name") or "",
group_id=row.get("group_id") or "",
end_user_id=row.get("end_user_id") or "",
user_id=row.get("user_id") or "",
apply_id=row.get("apply_id") or "",
created_at=_parse_dt(row.get("created_at")),
@@ -79,7 +79,7 @@ def _row_to_entity(row: Dict[str, Any]) -> ExtractedEntityNode:
async def second_layer_dedup_and_merge_with_neo4j( # 二层去重的核心逻辑,与 Neo4j 中同组实体联合去重
connector: Neo4jConnector,
group_id: str, # 用于定位neo4j中同一组的实体确保只在同组内去重
end_user_id: str, # 用于定位neo4j中同一组的实体确保只在同组内去重
entity_nodes: List[ExtractedEntityNode], # 输入的实体节点列表,包含待去重的实体
statement_entity_edges: List[StatementEntityEdge], # 输入的语句实体边列表,用于处理实体之间的关系
entity_entity_edges: List[EntityEntityEdge], # 输入的实体实体边列表,用于处理实体之间的关系
@@ -88,7 +88,7 @@ async def second_layer_dedup_and_merge_with_neo4j( # 二层去重的核心逻辑
) -> Tuple[List[ExtractedEntityNode], List[StatementEntityEdge], List[EntityEntityEdge]]:
"""
第二层去重消歧:
- 以第一层结果为索引,检索相同 group_id 下的 DB 候选实体
- 以第一层结果为索引,检索相同 end_user_id 下的 DB 候选实体
- 将 DB 候选与当前实体集合联合,按既有精确/模糊/LLM 决策进行融合
- 返回融合后的实体与重定向后的边(边已指向规范 ID优先 DB ID
"""
@@ -102,7 +102,7 @@ async def second_layer_dedup_and_merge_with_neo4j( # 二层去重的核心逻辑
]
candidates_map = await get_dedup_candidates_for_entities( # 从 Neo4j 中查询候选实体并将结果赋值给candidates_map等待异步操作完成
connector=connector, group_id=group_id,
connector=connector, end_user_id=end_user_id,
entities=incoming_rows, # 传入参数:第一层实体的核心信息(作为查询索引)
use_contains_fallback=True # 传入参数:启用 “包含关系” 作为匹配失败的降级策略若精确匹配无结果用包含关系召回候选与src\database\cypher_queries.py的307产生联动
)

View File

@@ -57,11 +57,11 @@ async def dedup_layers_and_merge_and_return(
if pipeline_config is None:
raise ValueError("pipeline_config is required for dedup_layers_and_merge_and_return")
# 先探测 group_id决定报告写入策略
group_id: Optional[str] = None
# 先探测 end_user_id决定报告写入策略
end_user_id: Optional[str] = None
for dd in dialog_data_list:
group_id = getattr(dd, "group_id", None)
if group_id:
end_user_id = getattr(dd, "end_user_id", None)
if end_user_id:
break
# 第一层去重消歧
@@ -82,11 +82,11 @@ async def dedup_layers_and_merge_and_return(
# 第二层去重消歧:与 Neo4j 中同组实体联合融合
try:
if group_id:
if end_user_id:
if connector:
fused_entity_nodes, fused_statement_entity_edges, fused_entity_entity_edges = await second_layer_dedup_and_merge_with_neo4j(
connector=connector,
group_id=group_id,
end_user_id=end_user_id,
entity_nodes=dedup_entity_nodes,
statement_entity_edges=dedup_statement_entity_edges,
entity_entity_edges=dedup_entity_entity_edges,
@@ -96,7 +96,7 @@ async def dedup_layers_and_merge_and_return(
else:
print("Skip second-layer dedup: missing connector")
else:
print("Skip second-layer dedup: missing group_id")
print("Skip second-layer dedup: missing end_user_id")
except Exception as e:
print(f"Second-layer dedup failed: {e}")

View File

@@ -287,7 +287,7 @@ class ExtractionOrchestrator:
for d_idx, dialog in enumerate(dialog_data_list):
dialogue_content = dialog.content if self.config.statement_extraction.include_dialogue_context else None
for c_idx, chunk in enumerate(dialog.chunks):
all_chunks.append((chunk, dialog.group_id, dialogue_content))
all_chunks.append((chunk, dialog.end_user_id, dialogue_content))
chunk_metadata.append((d_idx, c_idx))
logger.info(f"收集到 {len(all_chunks)} 个分块,开始全局并行提取")
@@ -299,9 +299,9 @@ class ExtractionOrchestrator:
# 全局并行处理所有分块
async def extract_for_chunk(chunk_data, chunk_index):
nonlocal completed_chunks
chunk, group_id, dialogue_content = chunk_data
chunk, end_user_id, dialogue_content = chunk_data
try:
statements = await self.statement_extractor._extract_statements(chunk, group_id, dialogue_content)
statements = await self.statement_extractor._extract_statements(chunk, end_user_id, dialogue_content)
# 流式输出:每提取完一个分块的陈述句,立即发送进度
# 注意:只在试运行模式下发送陈述句详情,正式模式不发送
@@ -992,9 +992,7 @@ class ExtractionOrchestrator:
id=dialog_data.id,
name=f"Dialog_{dialog_data.id}", # 添加必需的 name 字段
ref_id=dialog_data.ref_id,
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
content=dialog_data.context.content if dialog_data.context else "",
dialog_embedding=dialog_data.dialog_embedding if hasattr(dialog_data, 'dialog_embedding') else None,
@@ -1012,9 +1010,7 @@ class ExtractionOrchestrator:
id=chunk.id,
name=f"Chunk_{chunk.id}", # 添加必需的 name 字段
dialog_id=dialog_data.id,
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
content=chunk.content,
chunk_embedding=chunk.chunk_embedding,
@@ -1035,9 +1031,7 @@ class ExtractionOrchestrator:
stmt_type=getattr(statement, 'stmt_type', 'general'), # 添加必需的 stmt_type 字段
temporal_info=getattr(statement, 'temporal_info', TemporalInfo.ATEMPORAL), # 添加必需的 temporal_info 字段
connect_strength=statement.connect_strength if statement.connect_strength is not None else 'Strong', # 添加必需的 connect_strength 字段
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
statement=statement.statement,
speaker=getattr(statement, 'speaker', None), # 添加 speaker 字段
@@ -1060,9 +1054,7 @@ class ExtractionOrchestrator:
statement_chunk_edge = StatementChunkEdge(
source=statement.id,
target=chunk.id,
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
created_at=dialog_data.created_at,
)
@@ -1095,9 +1087,7 @@ class ExtractionOrchestrator:
aliases=getattr(entity, 'aliases', []) or [], # 传递从三元组提取阶段获取的aliases
name_embedding=getattr(entity, 'name_embedding', None),
is_explicit_memory=getattr(entity, 'is_explicit_memory', False), # 新增:传递语义记忆标记
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
created_at=dialog_data.created_at,
expired_at=dialog_data.expired_at,
@@ -1112,9 +1102,7 @@ class ExtractionOrchestrator:
source=statement.id,
target=entity.id,
connect_strength=entity_connect_strength if entity_connect_strength is not None else 'Strong',
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
created_at=dialog_data.created_at,
)
@@ -1134,9 +1122,7 @@ class ExtractionOrchestrator:
relation_type=triplet.predicate,
statement=statement.statement,
source_statement_id=statement.id,
group_id=dialog_data.group_id,
user_id=dialog_data.user_id,
apply_id=dialog_data.apply_id,
end_user_id=dialog_data.end_user_id,
run_id=dialog_data.run_id, # 使用 dialog_data 的 run_id
created_at=dialog_data.created_at,
expired_at=dialog_data.expired_at,
@@ -1763,14 +1749,14 @@ class ExtractionOrchestrator:
async def get_chunked_dialogs(
chunker_strategy: str = "RecursiveChunker",
group_id: str = "group_1",
end_user_id: str = "group_1",
indices: Optional[List[int]] = None,
) -> List[DialogData]:
"""从测试数据生成分块对话
Args:
chunker_strategy: 分块策略(默认: RecursiveChunker
group_id: 组ID
end_user_id: 组ID
indices: 要处理的数据索引列表(可选)
Returns:
@@ -1834,7 +1820,7 @@ async def get_chunked_dialogs(
dialog_data = DialogData(
context=conversation_context,
ref_id=data['id'],
group_id=group_id,
end_user_id=end_user_id,
metadata=dialog_metadata,
)
@@ -1936,7 +1922,7 @@ async def get_chunked_dialogs_from_preprocessed(
async def get_chunked_dialogs_with_preprocessing(
chunker_strategy: str = "RecursiveChunker",
group_id: str = "default",
end_user_id: str = "default",
user_id: str = "default",
apply_id: str = "default",
indices: Optional[List[int]] = None,
@@ -1948,7 +1934,7 @@ async def get_chunked_dialogs_with_preprocessing(
Args:
chunker_strategy: 分块策略
group_id: 组ID
end_user_id: 组ID
user_id: 用户ID
apply_id: 应用ID
indices: 要处理的数据索引列表
@@ -1976,11 +1962,9 @@ async def get_chunked_dialogs_with_preprocessing(
indices=indices,
)
# 设置 group_id, user_id, apply_id
# 设置 end_user_id
for dd in preprocessed_data:
dd.group_id = group_id
dd.user_id = user_id
dd.apply_id = apply_id
dd.end_user_id = end_user_id
# 步骤2: 语义剪枝
try:

View File

@@ -193,9 +193,9 @@ async def _process_chunk_summary(
node = MemorySummaryNode(
id=uuid4().hex,
name=title if title else f"MemorySummaryChunk_{chunk.id}",
group_id=dialog.group_id,
user_id=dialog.user_id,
apply_id=dialog.apply_id,
end_user_id=dialog.end_user_id,
# NOTE(review): user_id and apply_id are both populated from dialog.end_user_id here,
# while other node/edge constructors in this change drop these two fields entirely.
# Presumably a compatibility shim for MemorySummaryNode — confirm this is intentional.
user_id=dialog.end_user_id,
apply_id=dialog.end_user_id,
run_id=dialog.run_id, # 使用 dialog 的 run_id
created_at=datetime.now(),
expired_at=datetime(9999, 12, 31),

View File

@@ -82,12 +82,12 @@ class StatementExtractor:
logger.warning(f"Chunk {getattr(chunk, 'id', 'unknown')} has no speaker field or is empty")
return None
async def _extract_statements(self, chunk, group_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]:
async def _extract_statements(self, chunk, end_user_id: Optional[str] = None, dialogue_content: str = None) -> List[Statement]:
"""Process a single chunk and return extracted statements
Args:
chunk: Chunk object to process
group_id: Group ID to assign to all statements in this chunk
end_user_id: End-user ID to assign to all statements in this chunk
dialogue_content: Full dialogue content to provide as context
Returns:
@@ -158,7 +158,7 @@ class StatementExtractor:
temporal_info=temporal_type,
relevence_info=relevence_info,
chunk_id=chunk.id,
group_id=group_id,
end_user_id=end_user_id,
speaker=chunk_speaker,
)
@@ -184,10 +184,10 @@ class StatementExtractor:
logger.info(f"Processing {len(chunks_to_process)} chunks for statement extraction")
# Process all chunks concurrently, passing the group_id and dialogue content from dialog_data
# Process all chunks concurrently, passing the end_user_id and dialogue content from dialog_data
dialogue_content = dialog_data.content if self.config.include_dialogue_context else None
results = await asyncio.gather(
*[self._extract_statements(chunk, dialog_data.group_id, dialogue_content) for chunk in chunks_to_process],
*[self._extract_statements(chunk, dialog_data.end_user_id, dialogue_content) for chunk in chunks_to_process],
return_exceptions=True
)
@@ -225,7 +225,7 @@ class StatementExtractor:
for i, statement in enumerate(statements, 1):
f.write(f"Statement {i}:\n")
f.write(f"Id: {statement.id}\n")
f.write(f"Group Id: {statement.group_id}\n")
f.write(f"Group Id: {statement.end_user_id}\n")
f.write(f"Content: {statement.statement}\n")
f.write(f"Type: {statement.stmt_type.value}\n")
f.write(f"Temporal Info: {statement.temporal_info.value}\n")
@@ -298,7 +298,7 @@ class StatementExtractor:
dialog_sections.append({
"dialog_id": dialog.ref_id,
"group_id": dialog.group_id,
"end_user_id": dialog.end_user_id,
"content": dialog.content if getattr(dialog, "content", None) else "",
"strong": strong_relations,
"weak": weak_relations,
@@ -312,7 +312,7 @@ class StatementExtractor:
for idx, section in enumerate(dialog_sections, 1):
f.write(f"Dialog {idx}:\n")
f.write(f"Dialog ID: {section.get('dialog_id', '')}\n")
f.write(f"Group ID: {section.get('group_id', '')}\n")
f.write(f"Group ID: {section.get('end_user_id', '')}\n")
f.write("Content:\n")
f.write(f"{section.get('content', '')}\n")
f.write("-" * 40 + "\n\n")

View File

@@ -132,7 +132,7 @@ class TemporalExtractor:
prompt_logger.info("")
prompt_logger.info("=== TEMPORAL EXTRACTION RESULTS ===")
prompt_logger.info(
f"[Temporal] Dialog ref_id={getattr(dialog_data, 'ref_id', None)}, group_id={getattr(dialog_data, 'group_id', None)}"
f"[Temporal] Dialog ref_id={getattr(dialog_data, 'ref_id', None)}, end_user_id={getattr(dialog_data, 'end_user_id', None)}"
)
except Exception:
pass

View File

@@ -116,7 +116,7 @@ class TripletExtractor:
logger.info(f"Processing {len(all_statements)} statements for triplet extraction...")
try:
prompt_logger.info(
f"[Triplet] Dialog ref_id={getattr(dialog_data, 'ref_id', None)}, group_id={getattr(dialog_data, 'group_id', None)}, statements_to_process={len(all_statements)}"
f"[Triplet] Dialog ref_id={getattr(dialog_data, 'ref_id', None)}, end_user_id={getattr(dialog_data, 'end_user_id', None)}, statements_to_process={len(all_statements)}"
)
except Exception:
pass

View File

@@ -75,7 +75,7 @@ class AccessHistoryManager:
self,
node_id: str,
node_label: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
current_time: Optional[datetime] = None
) -> Dict[str, Any]:
"""
@@ -91,7 +91,7 @@ class AccessHistoryManager:
Args:
node_id: 节点ID
node_label: 节点标签Statement, ExtractedEntity, MemorySummary
group_id: 组ID可选用于过滤
end_user_id: 组ID可选用于过滤
current_time: 当前时间(可选,默认使用系统时间)
Returns:
@@ -123,7 +123,7 @@ class AccessHistoryManager:
for attempt in range(self.max_retries):
try:
# 步骤1读取当前节点状态
node_data = await self._fetch_node(node_id, node_label, group_id)
node_data = await self._fetch_node(node_id, node_label, end_user_id)
if not node_data:
raise ValueError(
@@ -142,7 +142,7 @@ class AccessHistoryManager:
node_id=node_id,
node_label=node_label,
update_data=update_data,
group_id=group_id
end_user_id=end_user_id
)
logger.info(
@@ -172,7 +172,7 @@ class AccessHistoryManager:
self,
node_ids: List[str],
node_label: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
current_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
@@ -184,7 +184,7 @@ class AccessHistoryManager:
Args:
node_ids: 节点ID列表
node_label: 节点标签(所有节点必须是同一类型)
group_id: 组ID可选
end_user_id: 组ID可选
current_time: 当前时间(可选)
Returns:
@@ -202,7 +202,7 @@ class AccessHistoryManager:
task = self.record_access(
node_id=node_id,
node_label=node_label,
group_id=group_id,
end_user_id=end_user_id,
current_time=current_time
)
tasks.append(task)
@@ -235,7 +235,7 @@ class AccessHistoryManager:
self,
node_id: str,
node_label: str,
group_id: Optional[str] = None
end_user_id: Optional[str] = None
) -> Tuple[ConsistencyCheckResult, Optional[str]]:
"""
检查节点数据的一致性
@@ -249,14 +249,14 @@ class AccessHistoryManager:
Args:
node_id: 节点ID
node_label: 节点标签
group_id: 组ID可选
end_user_id: 组ID可选
Returns:
Tuple[ConsistencyCheckResult, Optional[str]]:
- 一致性检查结果枚举
- 错误描述(如果不一致)
"""
node_data = await self._fetch_node(node_id, node_label, group_id)
node_data = await self._fetch_node(node_id, node_label, end_user_id)
if not node_data:
return ConsistencyCheckResult.CONSISTENT, None
@@ -305,7 +305,7 @@ class AccessHistoryManager:
async def check_batch_consistency(
self,
node_label: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
limit: int = 1000
) -> Dict[str, Any]:
"""
@@ -313,7 +313,7 @@ class AccessHistoryManager:
Args:
node_label: 节点标签
group_id: 组ID可选
end_user_id: 组ID可选
limit: 检查的最大节点数
Returns:
@@ -329,16 +329,16 @@ class AccessHistoryManager:
MATCH (n:{node_label})
WHERE n.access_history IS NOT NULL
"""
if group_id:
query += " AND n.group_id = $group_id"
if end_user_id:
query += " AND n.end_user_id = $end_user_id"
query += """
RETURN n.id as id
LIMIT $limit
"""
params = {"limit": limit}
if group_id:
params["group_id"] = group_id
if end_user_id:
params["end_user_id"] = end_user_id
results = await self.connector.execute_query(query, **params)
node_ids = [r['id'] for r in results]
@@ -351,7 +351,7 @@ class AccessHistoryManager:
result, message = await self.check_consistency(
node_id=node_id,
node_label=node_label,
group_id=group_id
end_user_id=end_user_id
)
if result == ConsistencyCheckResult.CONSISTENT:
@@ -387,7 +387,7 @@ class AccessHistoryManager:
self,
node_id: str,
node_label: str,
group_id: Optional[str] = None
end_user_id: Optional[str] = None
) -> bool:
"""
自动修复节点的数据不一致问题
@@ -401,7 +401,7 @@ class AccessHistoryManager:
Args:
node_id: 节点ID
node_label: 节点标签
group_id: 组ID可选
end_user_id: 组ID可选
Returns:
bool: 修复成功返回True否则返回False
@@ -411,7 +411,7 @@ class AccessHistoryManager:
result, message = await self.check_consistency(
node_id=node_id,
node_label=node_label,
group_id=group_id
end_user_id=end_user_id
)
if result == ConsistencyCheckResult.CONSISTENT:
@@ -419,7 +419,7 @@ class AccessHistoryManager:
return True
# 获取节点数据
node_data = await self._fetch_node(node_id, node_label, group_id)
node_data = await self._fetch_node(node_id, node_label, end_user_id)
if not node_data:
logger.error(f"节点不存在,无法修复: {node_label}[{node_id}]")
return False
@@ -457,8 +457,8 @@ class AccessHistoryManager:
query = f"""
MATCH (n:{node_label} {{id: $node_id}})
"""
if group_id:
query += " WHERE n.group_id = $group_id"
if end_user_id:
query += " WHERE n.end_user_id = $end_user_id"
query += """
SET n += $repair_data
RETURN n
@@ -468,8 +468,8 @@ class AccessHistoryManager:
'node_id': node_id,
'repair_data': repair_data
}
if group_id:
params['group_id'] = group_id
if end_user_id:
params['end_user_id'] = end_user_id
await self.connector.execute_query(query, **params)
@@ -491,7 +491,7 @@ class AccessHistoryManager:
self,
node_id: str,
node_label: str,
group_id: Optional[str] = None
end_user_id: Optional[str] = None
) -> Optional[Dict[str, Any]]:
"""
获取节点数据
@@ -499,7 +499,7 @@ class AccessHistoryManager:
Args:
node_id: 节点ID
node_label: 节点标签
group_id: 组ID可选
end_user_id: 组ID可选
Returns:
Optional[Dict[str, Any]]: 节点数据如果不存在返回None
@@ -507,8 +507,8 @@ class AccessHistoryManager:
query = f"""
MATCH (n:{node_label} {{id: $node_id}})
"""
if group_id:
query += " WHERE n.group_id = $group_id"
if end_user_id:
query += " WHERE n.end_user_id = $end_user_id"
query += """
RETURN n.id as id,
n.importance_score as importance_score,
@@ -519,8 +519,8 @@ class AccessHistoryManager:
"""
params = {'node_id': node_id}
if group_id:
params['group_id'] = group_id
if end_user_id:
params['end_user_id'] = end_user_id
results = await self.connector.execute_query(query, **params)
@@ -585,7 +585,7 @@ class AccessHistoryManager:
node_id: str,
node_label: str,
update_data: Dict[str, Any],
group_id: Optional[str] = None
end_user_id: Optional[str] = None
) -> Dict[str, Any]:
"""
原子性更新节点(使用乐观锁)
@@ -597,7 +597,7 @@ class AccessHistoryManager:
node_id: 节点ID
node_label: 节点标签
update_data: 更新数据
group_id: 组ID可选
end_user_id: 组ID可选
Returns:
Dict[str, Any]: 更新后的节点数据
@@ -606,13 +606,13 @@ class AccessHistoryManager:
RuntimeError: 如果更新失败或发生版本冲突
"""
# 定义事务函数
async def update_transaction(tx, node_id, node_label, update_data, group_id):
async def update_transaction(tx, node_id, node_label, update_data, end_user_id):
# 步骤1读取当前节点并获取版本号
read_query = f"""
MATCH (n:{node_label} {{id: $node_id}})
"""
if group_id:
read_query += " WHERE n.group_id = $group_id"
if end_user_id:
read_query += " WHERE n.end_user_id = $end_user_id"
read_query += """
RETURN n.id as id,
n.version as version,
@@ -624,8 +624,8 @@ class AccessHistoryManager:
"""
read_params = {'node_id': node_id}
if group_id:
read_params['group_id'] = group_id
if end_user_id:
read_params['end_user_id'] = end_user_id
read_result = await tx.run(read_query, **read_params)
current_node = await read_result.single()
@@ -656,8 +656,8 @@ class AccessHistoryManager:
# 构建 WHERE 子句
where_conditions = []
if group_id:
where_conditions.append("n.group_id = $group_id")
if end_user_id:
where_conditions.append("n.end_user_id = $end_user_id")
# 添加版本检查
if current_version > 0:
@@ -695,8 +695,8 @@ class AccessHistoryManager:
'last_access_time': update_data['last_access_time'],
'access_count': update_data['access_count']
}
if group_id:
update_params['group_id'] = group_id
if end_user_id:
update_params['end_user_id'] = end_user_id
update_result = await tx.run(update_query, **update_params)
updated_node = await update_result.single()
@@ -720,7 +720,7 @@ class AccessHistoryManager:
node_id=node_id,
node_label=node_label,
update_data=update_data,
group_id=group_id
end_user_id=end_user_id
)
return result
except Exception as e:

View File

@@ -66,7 +66,7 @@ class ForgettingScheduler:
async def run_forgetting_cycle(
self,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
max_merge_batch_size: int = 100,
min_days_since_access: int = 30,
config_id: Optional[int] = None,
@@ -77,7 +77,7 @@ class ForgettingScheduler:
Args:
group_id: 组 ID可选用于过滤特定组的节点
end_user_id: 组 ID可选用于过滤特定组的节点
max_merge_batch_size: 单次最大融合节点对数(默认 100
min_days_since_access: 最小未访问天数(默认 30 天)
config_id: 配置ID可选用于获取 llm_id
@@ -107,19 +107,19 @@ class ForgettingScheduler:
start_time_iso = start_time.isoformat()
logger.info(
f"开始遗忘周期: group_id={group_id}, "
f"开始遗忘周期: end_user_id={end_user_id}, "
f"max_batch={max_merge_batch_size}, "
f"min_days={min_days_since_access}"
)
try:
# 步骤1统计遗忘前的节点数量
nodes_before = await self._count_knowledge_nodes(group_id)
nodes_before = await self._count_knowledge_nodes(end_user_id)
logger.info(f"遗忘前节点总数: {nodes_before}")
# 步骤2识别可遗忘的节点对
forgettable_pairs = await self.forgetting_strategy.find_forgettable_nodes(
group_id=group_id,
end_user_id=end_user_id,
min_days_since_access=min_days_since_access
)
@@ -213,7 +213,7 @@ class ForgettingScheduler:
'statement_text': pair['statement_text'],
'statement_activation': pair['statement_activation'],
'statement_importance': pair['statement_importance'],
'group_id': group_id
'end_user_id': end_user_id
}
entity_node = {
@@ -222,7 +222,7 @@ class ForgettingScheduler:
'entity_type': pair['entity_type'],
'entity_activation': pair['entity_activation'],
'entity_importance': pair['entity_importance'],
'group_id': group_id
'end_user_id': end_user_id
}
# 融合节点
@@ -262,7 +262,7 @@ class ForgettingScheduler:
continue
# 步骤6统计遗忘后的节点数量
nodes_after = await self._count_knowledge_nodes(group_id)
nodes_after = await self._count_knowledge_nodes(end_user_id)
logger.info(f"遗忘后节点总数: {nodes_after}")
# 步骤7生成遗忘报告
@@ -315,7 +315,7 @@ class ForgettingScheduler:
async def _count_knowledge_nodes(
self,
group_id: Optional[str] = None
end_user_id: Optional[str] = None
) -> int:
"""
统计知识层节点总数
@@ -323,7 +323,7 @@ class ForgettingScheduler:
统计 Statement、ExtractedEntity 和 MemorySummary 节点的总数。
Args:
group_id: 组 ID可选用于过滤特定组的节点
end_user_id: 组 ID可选用于过滤特定组的节点
Returns:
int: 知识层节点总数
@@ -333,16 +333,16 @@ class ForgettingScheduler:
WHERE (n:Statement OR n:ExtractedEntity OR n:MemorySummary)
"""
if group_id:
query += " AND n.group_id = $group_id"
if end_user_id:
query += " AND n.end_user_id = $end_user_id"
query += """
RETURN count(n) as total
"""
params = {}
if group_id:
params['group_id'] = group_id
if end_user_id:
    # Bind the filter value into the query parameters: the Cypher text built above
    # references $end_user_id, so it must be supplied via `params` (not written
    # into the end_user_id string itself, which raises TypeError).
    params['end_user_id'] = end_user_id
results = await self.connector.execute_query(query, **params)

View File

@@ -90,7 +90,7 @@ class ForgettingStrategy:
async def find_forgettable_nodes(
self,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
min_days_since_access: int = 30
) -> List[Dict[str, Any]]:
"""
@@ -102,7 +102,7 @@ class ForgettingStrategy:
3. Statement 和 Entity 之间存在关系边
Args:
group_id: 组 ID可选用于过滤特定组的节点
end_user_id: 组 ID可选用于过滤特定组的节点
min_days_since_access: 最小未访问天数(默认 30 天)
Returns:
@@ -136,8 +136,8 @@ class ForgettingStrategy:
AND (e.entity_type IS NULL OR e.entity_type <> 'Person')
"""
if group_id:
query += " AND s.group_id = $group_id AND e.group_id = $group_id"
if end_user_id:
query += " AND s.end_user_id = $end_user_id AND e.end_user_id = $end_user_id"
query += """
RETURN s.id as statement_id,
@@ -159,8 +159,8 @@ class ForgettingStrategy:
'threshold': self.forgetting_threshold,
'cutoff_time': cutoff_time_iso
}
if group_id:
params['group_id'] = group_id
if end_user_id:
params['end_user_id'] = end_user_id
results = await self.connector.execute_query(query, **params)
@@ -247,8 +247,8 @@ class ForgettingStrategy:
entity_activation = entity_node['entity_activation']
entity_importance = entity_node['entity_importance']
# 获取 group_id从 statement 或 entity 节点)
group_id = statement_node.get('group_id') or entity_node.get('group_id')
# 获取 end_user_id从 statement 或 entity 节点)
end_user_id = statement_node.get('end_user_id') or entity_node.get('end_user_id')
# 生成摘要内容
summary_text = await self._generate_summary(
@@ -325,7 +325,7 @@ class ForgettingStrategy:
last_access_time: $current_time,
access_count: 1,
version: 1,
group_id: $group_id,
end_user_id: $end_user_id,
created_at: datetime($current_time),
merged_at: datetime($current_time)
})
@@ -423,7 +423,7 @@ class ForgettingStrategy:
'inherited_activation': inherited_activation,
'inherited_importance': inherited_importance,
'current_time': current_time_iso,
'group_id': group_id
'end_user_id': end_user_id
}
try:

View File

@@ -37,7 +37,7 @@ __all__ = [
async def run_hybrid_search(
query_text: str,
search_type: str = "hybrid",
group_id: str | None = None,
end_user_id: str | None = None,
apply_id: str | None = None,
user_id: str | None = None,
limit: int = 50,
@@ -54,7 +54,7 @@ async def run_hybrid_search(
Args:
query_text: 查询文本
search_type: 搜索类型("hybrid", "keyword", "semantic"
group_id: 组ID过滤
end_user_id: 组ID过滤
apply_id: 应用ID过滤
user_id: 用户ID过滤
limit: 每个类别的最大结果数
@@ -104,7 +104,7 @@ async def run_hybrid_search(
# 执行搜索
result = await strategy.search(
query_text=query_text,
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include,
alpha=alpha,

View File

@@ -77,7 +77,7 @@
# async def search(
# self,
# query_text: str,
# group_id: Optional[str] = None,
# end_user_id: Optional[str] = None,
# limit: int = 50,
# include: Optional[List[str]] = None,
# **kwargs
@@ -86,7 +86,7 @@
# Args:
# query_text: 查询文本
# group_id: 可选的组ID过滤
# end_user_id: 可选的组ID过滤
# limit: 每个类别的最大结果数
# include: 要包含的搜索类别列表
# **kwargs: 其他搜索参数如alpha, use_forgetting_curve
@@ -94,7 +94,7 @@
# Returns:
# SearchResult: 搜索结果对象
# """
# logger.info(f"执行混合搜索: query='{query_text}', group_id={group_id}, limit={limit}")
# logger.info(f"执行混合搜索: query='{query_text}', end_user_id={end_user_id}, limit={limit}")
# # 从kwargs中获取参数
# alpha = kwargs.get("alpha", self.alpha)
@@ -107,14 +107,14 @@
# # 并行执行关键词搜索和语义搜索
# keyword_result = await self.keyword_strategy.search(
# query_text=query_text,
# group_id=group_id,
# end_user_id=end_user_id,
# limit=limit,
# include=include_list
# )
# semantic_result = await self.semantic_strategy.search(
# query_text=query_text,
# group_id=group_id,
# end_user_id=end_user_id,
# limit=limit,
# include=include_list
# )
@@ -139,7 +139,7 @@
# metadata = self._create_metadata(
# query_text=query_text,
# search_type="hybrid",
# group_id=group_id,
# end_user_id=end_user_id,
# limit=limit,
# include=include_list,
# alpha=alpha,
@@ -165,7 +165,7 @@
# metadata=self._create_metadata(
# query_text=query_text,
# search_type="hybrid",
# group_id=group_id,
# end_user_id=end_user_id,
# limit=limit,
# error=str(e)
# )

View File

@@ -44,7 +44,7 @@ class KeywordSearchStrategy(SearchStrategy):
async def search(
self,
query_text: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
limit: int = 50,
include: Optional[List[str]] = None,
**kwargs
@@ -53,7 +53,7 @@ class KeywordSearchStrategy(SearchStrategy):
Args:
query_text: 查询文本
group_id: 可选的组ID过滤
end_user_id: 可选的组ID过滤
limit: 每个类别的最大结果数
include: 要包含的搜索类别列表
**kwargs: 其他搜索参数
@@ -61,7 +61,7 @@ class KeywordSearchStrategy(SearchStrategy):
Returns:
SearchResult: 搜索结果对象
"""
logger.info(f"执行关键词搜索: query='{query_text}', group_id={group_id}, limit={limit}")
logger.info(f"执行关键词搜索: query='{query_text}', end_user_id={end_user_id}, limit={limit}")
# 获取有效的搜索类别
include_list = self._get_include_list(include)
@@ -75,7 +75,7 @@ class KeywordSearchStrategy(SearchStrategy):
results_dict = await search_graph(
connector=self.connector,
q=query_text,
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include_list
)
@@ -84,7 +84,7 @@ class KeywordSearchStrategy(SearchStrategy):
metadata = self._create_metadata(
query_text=query_text,
search_type="keyword",
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include_list
)
@@ -115,7 +115,7 @@ class KeywordSearchStrategy(SearchStrategy):
metadata=self._create_metadata(
query_text=query_text,
search_type="keyword",
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
error=str(e)
)

View File

@@ -58,7 +58,7 @@ class SearchStrategy(ABC):
async def search(
self,
query_text: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
limit: int = 50,
include: Optional[List[str]] = None,
**kwargs
@@ -67,7 +67,7 @@ class SearchStrategy(ABC):
Args:
query_text: 查询文本
group_id: 可选的组ID过滤
end_user_id: 可选的组ID过滤
limit: 每个类别的最大结果数
include: 要包含的搜索类别列表statements, chunks, entities, summaries
**kwargs: 其他搜索参数
@@ -81,7 +81,7 @@ class SearchStrategy(ABC):
self,
query_text: str,
search_type: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
limit: int = 50,
**kwargs
) -> Dict[str, Any]:
@@ -90,7 +90,7 @@ class SearchStrategy(ABC):
Args:
query_text: 查询文本
search_type: 搜索类型
group_id: 组ID
end_user_id: 组ID
limit: 结果限制
**kwargs: 其他元数据
@@ -100,7 +100,7 @@ class SearchStrategy(ABC):
metadata = {
"query": query_text,
"search_type": search_type,
"group_id": group_id,
"end_user_id": end_user_id,
"limit": limit,
"timestamp": datetime.now().isoformat()
}

View File

@@ -85,7 +85,7 @@ class SemanticSearchStrategy(SearchStrategy):
async def search(
self,
query_text: str,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
limit: int = 50,
include: Optional[List[str]] = None,
**kwargs
@@ -94,7 +94,7 @@ class SemanticSearchStrategy(SearchStrategy):
Args:
query_text: 查询文本
group_id: 可选的组ID过滤
end_user_id: 可选的组ID过滤
limit: 每个类别的最大结果数
include: 要包含的搜索类别列表
**kwargs: 其他搜索参数
@@ -102,7 +102,7 @@ class SemanticSearchStrategy(SearchStrategy):
Returns:
SearchResult: 搜索结果对象
"""
logger.info(f"执行语义搜索: query='{query_text}', group_id={group_id}, limit={limit}")
logger.info(f"执行语义搜索: query='{query_text}', end_user_id={end_user_id}, limit={limit}")
# 获取有效的搜索类别
include_list = self._get_include_list(include)
@@ -119,7 +119,7 @@ class SemanticSearchStrategy(SearchStrategy):
connector=self.connector,
embedder_client=self.embedder_client,
query_text=query_text,
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include_list
)
@@ -128,7 +128,7 @@ class SemanticSearchStrategy(SearchStrategy):
metadata = self._create_metadata(
query_text=query_text,
search_type="semantic",
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
include=include_list
)
@@ -159,7 +159,7 @@ class SemanticSearchStrategy(SearchStrategy):
metadata=self._create_metadata(
query_text=query_text,
search_type="semantic",
group_id=group_id,
end_user_id=end_user_id,
limit=limit,
error=str(e)
)

View File

@@ -23,7 +23,7 @@ async def _load_(data: List[Any]) -> List[Dict]:
target_keys = [
"id",
"statement",
"group_id",
"end_user_id",
"chunk_id",
"created_at",
"expired_at",
@@ -75,7 +75,7 @@ async def get_data(result):
"""
EXCLUDE_FIELDS = {
"user_id",
"group_id",
"end_user_id",
"entity_type",
"connect_strength",
"relationship_type",

View File

@@ -62,7 +62,7 @@ class ConfigAuditLogger:
self,
config_id: str,
user_id: Optional[str] = None,
group_id: Optional[str] = None,
end_user_id: Optional[str] = None,
success: bool = True,
details: Optional[Dict[str, Any]] = None
):
@@ -72,14 +72,14 @@ class ConfigAuditLogger:
Args:
config_id: 配置 ID
user_id: 用户 ID可选
group_id: 组 ID可选
end_user_id: 组 ID可选
success: 是否成功
details: 详细信息(可选)
"""
result = "SUCCESS" if success else "FAILED"
msg = (
f"CONFIG_LOAD config_id={config_id} "
f"user={user_id or 'N/A'} group={group_id or 'N/A'} "
f"user={user_id or 'N/A'} group={end_user_id or 'N/A'} "
f"result={result}"
)
if details:
@@ -121,7 +121,7 @@ class ConfigAuditLogger:
self,
operation: str,
config_id: str,
group_id: str,
end_user_id: str,
success: bool = True,
duration: Optional[float] = None,
error: Optional[str] = None,
@@ -133,7 +133,7 @@ class ConfigAuditLogger:
Args:
operation: 操作类型WRITE, READ 等)
config_id: 配置 ID
group_id: 组 ID
end_user_id: 组 ID
success: 是否成功
duration: 操作耗时(秒)
error: 错误信息(可选)
@@ -142,7 +142,7 @@ class ConfigAuditLogger:
result = "SUCCESS" if success else "FAILED"
msg = (
f"{operation.upper()} config_id={config_id} "
f"group={group_id} result={result}"
f"group={end_user_id} result={result}"
)
if duration is not None:
msg += f" duration={duration:.2f}s"