diff --git a/api/app/controllers/app_log_controller.py b/api/app/controllers/app_log_controller.py index ea7962c1..90fbd4ea 100644 --- a/api/app/controllers/app_log_controller.py +++ b/api/app/controllers/app_log_controller.py @@ -41,7 +41,7 @@ def list_app_logs( # 验证应用访问权限 app_service = AppService(db) - app_service.get_app(app_id, workspace_id) + app = app_service.get_app(app_id, workspace_id) # 使用 Service 层查询 log_service = AppLogService(db) @@ -51,7 +51,8 @@ def list_app_logs( page=page, pagesize=pagesize, is_draft=is_draft, - keyword=keyword + keyword=keyword, + app_type=app.type, ) items = [AppLogConversation.model_validate(c) for c in conversations] diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 6ac48ede..ea05db87 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -16,6 +16,7 @@ from app.core.workflow.engine.runtime_schema import ExecutionContext from app.core.workflow.engine.state_manager import WorkflowStateManager from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer +from app.core.workflow.nodes.base_node import NodeExecutionError logger = logging.getLogger(__name__) @@ -326,10 +327,43 @@ class WorkflowExecutor: logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", exc_info=True) + + # 1) 尝试从 checkpoint 回补已成功节点的 node_outputs + recovered: dict[str, Any] = {} + try: + if self.graph is not None: + recovered = self.graph.get_state( + self.execution_context.checkpoint_config + ).values or {} + except Exception as recover_err: + logger.warning( + f"Recover state on failure failed: {recover_err}, " + f"execution_id={self.execution_context.execution_id}" + ) + if result is None: - result = {"error": str(e)} + result = dict(recovered) if recovered else {} else: - result["error"] = str(e) + # 已有 result 与 recovered 合并,node_outputs 深度合并 + for k, v in recovered.items(): + if k == "node_outputs" and isinstance(v, dict): + existing = result.get("node_outputs") or {} + result["node_outputs"] = {**v, **existing} + else: + result.setdefault(k, v) + + # 2) 如果是节点抛出的 NodeExecutionError,把失败节点的 node_output 注入 node_outputs + failed_node_id: str | None = None + if isinstance(e, NodeExecutionError): + failed_node_id = e.node_id + node_outputs = result.setdefault("node_outputs", {}) + # 不覆盖已有(理论上不会有),保底写入失败节点记录 + node_outputs.setdefault(e.node_id, e.node_output) + + result["error"] = str(e) + if failed_node_id: + result["error_node"] = failed_node_id + yield { "event": "workflow_end", "data": self.result_builder.build_final_output( diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 5458a80c..5d08670a 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -1,5 +1,6 @@ import asyncio import logging +import time import uuid from abc import ABC, abstractmethod from datetime import datetime @@ -22,6 +23,20 @@ from app.services.multimodal_service import MultimodalService logger = logging.getLogger(__name__) +class NodeExecutionError(Exception): + """节点执行失败异常。 + + 携带失败节点的完整 node_output,供 executor 兜底注入 node_outputs, + 保证 workflow_executions.output_data 里能看到失败节点的日志记录。 + """ + + def __init__(self, node_id: str, node_output: dict[str, Any], error_message: str): + super().__init__(f"Node {node_id} execution failed: {error_message}") + self.node_id = node_id + self.node_output = node_output + self.error_message = error_message + + class BaseNode(ABC): """Base class for workflow nodes. @@ -396,6 +411,8 @@ class BaseNode(ABC): "elapsed_time": elapsed_time, "token_usage": token_usage, "error": None, + # 单调递增序号,用于日志按执行顺序排序(JSONB 不保证 key 顺序) + "execution_order": time.monotonic_ns(), **self._extract_extra_fields(business_result), } final_output = { @@ -444,7 +461,9 @@ class BaseNode(ABC): "output": None, "elapsed_time": elapsed_time, "token_usage": None, - "error": error_message + "error": error_message, + # 单调递增序号,用于日志按执行顺序排序 + "execution_order": time.monotonic_ns(), } # if error_edge: @@ -466,7 +485,12 @@ class BaseNode(ABC): **node_output }) logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") - raise Exception(f"Node {self.node_id} execution failed: {error_message}") + # 抛出自定义异常,把 node_output 带给 executor,供其写入 node_outputs + raise NodeExecutionError( + node_id=self.node_id, + node_output=node_output, + error_message=error_message, + ) def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: """Extracts the input data for this node (used for logging or audit). diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 3ee7774f..3ce22ab2 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -174,6 +174,9 @@ class IterationRuntime: continue node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type") cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None + node_cfg = next( + (n for n in self.cycle_nodes if n.get("id") == node_name), None + ) self.event_write({ "type": "cycle_item", "data": { diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index da13ceec..6b117368 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -255,9 +255,18 @@ class HttpRequestNode(BaseNode): case HttpContentType.NONE: return {} case HttpContentType.JSON: - content["json"] = json.loads(self._render_template( + rendered = self._render_template( self.typed_config.body.data, variable_pool - )) + ) + if not rendered or not rendered.strip(): + # 第三方导入的工作流可能出现 content_type=json 但 data 为空的情况,视为无 body + return {} + try: + content["json"] = json.loads(rendered) + except json.JSONDecodeError as e: + raise RuntimeError( + f"Invalid JSON body for HTTP request node: {e.msg} (data={rendered!r})" + ) case HttpContentType.FROM_DATA: data = {} files = [] diff --git a/api/app/repositories/conversation_repository.py b/api/app/repositories/conversation_repository.py index 129e1f02..e3447dbd 100644 --- a/api/app/repositories/conversation_repository.py +++ b/api/app/repositories/conversation_repository.py @@ -1,13 +1,15 @@ import uuid from typing import Optional -from sqlalchemy import select, desc, func +from sqlalchemy import select, desc, func, or_, cast, Text from sqlalchemy.orm import Session from app.core.exceptions import ResourceNotFoundException from app.core.logging_config import get_db_logger from app.models import Conversation, Message +from app.models.app_model import AppType from app.models.conversation_model import ConversationDetail +from app.models.workflow_model import WorkflowExecution logger = get_db_logger() @@ -206,7 +208,8 @@ class ConversationRepository: is_draft: Optional[bool] = None, keyword: Optional[str] = None, page: int = 1, - pagesize: int = 20 + pagesize: int = 20, + app_type: Optional[str] = None, ) -> tuple[list[Conversation], int]: """ 查询应用日志会话列表(带分页和过滤) @@ -218,6 +221,9 @@ class ConversationRepository: keyword: 搜索关键词(匹配消息内容) page: 页码(从 1 开始) pagesize: 每页数量 + app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的 + input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表); + 其他类型仍走 messages 表。 Returns: Tuple[List[Conversation], int]: (会话列表,总数) @@ -234,12 +240,28 @@ class ConversationRepository: # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation if keyword: - # 查找包含关键词的 conversation_id 列表 - keyword_stmt = ( - select(Message.conversation_id) - .where(Message.content.ilike(f"%{keyword}%")) - .distinct() - ) + kw_pattern = f"%{keyword}%" + if app_type == AppType.WORKFLOW: + # 工作流:从 workflow_executions 的 input_data / output_data 匹配 + # (messages 表只存开场白 assistant 消息,失败的工作流也不会写入) + keyword_stmt = ( + select(WorkflowExecution.conversation_id) + .where( + WorkflowExecution.conversation_id.is_not(None), + or_( + cast(WorkflowExecution.input_data, Text).ilike(kw_pattern), + cast(WorkflowExecution.output_data, Text).ilike(kw_pattern), + ), + ) + .distinct() + ) + else: + # Agent 等其他类型:仍走 messages 表(user + assistant 内容) + keyword_stmt = ( + select(Message.conversation_id) + .where(Message.content.ilike(kw_pattern)) + .distinct() + ) base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt)) # Calculate total number of records diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index 7ca05d42..c2cff2a6 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -32,6 +32,7 @@ class AppLogService: pagesize: int = 20, is_draft: Optional[bool] = None, keyword: Optional[str] = None, + app_type: Optional[str] = None, ) -> Tuple[list[Conversation], int]: """ 查询应用日志会话列表 @@ -43,6 +44,7 @@ class AppLogService: pagesize: 每页数量 is_draft: 是否草稿会话(None表示返回全部) keyword: 搜索关键词(匹配消息内容) + app_type: 应用类型(WORKFLOW 时关键词将从 workflow_executions 搜索) Returns: Tuple[list[Conversation], int]: (会话列表,总数) @@ -55,7 +57,8 @@ class AppLogService: "page": page, "pagesize": pagesize, "is_draft": is_draft, - "keyword": keyword + "keyword": keyword, + "app_type": app_type, } ) @@ -66,7 +69,8 @@ class AppLogService: is_draft=is_draft, keyword=keyword, page=page, - pagesize=pagesize + pagesize=pagesize, + app_type=app_type, ) logger.info( @@ -368,8 +372,16 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod if not output_data: return [] node_outputs: dict = output_data.get("node_outputs") or {} + # 按 execution_order(节点执行时写入的单调递增序号)排序。 + # PostgreSQL JSONB 不保证 key 顺序,不能依赖 dict 插入顺序; + # 缺失 execution_order 的历史数据退化到 0,保持在最前。 + ordered_items = sorted( + node_outputs.items(), + key=lambda kv: (kv[1] or {}).get("execution_order", 0) + if isinstance(kv[1], dict) else 0 + ) result = [] - for node_id, node_data in node_outputs.items(): + for node_id, node_data in ordered_items: if not isinstance(node_data, dict): continue output = dict(node_data) @@ -382,6 +394,8 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod inp = output.pop("input", None) elapsed_time = output.pop("elapsed_time", None) token_usage = output.pop("token_usage", None) + # execution_order 仅用于排序,不返回给前端 + output.pop("execution_order", None) result.append(AppLogNodeExecution( node_id=node_id, node_type=node_type,