From ee50b25d06d3f41550eacc37d3983d229126ad24 Mon Sep 17 00:00:00 2001 From: lixinyue11 <94037597+lixinyue11@users.noreply.github.com> Date: Thu, 29 Jan 2026 19:27:02 +0800 Subject: [PATCH] Add/develop memory (#247) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 * 遗漏的历史映射 --- .../controllers/memory_forget_controller.py | 8 +- api/app/core/agent/langchain_agent.py | 80 ++++++++++--------- 2 files changed, 44 insertions(+), 44 deletions(-) diff --git a/api/app/controllers/memory_forget_controller.py b/api/app/controllers/memory_forget_controller.py index ea55ea26..2b5ef72f 100644 --- a/api/app/controllers/memory_forget_controller.py +++ b/api/app/controllers/memory_forget_controller.py @@ -84,10 +84,8 @@ async def trigger_forgetting_cycle( connected_config = get_end_user_connected_config(end_user_id, db) config_id = connected_config.get("memory_config_id") - config_id = resolve_config_id(int(config_id), db) + config_id = resolve_config_id((config_id), db) - - if config_id is None: api_logger.warning(f"终端用户 {end_user_id} 未关联记忆配置") return fail(BizCode.INVALID_PARAMETER, f"终端用户 {end_user_id} 未关联记忆配置", "memory_config_id is None") @@ -199,7 +197,7 @@ async def update_forgetting_config( ApiResponse: 包含更新结果的响应 """ workspace_id = current_user.current_workspace_id - payload.config_id=resolve_config_id(int(payload.config_id), db) + payload.config_id=resolve_config_id((payload.config_id), db) # 检查用户是否已选择工作空间 @@ -330,7 +328,7 @@ async def get_forgetting_curve( ApiResponse: 包含遗忘曲线数据的响应 """ workspace_id = current_user.current_workspace_id - request.config_id = resolve_config_id(int(request.config_id), db) + request.config_id = resolve_config_id((request.config_id), db) # 检查用户是否已选择工作空间 if workspace_id is None: api_logger.warning(f"用户 {current_user.username} 尝试获取遗忘曲线但未选择工作空间") diff --git a/api/app/core/agent/langchain_agent.py b/api/app/core/agent/langchain_agent.py index eb58c905..a34c781f 100644 --- a/api/app/core/agent/langchain_agent.py +++ b/api/app/core/agent/langchain_agent.py @@ -177,7 +177,6 @@ class LangChainAgent: # messagss_list.append(f'用户:{query}。AI回复:{aimessages}') # retrieved_content.append({query: aimessages}) # return messagss_list,retrieved_content - async def write(self, storage_type, end_user_id, user_message, ai_message, user_rag_memory_id, actual_end_user_id, actual_config_id): """ 写入记忆(支持结构化消息) @@ -200,49 +199,52 @@ class LangChainAgent: """ db = next(get_db()) - actual_config_id=resolve_config_id(actual_config_id, db) - if storage_type == "rag": - # RAG 模式:组合消息为字符串格式(保持原有逻辑) - combined_message = f"user: {user_message}\nassistant: {ai_message}" - await write_rag(end_user_id, combined_message, user_rag_memory_id) - logger.info(f'RAG_Agent:{end_user_id};{user_rag_memory_id}') - else: - # Neo4j 模式:使用结构化消息列表 - structured_messages = [] + try: + actual_config_id=resolve_config_id(actual_config_id, db) - # 始终添加用户消息(如果不为空) - if user_message: - structured_messages.append({"role": "user", "content": user_message}) + if storage_type == "rag": + # RAG 模式:组合消息为字符串格式(保持原有逻辑) + combined_message = f"user: {user_message}\nassistant: {ai_message}" + await write_rag(end_user_id, combined_message, user_rag_memory_id) + logger.info(f'RAG_Agent:{end_user_id};{user_rag_memory_id}') + else: + # Neo4j 模式:使用结构化消息列表 + structured_messages = [] - # 只有当 AI 回复不为空时才添加 assistant 消息 - if ai_message: - structured_messages.append({"role": "assistant", "content": ai_message}) + # 始终添加用户消息(如果不为空) + if user_message: + structured_messages.append({"role": "user", "content": user_message}) - # 如果没有消息,直接返回 - if not structured_messages: - logger.warning(f"No messages to write for user {actual_end_user_id}") - return + # 只有当 AI 回复不为空时才添加 assistant 消息 + if ai_message: + structured_messages.append({"role": "assistant", "content": ai_message}) - # 调用 Celery 任务,传递结构化消息列表 - # 数据流: - # 1. structured_messages 传递给 write_message_task - # 2. write_message_task 调用 memory_agent_service.write_memory - # 3. write_memory 调用 write_tools.write,传递 messages 参数 - # 4. write_tools.write 调用 get_chunked_dialogs,传递 messages 参数 - # 5. get_chunked_dialogs 为每条消息创建独立的 Chunk,设置 speaker 字段 - # 6. 每个 Chunk 保存到 Neo4j,包含 speaker 字段 - logger.info(f"[WRITE] Submitting Celery task - user={actual_end_user_id}, messages={len(structured_messages)}, config={actual_config_id}") - write_id = write_message_task.delay( - actual_end_user_id, # end_user_id: 用户ID - structured_messages, # message: 结构化消息列表 [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] - actual_config_id, # config_id: 配置ID - storage_type, # storage_type: "neo4j" - user_rag_memory_id # user_rag_memory_id: RAG记忆ID(Neo4j模式下不使用) - ) - logger.info(f"[WRITE] Celery task submitted - task_id={write_id}") - write_status = get_task_memory_write_result(str(write_id)) - logger.info(f'[WRITE] Task result - user={actual_end_user_id}, status={write_status}') + # 如果没有消息,直接返回 + if not structured_messages: + logger.warning(f"No messages to write for user {actual_end_user_id}") + return + # 调用 Celery 任务,传递结构化消息列表 + # 数据流: + # 1. structured_messages 传递给 write_message_task + # 2. write_message_task 调用 memory_agent_service.write_memory + # 3. write_memory 调用 write_tools.write,传递 messages 参数 + # 4. write_tools.write 调用 get_chunked_dialogs,传递 messages 参数 + # 5. get_chunked_dialogs 为每条消息创建独立的 Chunk,设置 speaker 字段 + # 6. 每个 Chunk 保存到 Neo4j,包含 speaker 字段 + logger.info(f"[WRITE] Submitting Celery task - user={actual_end_user_id}, messages={len(structured_messages)}, config={actual_config_id}") + write_id = write_message_task.delay( + actual_end_user_id, # end_user_id: 用户ID + structured_messages, # message: 结构化消息列表 [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] + actual_config_id, # config_id: 配置ID + storage_type, # storage_type: "neo4j" + user_rag_memory_id # user_rag_memory_id: RAG记忆ID(Neo4j模式下不使用) + ) + logger.info(f"[WRITE] Celery task submitted - task_id={write_id}") + write_status = get_task_memory_write_result(str(write_id)) + logger.info(f'[WRITE] Task result - user={actual_end_user_id}, status={write_status}') + finally: + db.close() async def chat( self, message: str,