feat(chat): add message_id field to chat API response

This commit is contained in:
Eternity
2026-03-06 13:37:16 +08:00
parent a008f5fbef
commit e833db954a
4 changed files with 49 additions and 28 deletions

View File

@@ -111,7 +111,7 @@ async def Split_The_Problem(state: ReadState) -> ReadState:
"error_type": type(e).__name__, "error_type": type(e).__name__,
"error_message": str(e), "error_message": str(e),
"content_length": len(content), "content_length": len(content),
"llm_model_id": memory_config.llm_model_id if memory_config else None "llm_model_id": str(memory_config.llm_model_id) if memory_config else None
} }
logger.error(f"Split_The_Problem error details: {error_details}") logger.error(f"Split_The_Problem error details: {error_details}")
@@ -221,7 +221,7 @@ async def Problem_Extension(state: ReadState) -> ReadState:
"error_type": type(e).__name__, "error_type": type(e).__name__,
"error_message": str(e), "error_message": str(e),
"questions_count": len(databasets), "questions_count": len(databasets),
"llm_model_id": memory_config.llm_model_id if memory_config else None "llm_model_id": str(memory_config.llm_model_id) if memory_config else None
} }
logger.error(f"Problem_Extension error details: {error_details}") logger.error(f"Problem_Extension error details: {error_details}")

View File

@@ -144,7 +144,7 @@ class AppChatService:
) )
# 保存消息 # 保存消息
self.conversation_service.save_conversation_messages( message_id = self.conversation_service.save_conversation_messages(
conversation_id=conversation_id, conversation_id=conversation_id,
user_message=message, user_message=message,
assistant_message=result["content"], assistant_message=result["content"],
@@ -163,6 +163,7 @@ class AppChatService:
return { return {
"conversation_id": conversation_id, "conversation_id": conversation_id,
"message_id": str(message_id),
"message": result["content"], "message": result["content"],
"usage": result.get("usage", { "usage": result.get("usage", {
"prompt_tokens": 0, "prompt_tokens": 0,
@@ -191,7 +192,11 @@ class AppChatService:
try: try:
start_time = time.time() start_time = time.time()
config_id = None config_id = None
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" message_id = uuid.uuid4()
yield f"event: start\ndata: {json.dumps({
'conversation_id': str(conversation_id),
"message_id": str(message_id)
}, ensure_ascii=False)}\n\n"
variables = self.agent_service.prepare_variables(variables, config.variables) variables = self.agent_service.prepare_variables(variables, config.variables)
# 获取模型配置ID # 获取模型配置ID
@@ -296,6 +301,7 @@ class AppChatService:
) )
self.conversation_service.add_message( self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=full_content, content=full_content,
@@ -373,7 +379,7 @@ class AppChatService:
content=message content=message
) )
self.conversation_service.add_message( ai_message = self.conversation_service.add_message(
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=result.get("message", ""), content=result.get("message", ""),
@@ -391,6 +397,7 @@ class AppChatService:
return { return {
"conversation_id": conversation_id, "conversation_id": conversation_id,
"message": result.get("message", ""), "message": result.get("message", ""),
"message_id": str(ai_message.id),
"usage": { "usage": {
"prompt_tokens": 0, "prompt_tokens": 0,
"completion_tokens": 0, "completion_tokens": 0,
@@ -419,9 +426,9 @@ class AppChatService:
variables = {} variables = {}
try: try:
message_id = uuid.uuid4()
# 发送开始事件 # 发送开始事件
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id), "message_id": str(message_id)}, ensure_ascii=False)}\n\n"
full_content = "" full_content = ""
total_tokens = 0 total_tokens = 0
@@ -429,6 +436,7 @@ class AppChatService:
# 2. 创建编排器 # 2. 创建编排器
orchestrator = MultiAgentOrchestrator(self.db, config) orchestrator = MultiAgentOrchestrator(self.db, config)
# 3. 流式执行任务 # 3. 流式执行任务
async for event in orchestrator.execute_stream( async for event in orchestrator.execute_stream(
message=message, message=message,
@@ -472,6 +480,7 @@ class AppChatService:
) )
self.conversation_service.add_message( self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=full_content, content=full_content,

View File

@@ -178,7 +178,8 @@ class ConversationService:
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
role: str, role: str,
content: str, content: str,
meta_data: Optional[dict] = None meta_data: Optional[dict] = None,
message_id: Optional[uuid.UUID] = None,
) -> Message: ) -> Message:
""" """
Add a message to a conversation using UnitOfWork. Add a message to a conversation using UnitOfWork.
@@ -188,6 +189,7 @@ class ConversationService:
role (str): Role of the message sender ('user' or 'assistant'). role (str): Role of the message sender ('user' or 'assistant').
content (str): Message content. content (str): Message content.
meta_data (Optional[dict]): Optional metadata. meta_data (Optional[dict]): Optional metadata.
message_id (Optional[uuid.UUID]): Optional custom message UUID.
Returns: Returns:
Message: Newly created Message instance. Message: Newly created Message instance.
@@ -198,6 +200,7 @@ class ConversationService:
) )
message = Message( message = Message(
id=message_id if message_id else uuid.uuid4(),
conversation_id=conversation_id, conversation_id=conversation_id,
role=role, role=role,
content=content, content=content,
@@ -317,7 +320,7 @@ class ConversationService:
content=user_message content=user_message
) )
self.add_message( ai_message = self.add_message(
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=assistant_message, content=assistant_message,
@@ -332,6 +335,7 @@ class ConversationService:
"assistant_message_length": len(assistant_message) "assistant_message_length": len(assistant_message)
} }
) )
return ai_message.id
def delete_conversation( def delete_conversation(
self, self,

View File

@@ -496,6 +496,7 @@ class WorkflowService:
"event": "start", "event": "start",
"data": { "data": {
"conversation_id": payload.get("conversation_id"), "conversation_id": payload.get("conversation_id"),
"message_id": payload.get("message_id")
} }
} }
case "workflow_end": case "workflow_end":
@@ -624,24 +625,28 @@ class WorkflowService:
workspace_id=str(workspace_id), workspace_id=str(workspace_id),
user_id=payload.user_id user_id=payload.user_id
) )
# 更新执行结果 # 更新执行结果
if result.get("status") == "completed": if result.get("status") == "completed":
token_usage = result.get("token_usage", {}) or {} token_usage = result.get("token_usage", {}) or {}
final_messages = result.get("messages", [])[init_message_length:]
for message in final_messages:
message_obj = self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
if message["role"] != "user":
result["message_id"] = str(message_obj.id)
self.update_execution_status( self.update_execution_status(
execution.execution_id, execution.execution_id,
"completed", "completed",
output_data=result, output_data=result,
token_usage=token_usage.get("total_tokens", None) token_usage=token_usage.get("total_tokens", None)
) )
final_messages = result.get("messages", [])[init_message_length:] logger.error(f"Workflow Run Failed, execution_id: {execution.execution_id},"
for message in final_messages: f" error: {result.get('error')}")
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
logger.info(f"Workflow Run Success, " logger.info(f"Workflow Run Success, "
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
else: else:
@@ -659,6 +664,7 @@ class WorkflowService:
# "messages": result.get("messages"), # "messages": result.get("messages"),
"output": result.get("output"), # 最终输出(字符串) "output": result.get("output"), # 最终输出(字符串)
"message": result.get("output"), # 最终输出(字符串) "message": result.get("output"), # 最终输出(字符串)
"message_id": result.get("message_id"),
# "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
"conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID "conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID
"error_message": result.get("error"), "error_message": result.get("error"),
@@ -756,7 +762,7 @@ class WorkflowService:
input_data["conv_messages"] = last_state.get("messages") or [] input_data["conv_messages"] = last_state.get("messages") or []
break break
init_message_length = len(input_data.get("conv_messages", [])) init_message_length = len(input_data.get("conv_messages", []))
message_id = uuid.uuid4()
async for event in execute_workflow_stream( async for event in execute_workflow_stream(
workflow_config=workflow_config_dict, workflow_config=workflow_config_dict,
input_data=input_data, input_data=input_data,
@@ -765,24 +771,24 @@ class WorkflowService:
user_id=payload.user_id, user_id=payload.user_id,
): ):
if event.get("event") == "workflow_end": if event.get("event") == "workflow_end":
status = event.get("data", {}).get("status") status = event.get("data", {}).get("status")
token_usage = event.get("data", {}).get("token_usage", {}) or {} token_usage = event.get("data", {}).get("token_usage", {}) or {}
if status == "completed": if status == "completed":
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
for message in final_messages:
self.conversation_service.add_message(
message_id=message_id if message["role"] != "user" else uuid.uuid4(),
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
self.update_execution_status( self.update_execution_status(
execution.execution_id, execution.execution_id,
"completed", "completed",
output_data=event.get("data"), output_data=event.get("data"),
token_usage=token_usage.get("total_tokens", None) token_usage=token_usage.get("total_tokens", None)
) )
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
for message in final_messages:
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
logger.info(f"Workflow Run Success, " logger.info(f"Workflow Run Success, "
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
elif status == "failed": elif status == "failed":
@@ -793,6 +799,8 @@ class WorkflowService:
) )
else: else:
logger.error(f"unexpect workflow run status, status: {status}") logger.error(f"unexpect workflow run status, status: {status}")
elif event.get("event") == "workflow_start":
event["data"]["message_id"] = str(message_id)
event = self._emit(public, event) event = self._emit(public, event)
if event: if event:
yield event yield event