diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index dea555b9..ea7962c1 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -9,7 +9,7 @@ from app.core.logging_config import get_business_logger from app.core.response_utils import success from app.db import get_db from app.dependencies import get_current_user, cur_workspace_access_guard -from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail +from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage from app.schemas.response_schema import PageData, PageMeta from app.services.app_service import AppService from app.services.app_log_service import AppLogService @@ -78,17 +78,32 @@ def get_app_log_detail( # 验证应用访问权限 app_service = AppService(db) - app_service.get_app(app_id, workspace_id) + app = app_service.get_app(app_id, workspace_id) # 使用 Service 层查询 log_service = AppLogService(db) - conversation, node_executions_map = log_service.get_conversation_detail( + conversation, messages, node_executions_map = log_service.get_conversation_detail( app_id=app_id, conversation_id=conversation_id, - workspace_id=workspace_id + workspace_id=workspace_id, + app_type=app.type ) - detail = AppLogConversationDetail.model_validate(conversation) - detail.node_executions_map = node_executions_map + # 构建基础会话信息(不经过 ORM relationship) + base = AppLogConversation.model_validate(conversation) + + # 单独处理 messages,避免触发 SQLAlchemy relationship 校验 + if messages and isinstance(messages[0], AppLogMessage): + # 工作流:已经是 AppLogMessage 实例 + msg_list = messages + else: + # Agent:ORM Message 对象逐个转换 + msg_list = [AppLogMessage.model_validate(m) for m in messages] + + detail = AppLogConversationDetail( + **base.model_dump(), + messages=msg_list, + node_executions_map=node_executions_map, + ) return success(data=detail) diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 1633b9c7..1ac46180 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -180,6 +180,8 @@ class IterationRuntime: "cycle_id": self.node_id, "cycle_idx": idx, "node_id": node_name, + "node_type": node_type, + "node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name, "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/core/workflow/nodes/cycle_graph/loop.py b/api/app/core/workflow/nodes/cycle_graph/loop.py index e555a228..ca432a04 100644 --- a/api/app/core/workflow/nodes/cycle_graph/loop.py +++ b/api/app/core/workflow/nodes/cycle_graph/loop.py @@ -210,6 +210,8 @@ class LoopRuntime: "cycle_id": self.node_id, "cycle_idx": idx, "node_id": node_name, + "node_type": node_type, + "node_name": node_name, "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index a60a8428..ce9ddd44 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -14,6 +14,7 @@ class AppLogMessage(BaseModel): conversation_id: uuid.UUID role: str = Field(description="角色: user / assistant / system") content: str + status: Optional[str] = Field(default=None, description="执行状态(工作流专用): completed / failed") meta_data: Optional[Dict[str, Any]] = None created_at: datetime.datetime @@ -58,6 +59,7 @@ class AppLogNodeExecution(BaseModel): input: Optional[Any] = None process: Optional[Any] = None output: Optional[Any] = None + cycle_items: Optional[List[Any]] = None elapsed_time: Optional[float] = None token_usage: Optional[Dict[str, Any]] = None diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index b2801df3..e8a80d40 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -1,16 +1,17 @@ """应用日志服务层""" import uuid +import datetime as dt from typing import Optional, Tuple -from datetime import datetime from sqlalchemy import select from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger +from app.models.app_model import AppType from app.models.conversation_model import Conversation, Message from app.models.workflow_model import WorkflowExecution from app.repositories.conversation_repository import ConversationRepository, MessageRepository -from app.schemas.app_log_schema import AppLogNodeExecution +from app.schemas.app_log_schema import AppLogMessage, AppLogNodeExecution logger = get_business_logger() @@ -83,51 +84,40 @@ class AppLogService: self, app_id: uuid.UUID, conversation_id: uuid.UUID, - workspace_id: uuid.UUID - ) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: + workspace_id: uuid.UUID, + app_type: str = AppType.AGENT + ) -> Tuple[Conversation, list, dict[str, list[AppLogNodeExecution]]]: """ - 查询会话详情(包含消息和工作流节点执行记录) - - Args: - app_id: 应用 ID - conversation_id: 会话 ID - workspace_id: 工作空间 ID + 查询会话详情 Returns: - Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: - (包含消息的会话对象, 按消息ID分组的节点执行记录) - - Raises: - ResourceNotFoundException: 当会话不存在时 + Tuple[Conversation, list[AppLogMessage|Message], dict[str, list[AppLogNodeExecution]]] """ logger.info( "查询应用日志会话详情", extra={ "app_id": str(app_id), "conversation_id": str(conversation_id), - "workspace_id": str(workspace_id) + "workspace_id": str(workspace_id), + "app_type": app_type } ) - # 查询会话 conversation = self.conversation_repository.get_conversation_for_app_log( conversation_id=conversation_id, app_id=app_id, workspace_id=workspace_id ) - # 查询消息(按时间正序) - messages = self.message_repository.get_messages_by_conversation( - conversation_id=conversation_id - ) - - # 将消息附加到会话对象 - conversation.messages = messages - - # 查询工作流节点执行记录(按消息分组) - _, node_executions_map = self._get_workflow_node_executions_with_map( - conversation_id, messages - ) + if app_type == AppType.WORKFLOW: + messages, node_executions_map = self._get_workflow_messages_and_nodes(conversation_id) + else: + messages = self.message_repository.get_messages_by_conversation( + conversation_id=conversation_id + ) + _, node_executions_map = self._get_workflow_node_executions_with_map( + conversation_id, messages + ) logger.info( "查询应用日志会话详情成功", @@ -139,7 +129,97 @@ class AppLogService: } ) - return conversation, node_executions_map + return conversation, messages, node_executions_map + + def _get_workflow_messages_and_nodes( + self, + conversation_id: uuid.UUID, + ) -> Tuple[list[AppLogMessage], dict[str, list[AppLogNodeExecution]]]: + """ + 工作流应用专用:从 workflow_executions 构建 messages 和节点日志。 + + 每条 WorkflowExecution 对应一轮对话: + - user message:来自 execution.input_data + - assistant message:来自 execution.output_data(失败时内容为错误信息) + 节点日志以 execution id 为 key 分组。 + + Returns: + (messages 列表, node_executions_map) + """ + stmt = ( + select(WorkflowExecution) + .where( + WorkflowExecution.conversation_id == conversation_id, + WorkflowExecution.status.in_(["completed", "failed"]) + ) + .order_by(WorkflowExecution.started_at.asc()) + ) + executions = list(self.db.scalars(stmt).all()) + + messages: list[AppLogMessage] = [] + node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + + for execution in executions: + started_at = execution.started_at or dt.datetime.now() + completed_at = execution.completed_at or started_at + + # assistant message 的 id,同时作为 node_executions_map 的 key + assistant_msg_id = uuid.uuid5(execution.id, "assistant") + + # --- user message(输入)--- + input_content = _extract_text(execution.input_data) + user_msg = AppLogMessage( + id=uuid.uuid5(execution.id, "user"), + conversation_id=conversation_id, + role="user", + content=input_content, + meta_data=None, + created_at=started_at, + ) + messages.append(user_msg) + + # --- assistant message(输出)--- + if execution.status == "completed": + output_content = _extract_text(execution.output_data) + meta = {"usage": execution.token_usage or {}, "elapsed_time": execution.elapsed_time} + else: + output_content = _extract_text(execution.output_data) or "" + meta = {"error": execution.error_message, "error_node_id": execution.error_node_id} + + assistant_msg = AppLogMessage( + id=assistant_msg_id, + conversation_id=conversation_id, + role="assistant", + content=output_content, + status=execution.status, + meta_data=meta, + created_at=completed_at, + ) + messages.append(assistant_msg) + + # --- 节点执行记录,key 与 assistant message id 一致 --- + execution_nodes = [] + for node_exec in execution.node_executions: + output_data = dict(node_exec.output_data or {}) + cycle_items = output_data.pop("cycle_items", None) + execution_nodes.append(AppLogNodeExecution( + node_id=node_exec.node_id, + node_type=node_exec.node_type, + node_name=node_exec.node_name, + status=node_exec.status, + error=node_exec.error_message, + input=node_exec.input_data, + process=None, + output=output_data, + cycle_items=cycle_items, + elapsed_time=node_exec.elapsed_time, + token_usage=node_exec.token_usage, + )) + + if execution_nodes: + node_executions_map[str(assistant_msg_id)] = execution_nodes + + return messages, node_executions_map def _get_workflow_node_executions_with_map( self, @@ -191,6 +271,8 @@ class AppLogService: # 构建节点执行记录列表 execution_nodes = [] for node_exec in execution.node_executions: + output_data = dict(node_exec.output_data or {}) + cycle_items = output_data.pop("cycle_items", None) node_execution = AppLogNodeExecution( node_id=node_exec.node_id, node_type=node_exec.node_type, @@ -199,7 +281,8 @@ class AppLogService: error=node_exec.error_message, input=node_exec.input_data, process=None, - output=node_exec.output_data, + output=output_data, + cycle_items=cycle_items, elapsed_time=node_exec.elapsed_time, token_usage=node_exec.token_usage, ) @@ -223,9 +306,9 @@ class AppLogService: if msg_id_str in used_message_ids: continue if msg.created_at and msg.created_at >= execution.started_at: - dt = (msg.created_at - execution.started_at).total_seconds() - if best_dt is None or dt < best_dt: - best_dt = dt + delta = (msg.created_at - execution.started_at).total_seconds() + if best_dt is None or delta < best_dt: + best_dt = delta best_msg = msg if not best_msg: @@ -236,3 +319,17 @@ class AppLogService: node_executions_map[msg_id_str] = execution_nodes return node_executions, node_executions_map + + +def _extract_text(data: Optional[dict]) -> str: + """从 workflow execution 的 input_data / output_data 中提取可读文本。 + + 优先取 'text'、'content'、'output' 字段;若都没有则 JSON 序列化整个 dict。 + """ + if not data: + return "" + for key in ("text", "content", "output", "result", "answer"): + if key in data and isinstance(data[key], str): + return data[key] + import json + return json.dumps(data, ensure_ascii=False)