From e833db954a7d397c45edf738f6cc17b4b35a53e5 Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Fri, 6 Mar 2026 13:37:16 +0800 Subject: [PATCH] feat(chat): add message_id field to chat API response --- .../langgraph_graph/nodes/problem_nodes.py | 4 +- api/app/services/app_chat_service.py | 19 ++++++-- api/app/services/conversation_service.py | 8 +++- api/app/services/workflow_service.py | 46 +++++++++++-------- 4 files changed, 49 insertions(+), 28 deletions(-) diff --git a/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py b/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py index c8cc0460..784e5802 100644 --- a/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py +++ b/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py @@ -111,7 +111,7 @@ async def Split_The_Problem(state: ReadState) -> ReadState: "error_type": type(e).__name__, "error_message": str(e), "content_length": len(content), - "llm_model_id": memory_config.llm_model_id if memory_config else None + "llm_model_id": str(memory_config.llm_model_id) if memory_config else None } logger.error(f"Split_The_Problem error details: {error_details}") @@ -221,7 +221,7 @@ async def Problem_Extension(state: ReadState) -> ReadState: "error_type": type(e).__name__, "error_message": str(e), "questions_count": len(databasets), - "llm_model_id": memory_config.llm_model_id if memory_config else None + "llm_model_id": str(memory_config.llm_model_id) if memory_config else None } logger.error(f"Problem_Extension error details: {error_details}") diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 5430d2f9..f3cdde2a 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -144,7 +144,7 @@ class AppChatService: ) # 保存消息 - self.conversation_service.save_conversation_messages( + message_id = self.conversation_service.save_conversation_messages( conversation_id=conversation_id, user_message=message, assistant_message=result["content"], @@ -163,6 +163,7 @@ class AppChatService: return { "conversation_id": conversation_id, + "message_id": str(message_id), "message": result["content"], "usage": result.get("usage", { "prompt_tokens": 0, @@ -191,7 +192,11 @@ class AppChatService: try: start_time = time.time() config_id = None - yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" + message_id = uuid.uuid4() + yield f"event: start\ndata: {json.dumps({ + 'conversation_id': str(conversation_id), + "message_id": str(message_id) + }, ensure_ascii=False)}\n\n" variables = self.agent_service.prepare_variables(variables, config.variables) # 获取模型配置ID @@ -296,6 +301,7 @@ class AppChatService: ) self.conversation_service.add_message( + message_id=message_id, conversation_id=conversation_id, role="assistant", content=full_content, @@ -373,7 +379,7 @@ class AppChatService: content=message ) - self.conversation_service.add_message( + ai_message = self.conversation_service.add_message( conversation_id=conversation_id, role="assistant", content=result.get("message", ""), @@ -391,6 +397,7 @@ class AppChatService: return { "conversation_id": conversation_id, "message": result.get("message", ""), + "message_id": str(ai_message.id), "usage": { "prompt_tokens": 0, "completion_tokens": 0, @@ -419,9 +426,9 @@ class AppChatService: variables = {} try: - + message_id = uuid.uuid4() # 发送开始事件 - yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" + yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id), "message_id": str(message_id)}, ensure_ascii=False)}\n\n" full_content = "" total_tokens = 0 @@ -429,6 +436,7 @@ class AppChatService: # 2. 创建编排器 orchestrator = MultiAgentOrchestrator(self.db, config) + # 3. 流式执行任务 async for event in orchestrator.execute_stream( message=message, @@ -472,6 +480,7 @@ class AppChatService: ) self.conversation_service.add_message( + message_id=message_id, conversation_id=conversation_id, role="assistant", content=full_content, diff --git a/api/app/services/conversation_service.py b/api/app/services/conversation_service.py index 553aefc4..aff5f533 100644 --- a/api/app/services/conversation_service.py +++ b/api/app/services/conversation_service.py @@ -178,7 +178,8 @@ class ConversationService: conversation_id: uuid.UUID, role: str, content: str, - meta_data: Optional[dict] = None + meta_data: Optional[dict] = None, + message_id: Optional[uuid.UUID] = None, ) -> Message: """ Add a message to a conversation using UnitOfWork. @@ -188,6 +189,7 @@ class ConversationService: role (str): Role of the message sender ('user' or 'assistant'). content (str): Message content. meta_data (Optional[dict]): Optional metadata. + message_id (Optional[uuid.UUID]): Optional custom message UUID. Returns: Message: Newly created Message instance. @@ -198,6 +200,7 @@ class ConversationService: ) message = Message( + id=message_id if message_id else uuid.uuid4(), conversation_id=conversation_id, role=role, content=content, @@ -317,7 +320,7 @@ class ConversationService: content=user_message ) - self.add_message( + ai_message = self.add_message( conversation_id=conversation_id, role="assistant", content=assistant_message, @@ -332,6 +335,7 @@ class ConversationService: "assistant_message_length": len(assistant_message) } ) + return ai_message.id def delete_conversation( self, diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index d13e3454..ce27e276 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -496,6 +496,7 @@ class WorkflowService: "event": "start", "data": { "conversation_id": payload.get("conversation_id"), + "message_id": payload.get("message_id") } } case "workflow_end": @@ -624,24 +625,28 @@ class WorkflowService: workspace_id=str(workspace_id), user_id=payload.user_id ) - # 更新执行结果 if result.get("status") == "completed": token_usage = result.get("token_usage", {}) or {} + + final_messages = result.get("messages", [])[init_message_length:] + for message in final_messages: + message_obj = self.conversation_service.add_message( + conversation_id=conversation_id_uuid, + role=message["role"], + content=message["content"], + meta_data=None if message["role"] == "user" else {"usage": token_usage} + ) + if message["role"] != "user": + result["message_id"] = str(message_obj.id) self.update_execution_status( execution.execution_id, "completed", output_data=result, token_usage=token_usage.get("total_tokens", None) ) - final_messages = result.get("messages", [])[init_message_length:] - for message in final_messages: - self.conversation_service.add_message( - conversation_id=conversation_id_uuid, - role=message["role"], - content=message["content"], - meta_data=None if message["role"] == "user" else {"usage": token_usage} - ) + logger.error(f"Workflow Run Failed, execution_id: {execution.execution_id}," + f" error: {result.get('error')}") logger.info(f"Workflow Run Success, " f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") else: @@ -659,6 +664,7 @@ class WorkflowService: # "messages": result.get("messages"), "output": result.get("output"), # 最终输出(字符串) "message": result.get("output"), # 最终输出(字符串) + "message_id": result.get("message_id"), # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID "error_message": result.get("error"), @@ -756,7 +762,7 @@ class WorkflowService: input_data["conv_messages"] = last_state.get("messages") or [] break init_message_length = len(input_data.get("conv_messages", [])) - + message_id = uuid.uuid4() async for event in execute_workflow_stream( workflow_config=workflow_config_dict, input_data=input_data, @@ -765,24 +771,24 @@ class WorkflowService: user_id=payload.user_id, ): if event.get("event") == "workflow_end": - status = event.get("data", {}).get("status") token_usage = event.get("data", {}).get("token_usage", {}) or {} if status == "completed": + final_messages = event.get("data", {}).get("messages", [])[init_message_length:] + for message in final_messages: + self.conversation_service.add_message( + message_id=message_id if message["role"] != "user" else uuid.uuid4(), + conversation_id=conversation_id_uuid, + role=message["role"], + content=message["content"], + meta_data=None if message["role"] == "user" else {"usage": token_usage} + ) self.update_execution_status( execution.execution_id, "completed", output_data=event.get("data"), token_usage=token_usage.get("total_tokens", None) ) - final_messages = event.get("data", {}).get("messages", [])[init_message_length:] - for message in final_messages: - self.conversation_service.add_message( - conversation_id=conversation_id_uuid, - role=message["role"], - content=message["content"], - meta_data=None if message["role"] == "user" else {"usage": token_usage} - ) logger.info(f"Workflow Run Success, " f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") elif status == "failed": @@ -793,6 +799,8 @@ class WorkflowService: ) else: logger.error(f"unexpect workflow run status, status: {status}") + elif event.get("event") == "workflow_start": + event["data"]["message_id"] = str(message_id) event = self._emit(public, event) if event: yield event