diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index dea555b9..ea7962c1 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -9,7 +9,7 @@ from app.core.logging_config import get_business_logger from app.core.response_utils import success from app.db import get_db from app.dependencies import get_current_user, cur_workspace_access_guard -from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail +from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage from app.schemas.response_schema import PageData, PageMeta from app.services.app_service import AppService from app.services.app_log_service import AppLogService @@ -78,17 +78,32 @@ def get_app_log_detail( # 验证应用访问权限 app_service = AppService(db) - app_service.get_app(app_id, workspace_id) + app = app_service.get_app(app_id, workspace_id) # 使用 Service 层查询 log_service = AppLogService(db) - conversation, node_executions_map = log_service.get_conversation_detail( + conversation, messages, node_executions_map = log_service.get_conversation_detail( app_id=app_id, conversation_id=conversation_id, - workspace_id=workspace_id + workspace_id=workspace_id, + app_type=app.type ) - detail = AppLogConversationDetail.model_validate(conversation) - detail.node_executions_map = node_executions_map + # 构建基础会话信息(不经过 ORM relationship) + base = AppLogConversation.model_validate(conversation) + + # 单独处理 messages,避免触发 SQLAlchemy relationship 校验 + if messages and isinstance(messages[0], AppLogMessage): + # 工作流:已经是 AppLogMessage 实例 + msg_list = messages + else: + # Agent:ORM Message 对象逐个转换 + msg_list = [AppLogMessage.model_validate(m) for m in messages] + + detail = AppLogConversationDetail( + **base.model_dump(), + messages=msg_list, + node_executions_map=node_executions_map, + ) return success(data=detail) diff --git a/api/app/core/tools/custom/base.py b/api/app/core/tools/custom/base.py index c03fe206..06237d32 100644 --- a/api/app/core/tools/custom/base.py +++ b/api/app/core/tools/custom/base.py @@ -73,6 +73,7 @@ class CustomTool(BaseTool): # 添加通用参数(基于第一个操作的参数) if self._parsed_operations: first_operation = next(iter(self._parsed_operations.values())) + # path/query 参数 for param_name, param_info in first_operation.get("parameters", {}).items(): params.append(ToolParameter( name=param_name, @@ -85,6 +86,23 @@ class CustomTool(BaseTool): maximum=param_info.get("maximum"), pattern=param_info.get("pattern") )) + # requestBody 参数 — 将 body 字段平铺为独立参数暴露给模型 + request_body = first_operation.get("request_body") + if request_body: + body_schema = request_body.get("properties", {}) + required_fields = request_body.get("required", []) + for prop_name, prop_schema in body_schema.items(): + params.append(ToolParameter( + name=prop_name, + type=self._convert_openapi_type(prop_schema.get("type", "string")), + description=prop_schema.get("description", ""), + required=prop_name in required_fields, + default=prop_schema.get("default"), + enum=prop_schema.get("enum"), + minimum=prop_schema.get("minimum"), + maximum=prop_schema.get("maximum"), + pattern=prop_schema.get("pattern") + )) return params diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 1633b9c7..3ee7774f 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -180,6 +180,9 @@ class IterationRuntime: "cycle_id": self.node_id, "cycle_idx": idx, "node_id": node_name, + "node_type": node_type, + "node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name, + "status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/core/workflow/nodes/cycle_graph/loop.py b/api/app/core/workflow/nodes/cycle_graph/loop.py index e555a228..93f1a1e4 100644 --- a/api/app/core/workflow/nodes/cycle_graph/loop.py +++ b/api/app/core/workflow/nodes/cycle_graph/loop.py @@ -210,6 +210,9 @@ class LoopRuntime: "cycle_id": self.node_id, "cycle_idx": idx, "node_id": node_name, + "node_type": node_type, + "node_name": node_name, + "status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/schemas/app_log_schema.py b/api/app/schemas/app_log_schema.py index a60a8428..ce9ddd44 100644 --- a/api/app/schemas/app_log_schema.py +++ b/api/app/schemas/app_log_schema.py @@ -14,6 +14,7 @@ class AppLogMessage(BaseModel): conversation_id: uuid.UUID role: str = Field(description="角色: user / assistant / system") content: str + status: Optional[str] = Field(default=None, description="执行状态(工作流专用): completed / failed") meta_data: Optional[Dict[str, Any]] = None created_at: datetime.datetime @@ -58,6 +59,7 @@ class AppLogNodeExecution(BaseModel): input: Optional[Any] = None process: Optional[Any] = None output: Optional[Any] = None + cycle_items: Optional[List[Any]] = None elapsed_time: Optional[float] = None token_usage: Optional[Dict[str, Any]] = None diff --git a/api/app/services/api_key_service.py b/api/app/services/api_key_service.py index 49b07121..e4367e98 100644 --- a/api/app/services/api_key_service.py +++ b/api/app/services/api_key_service.py @@ -65,6 +65,11 @@ class ApiKeyService: BizCode.BAD_REQUEST ) + if data.resource_id: + app = db.get(App, data.resource_id) + if not app or not app.current_release_id: + raise BusinessException("该应用未发布", BizCode.APP_NOT_PUBLISHED) + # 生成 API Key api_key = generate_api_key(data.type) diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 8f5052e6..7ca05d42 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -1,16 +1,17 @@ """应用日志服务层""" import uuid +import datetime as dt from typing import Optional, Tuple -from datetime import datetime from sqlalchemy import select from sqlalchemy.orm import Session from app.core.logging_config import get_business_logger +from app.models.app_model import AppType from app.models.conversation_model import Conversation, Message from app.models.workflow_model import WorkflowExecution from app.repositories.conversation_repository import ConversationRepository, MessageRepository -from app.schemas.app_log_schema import AppLogNodeExecution +from app.schemas.app_log_schema import AppLogMessage, AppLogNodeExecution logger = get_business_logger() @@ -83,51 +84,40 @@ class AppLogService: self, app_id: uuid.UUID, conversation_id: uuid.UUID, - workspace_id: uuid.UUID - ) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: + workspace_id: uuid.UUID, + app_type: str = AppType.AGENT + ) -> Tuple[Conversation, list, dict[str, list[AppLogNodeExecution]]]: """ - 查询会话详情(包含消息和工作流节点执行记录) - - Args: - app_id: 应用 ID - conversation_id: 会话 ID - workspace_id: 工作空间 ID + 查询会话详情 Returns: - Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: - (包含消息的会话对象, 按消息ID分组的节点执行记录) - - Raises: - ResourceNotFoundException: 当会话不存在时 + Tuple[Conversation, list[AppLogMessage|Message], dict[str, list[AppLogNodeExecution]]] """ logger.info( "查询应用日志会话详情", extra={ "app_id": str(app_id), "conversation_id": str(conversation_id), - "workspace_id": str(workspace_id) + "workspace_id": str(workspace_id), + "app_type": app_type } ) - # 查询会话 conversation = self.conversation_repository.get_conversation_for_app_log( conversation_id=conversation_id, app_id=app_id, workspace_id=workspace_id ) - # 查询消息(按时间正序) - messages = self.message_repository.get_messages_by_conversation( - conversation_id=conversation_id - ) - - # 将消息附加到会话对象 - conversation.messages = messages - - # 查询工作流节点执行记录(按消息分组) - _, node_executions_map = self._get_workflow_node_executions_with_map( - conversation_id, messages - ) + if app_type == AppType.WORKFLOW: + messages, node_executions_map = self._get_workflow_messages_and_nodes(conversation_id) + else: + messages = self.message_repository.get_messages_by_conversation( + conversation_id=conversation_id + ) + node_executions_map = self._get_workflow_node_executions_with_map( + conversation_id, messages + ) logger.info( "查询应用日志会话详情成功", @@ -139,13 +129,129 @@ class AppLogService: } ) - return conversation, node_executions_map + return conversation, messages, node_executions_map + + def _get_workflow_messages_and_nodes( + self, + conversation_id: uuid.UUID, + ) -> Tuple[list[AppLogMessage], dict[str, list[AppLogNodeExecution]]]: + """ + 工作流应用专用:从 workflow_executions 构建 messages 和节点日志。 + + 每条 WorkflowExecution 对应一轮对话: + - user message:来自 execution.input_data(content 取 message 字段,files 放 meta_data) + - assistant message:来自 execution.output_data(失败时内容为错误信息) + 开场白的 suggested_questions 合并到第一条 assistant message 的 meta_data 里。 + + Returns: + (messages 列表, node_executions_map) + """ + stmt = ( + select(WorkflowExecution) + .where( + WorkflowExecution.conversation_id == conversation_id, + WorkflowExecution.status.in_(["completed", "failed"]) + ) + .order_by(WorkflowExecution.started_at.asc()) + ) + executions = list(self.db.scalars(stmt).all()) + + # 查开场白:Message 表里 meta_data 含 suggested_questions 的第一条 assistant 消息 + opening_stmt = ( + select(Message) + .where( + Message.conversation_id == conversation_id, + Message.role == "assistant", + ) + .order_by(Message.created_at.asc()) + .limit(10) + ) + early_messages = list(self.db.scalars(opening_stmt).all()) + suggested_questions: list = [] + for m in early_messages: + if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data: + suggested_questions = m.meta_data.get("suggested_questions") or [] + break + + messages: list[AppLogMessage] = [] + node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + + # 如果有开场白,作为第一条 assistant 消息插入 + if suggested_questions or early_messages: + opening_msg = next( + (m for m in early_messages + if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data), + None + ) + if opening_msg: + messages.append(AppLogMessage( + id=opening_msg.id, + conversation_id=conversation_id, + role="assistant", + content=opening_msg.content, + status=None, + meta_data={"suggested_questions": suggested_questions}, + created_at=opening_msg.created_at, + )) + + for execution in executions: + started_at = execution.started_at or dt.datetime.now() + completed_at = execution.completed_at or started_at + + # assistant message 的 id,同时作为 node_executions_map 的 key + assistant_msg_id = uuid.uuid5(execution.id, "assistant") + + # --- user message(输入)--- + input_data = execution.input_data or {} + input_content = input_data.get("message") or _extract_text(input_data) + + # 跳过没有用户输入的 execution(如开场白触发的记录) + if not input_content or not input_content.strip(): + continue + + files = input_data.get("files") or [] + user_msg = AppLogMessage( + id=uuid.uuid5(execution.id, "user"), + conversation_id=conversation_id, + role="user", + content=input_content, + meta_data={"files": files} if files else None, + created_at=started_at, + ) + messages.append(user_msg) + + # --- assistant message(输出)--- + if execution.status == "completed": + output_content = _extract_text(execution.output_data) + meta = {"usage": execution.token_usage or {}, "elapsed_time": execution.elapsed_time} + else: + output_content = _extract_text(execution.output_data) or "" + meta = {"error": execution.error_message, "error_node_id": execution.error_node_id} + + assistant_msg = AppLogMessage( + id=assistant_msg_id, + conversation_id=conversation_id, + role="assistant", + content=output_content, + status=execution.status, + meta_data=meta, + created_at=completed_at, + ) + messages.append(assistant_msg) + + # --- 节点执行记录,从 workflow_executions.output_data["node_outputs"] 读取 --- + execution_nodes = _build_nodes_from_output_data(execution.output_data) + + if execution_nodes: + node_executions_map[str(assistant_msg_id)] = execution_nodes + + return messages, node_executions_map def _get_workflow_node_executions_with_map( self, conversation_id: uuid.UUID, messages: list[Message] - ) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + ) -> dict[str, list[AppLogNodeExecution]]: """ 从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组 @@ -157,13 +263,12 @@ class AppLogService: Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: (所有节点执行记录列表, 按 message_id 分组的节点执行记录字典) """ - node_executions = [] node_executions_map: dict[str, list[AppLogNodeExecution]] = {} # 查询该会话关联的所有工作流执行记录(按时间正序) stmt = select(WorkflowExecution).where( WorkflowExecution.conversation_id == conversation_id, - WorkflowExecution.status == "completed" + WorkflowExecution.status.in_(["completed", "failed"]) ).order_by(WorkflowExecution.started_at.asc()) executions = self.db.scalars(stmt).all() @@ -188,10 +293,18 @@ class AppLogService: used_message_ids: set[str] = set() for execution in executions: - if not execution.output_data: + # 构建节点执行记录列表,从 workflow_executions.output_data["node_outputs"] 读取 + execution_nodes = _build_nodes_from_output_data(execution.output_data) + + if not execution_nodes: continue - # 找到该 execution 对应的 assistant message + # 失败的执行没有 assistant message,直接用 execution id 作为 key + if execution.status == "failed": + node_executions_map[f"execution_{str(execution.id)}"] = execution_nodes + continue + + # completed:通过时序匹配关联到对应的 assistant message # 逻辑:找 execution.started_at 之后最近的、未使用的 assistant message best_msg = None best_dt = None @@ -200,9 +313,9 @@ class AppLogService: if msg_id_str in used_message_ids: continue if msg.created_at and msg.created_at >= execution.started_at: - dt = (msg.created_at - execution.started_at).total_seconds() - if best_dt is None or dt < best_dt: - best_dt = dt + delta = (msg.created_at - execution.started_at).total_seconds() + if best_dt is None or delta < best_dt: + best_dt = delta best_msg = msg if not best_msg: @@ -210,31 +323,76 @@ class AppLogService: msg_id_str = str(best_msg.id) used_message_ids.add(msg_id_str) + node_executions_map[msg_id_str] = execution_nodes - # 提取节点输出 - output_data = execution.output_data - if isinstance(output_data, dict): - node_outputs = output_data.get("node_outputs", {}) - execution_nodes = [] - for node_id, node_data in node_outputs.items(): - if not isinstance(node_data, dict): - continue - node_execution = AppLogNodeExecution( - node_id=node_data.get("node_id", node_id), - node_type=node_data.get("node_type", "unknown"), - node_name=node_data.get("node_name"), - status=node_data.get("status", "unknown"), - error=node_data.get("error"), - input=node_data.get("input"), - process=node_data.get("process"), - output=node_data.get("output"), - elapsed_time=node_data.get("elapsed_time"), - token_usage=node_data.get("token_usage"), - ) - node_executions.append(node_execution) - execution_nodes.append(node_execution) + return node_executions_map - # 将节点记录关联到 message_id - node_executions_map[msg_id_str] = execution_nodes - return node_executions, node_executions_map +def _extract_text(data: Optional[dict]) -> str: + """从 workflow execution 的 input_data / output_data 中提取可读文本。 + + 优先取 'text'、'content'、'output' 字段;若都没有则 JSON 序列化整个 dict。 + """ + if not data: + return "" + for key in ("message", "text", "content", "output", "result", "answer"): + if key in data and isinstance(data[key], str): + return data[key] + import json + return json.dumps(data, ensure_ascii=False) + + +def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNodeExecution]: + """从 workflow_executions.output_data["node_outputs"] 构建节点执行记录列表。 + + output_data 结构: + { + "node_outputs": { + "": { + "node_type": ..., + "node_name": ..., + "status": ..., + "input": ..., + "output": ..., + "elapsed_time": ..., + "token_usage": ..., + "error": ..., + "cycle_items": [...], + ... + } + }, + "error": ..., + ... + } + """ + if not output_data: + return [] + node_outputs: dict = output_data.get("node_outputs") or {} + result = [] + for node_id, node_data in node_outputs.items(): + if not isinstance(node_data, dict): + continue + output = dict(node_data) + cycle_items = output.pop("cycle_items", None) + # 把已知的顶层字段剥离,剩余的作为 output + node_type = output.pop("node_type", "unknown") + node_name = output.pop("node_name", None) + status = output.pop("status", "completed") + error = output.pop("error", None) + inp = output.pop("input", None) + elapsed_time = output.pop("elapsed_time", None) + token_usage = output.pop("token_usage", None) + result.append(AppLogNodeExecution( + node_id=node_id, + node_type=node_type, + node_name=node_name, + status=status, + error=error, + input=inp, + process=None, + output=output if output else None, + cycle_items=cycle_items, + elapsed_time=elapsed_time, + token_usage=token_usage, + )) + return result diff --git a/api/app/services/tool_service.py b/api/app/services/tool_service.py index 9a59cd81..ff734c9d 100644 --- a/api/app/services/tool_service.py +++ b/api/app/services/tool_service.py @@ -815,11 +815,12 @@ class ToolService: "default": param_info.get("default") }) - # 请求体参数 + # 请求体参数 — _extract_request_body 返回 {"schema": {...}, "required": bool, ...} request_body = operation.get("request_body") if request_body: - schema_props = request_body.get("schema", {}).get("properties", {}) - required_props = request_body.get("schema", {}).get("required", []) + body_schema = request_body.get("schema", {}) + schema_props = body_schema.get("properties", {}) + required_props = body_schema.get("required", []) for prop_name, prop_schema in schema_props.items(): parameters.append({ diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 63b7073e..b35656d9 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -17,8 +17,9 @@ from app.core.workflow.executor import execute_workflow, execute_workflow_stream from app.core.workflow.nodes.enums import NodeType from app.core.workflow.validator import validate_workflow_config from app.db import get_db +from sqlalchemy import select from app.models import App -from app.models.workflow_model import WorkflowConfig, WorkflowExecution +from app.models.workflow_model import WorkflowConfig, WorkflowExecution, WorkflowNodeExecution from app.repositories import knowledge_repository from app.repositories.workflow_repository import ( WorkflowConfigRepository, @@ -918,6 +919,7 @@ class WorkflowService: input_data["conv_messages"] = conv_messages init_message_length = len(input_data.get("conv_messages", [])) message_id = uuid.uuid4() + _cycle_items: dict[str, list] = {} # 新会话时写入开场白 is_new_conversation = init_message_length == 0 @@ -948,6 +950,15 @@ class WorkflowService: memory_storage_type=storage_type, user_rag_memory_id=user_rag_memory_id ): + event_type = event.get("event") + event_data = event.get("data", {}) + + if event_type == "cycle_item": + cycle_id = event_data.get("cycle_id") + if cycle_id not in _cycle_items: + _cycle_items[cycle_id] = [] + _cycle_items[cycle_id].append(event_data) + if event.get("event") == "workflow_end": status = event.get("data", {}).get("status") token_usage = event.get("data", {}).get("token_usage", {}) or {} @@ -1019,6 +1030,18 @@ class WorkflowService: ) else: logger.error(f"unexpect workflow run status, status: {status}") + # 把积累的 cycle_item 写入 workflow_executions.output_data["node_outputs"] + if _cycle_items and execution.output_data: + import copy + new_output_data = copy.deepcopy(execution.output_data) + node_outputs = new_output_data.setdefault("node_outputs", {}) + for cycle_node_id, items in _cycle_items.items(): + if cycle_node_id in node_outputs: + node_outputs[cycle_node_id]["cycle_items"] = items + else: + node_outputs[cycle_node_id] = {"cycle_items": items} + execution.output_data = new_output_data + self.db.commit() elif event.get("event") == "workflow_start": event["data"]["message_id"] = str(message_id) event = self._emit(public, event)