Merge pull request #1012 from SuanmoSuanyangTechnology/fix/wxy-032

feat(workflow): augment logging queries and ameliorate error handling
This commit is contained in:
山程漫悟
2026-04-27 16:00:49 +08:00
committed by GitHub
7 changed files with 126 additions and 19 deletions

View File

@@ -41,7 +41,7 @@ def list_app_logs(
# 验证应用访问权限 # 验证应用访问权限
app_service = AppService(db) app_service = AppService(db)
app_service.get_app(app_id, workspace_id) app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询 # 使用 Service 层查询
log_service = AppLogService(db) log_service = AppLogService(db)
@@ -51,7 +51,8 @@ def list_app_logs(
page=page, page=page,
pagesize=pagesize, pagesize=pagesize,
is_draft=is_draft, is_draft=is_draft,
keyword=keyword keyword=keyword,
app_type=app.type,
) )
items = [AppLogConversation.model_validate(c) for c in conversations] items = [AppLogConversation.model_validate(c) for c in conversations]

View File

@@ -16,6 +16,7 @@ from app.core.workflow.engine.runtime_schema import ExecutionContext
from app.core.workflow.engine.state_manager import WorkflowStateManager from app.core.workflow.engine.state_manager import WorkflowStateManager
from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator
from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer
from app.core.workflow.nodes.base_node import NodeExecutionError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -326,10 +327,43 @@ class WorkflowExecutor:
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True) exc_info=True)
# 1) 尝试从 checkpoint 回补已成功节点的 node_outputs
recovered: dict[str, Any] = {}
try:
if self.graph is not None:
recovered = self.graph.get_state(
self.execution_context.checkpoint_config
).values or {}
except Exception as recover_err:
logger.warning(
f"Recover state on failure failed: {recover_err}, "
f"execution_id={self.execution_context.execution_id}"
)
if result is None: if result is None:
result = {"error": str(e)} result = dict(recovered) if recovered else {}
else: else:
result["error"] = str(e) # 已有 result 与 recovered 合并node_outputs 深度合并
for k, v in recovered.items():
if k == "node_outputs" and isinstance(v, dict):
existing = result.get("node_outputs") or {}
result["node_outputs"] = {**v, **existing}
else:
result.setdefault(k, v)
# 2) 如果是节点抛出的 NodeExecutionError把失败节点的 node_output 注入 node_outputs
failed_node_id: str | None = None
if isinstance(e, NodeExecutionError):
failed_node_id = e.node_id
node_outputs = result.setdefault("node_outputs", {})
# 不覆盖已有(理论上不会有),保底写入失败节点记录
node_outputs.setdefault(e.node_id, e.node_output)
result["error"] = str(e)
if failed_node_id:
result["error_node"] = failed_node_id
yield { yield {
"event": "workflow_end", "event": "workflow_end",
"data": self.result_builder.build_final_output( "data": self.result_builder.build_final_output(

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import time
import uuid import uuid
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime from datetime import datetime
@@ -22,6 +23,20 @@ from app.services.multimodal_service import MultimodalService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NodeExecutionError(Exception):
"""节点执行失败异常。
携带失败节点的完整 node_output供 executor 兜底注入 node_outputs
保证 workflow_executions.output_data 里能看到失败节点的日志记录。
"""
def __init__(self, node_id: str, node_output: dict[str, Any], error_message: str):
super().__init__(f"Node {node_id} execution failed: {error_message}")
self.node_id = node_id
self.node_output = node_output
self.error_message = error_message
class BaseNode(ABC): class BaseNode(ABC):
"""Base class for workflow nodes. """Base class for workflow nodes.
@@ -396,6 +411,8 @@ class BaseNode(ABC):
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"token_usage": token_usage, "token_usage": token_usage,
"error": None, "error": None,
# 单调递增序号用于日志按执行顺序排序JSONB 不保证 key 顺序)
"execution_order": time.monotonic_ns(),
**self._extract_extra_fields(business_result), **self._extract_extra_fields(business_result),
} }
final_output = { final_output = {
@@ -444,7 +461,9 @@ class BaseNode(ABC):
"output": None, "output": None,
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"token_usage": None, "token_usage": None,
"error": error_message "error": error_message,
# 单调递增序号,用于日志按执行顺序排序
"execution_order": time.monotonic_ns(),
} }
# if error_edge: # if error_edge:
@@ -466,7 +485,12 @@ class BaseNode(ABC):
**node_output **node_output
}) })
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}") logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
raise Exception(f"Node {self.node_id} execution failed: {error_message}") # 抛出自定义异常,把 node_output 带给 executor供其写入 node_outputs
raise NodeExecutionError(
node_id=self.node_id,
node_output=node_output,
error_message=error_message,
)
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
"""Extracts the input data for this node (used for logging or audit). """Extracts the input data for this node (used for logging or audit).

View File

@@ -174,6 +174,9 @@ class IterationRuntime:
continue continue
node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type") node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type")
cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None
node_cfg = next(
(n for n in self.cycle_nodes if n.get("id") == node_name), None
)
self.event_write({ self.event_write({
"type": "cycle_item", "type": "cycle_item",
"data": { "data": {

View File

@@ -255,9 +255,18 @@ class HttpRequestNode(BaseNode):
case HttpContentType.NONE: case HttpContentType.NONE:
return {} return {}
case HttpContentType.JSON: case HttpContentType.JSON:
content["json"] = json.loads(self._render_template( rendered = self._render_template(
self.typed_config.body.data, variable_pool self.typed_config.body.data, variable_pool
)) )
if not rendered or not rendered.strip():
# 第三方导入的工作流可能出现 content_type=json 但 data 为空的情况,视为无 body
return {}
try:
content["json"] = json.loads(rendered)
except json.JSONDecodeError as e:
raise RuntimeError(
f"Invalid JSON body for HTTP request node: {e.msg} (data={rendered!r})"
)
case HttpContentType.FROM_DATA: case HttpContentType.FROM_DATA:
data = {} data = {}
files = [] files = []

View File

@@ -1,13 +1,15 @@
import uuid import uuid
from typing import Optional from typing import Optional
from sqlalchemy import select, desc, func from sqlalchemy import select, desc, func, or_, cast, Text
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.exceptions import ResourceNotFoundException from app.core.exceptions import ResourceNotFoundException
from app.core.logging_config import get_db_logger from app.core.logging_config import get_db_logger
from app.models import Conversation, Message from app.models import Conversation, Message
from app.models.app_model import AppType
from app.models.conversation_model import ConversationDetail from app.models.conversation_model import ConversationDetail
from app.models.workflow_model import WorkflowExecution
logger = get_db_logger() logger = get_db_logger()
@@ -206,7 +208,8 @@ class ConversationRepository:
is_draft: Optional[bool] = None, is_draft: Optional[bool] = None,
keyword: Optional[str] = None, keyword: Optional[str] = None,
page: int = 1, page: int = 1,
pagesize: int = 20 pagesize: int = 20,
app_type: Optional[str] = None,
) -> tuple[list[Conversation], int]: ) -> tuple[list[Conversation], int]:
""" """
查询应用日志会话列表(带分页和过滤) 查询应用日志会话列表(带分页和过滤)
@@ -218,6 +221,9 @@ class ConversationRepository:
keyword: 搜索关键词(匹配消息内容) keyword: 搜索关键词(匹配消息内容)
page: 页码(从 1 开始) page: 页码(从 1 开始)
pagesize: 每页数量 pagesize: 每页数量
app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的
input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表);
其他类型仍走 messages 表。
Returns: Returns:
Tuple[List[Conversation], int]: (会话列表,总数) Tuple[List[Conversation], int]: (会话列表,总数)
@@ -234,12 +240,28 @@ class ConversationRepository:
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation # 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
if keyword: if keyword:
# 查找包含关键词的 conversation_id 列表 kw_pattern = f"%{keyword}%"
keyword_stmt = ( if app_type == AppType.WORKFLOW:
select(Message.conversation_id) # 工作流:从 workflow_executions 的 input_data / output_data 匹配
.where(Message.content.ilike(f"%{keyword}%")) # messages 表只存开场白 assistant 消息,失败的工作流也不会写入)
.distinct() keyword_stmt = (
) select(WorkflowExecution.conversation_id)
.where(
WorkflowExecution.conversation_id.is_not(None),
or_(
cast(WorkflowExecution.input_data, Text).ilike(kw_pattern),
cast(WorkflowExecution.output_data, Text).ilike(kw_pattern),
),
)
.distinct()
)
else:
# Agent 等其他类型:仍走 messages 表user + assistant 内容)
keyword_stmt = (
select(Message.conversation_id)
.where(Message.content.ilike(kw_pattern))
.distinct()
)
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt)) base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
# Calculate total number of records # Calculate total number of records

View File

@@ -32,6 +32,7 @@ class AppLogService:
pagesize: int = 20, pagesize: int = 20,
is_draft: Optional[bool] = None, is_draft: Optional[bool] = None,
keyword: Optional[str] = None, keyword: Optional[str] = None,
app_type: Optional[str] = None,
) -> Tuple[list[Conversation], int]: ) -> Tuple[list[Conversation], int]:
""" """
查询应用日志会话列表 查询应用日志会话列表
@@ -43,6 +44,7 @@ class AppLogService:
pagesize: 每页数量 pagesize: 每页数量
is_draft: 是否草稿会话None表示返回全部 is_draft: 是否草稿会话None表示返回全部
keyword: 搜索关键词(匹配消息内容) keyword: 搜索关键词(匹配消息内容)
app_type: 应用类型WORKFLOW 时关键词将从 workflow_executions 搜索)
Returns: Returns:
Tuple[list[Conversation], int]: (会话列表,总数) Tuple[list[Conversation], int]: (会话列表,总数)
@@ -55,7 +57,8 @@ class AppLogService:
"page": page, "page": page,
"pagesize": pagesize, "pagesize": pagesize,
"is_draft": is_draft, "is_draft": is_draft,
"keyword": keyword "keyword": keyword,
"app_type": app_type,
} }
) )
@@ -66,7 +69,8 @@ class AppLogService:
is_draft=is_draft, is_draft=is_draft,
keyword=keyword, keyword=keyword,
page=page, page=page,
pagesize=pagesize pagesize=pagesize,
app_type=app_type,
) )
logger.info( logger.info(
@@ -368,8 +372,16 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
if not output_data: if not output_data:
return [] return []
node_outputs: dict = output_data.get("node_outputs") or {} node_outputs: dict = output_data.get("node_outputs") or {}
# 按 execution_order节点执行时写入的单调递增序号排序。
# PostgreSQL JSONB 不保证 key 顺序,不能依赖 dict 插入顺序;
# 缺失 execution_order 的历史数据退化到 0保持在最前。
ordered_items = sorted(
node_outputs.items(),
key=lambda kv: (kv[1] or {}).get("execution_order", 0)
if isinstance(kv[1], dict) else 0
)
result = [] result = []
for node_id, node_data in node_outputs.items(): for node_id, node_data in ordered_items:
if not isinstance(node_data, dict): if not isinstance(node_data, dict):
continue continue
output = dict(node_data) output = dict(node_data)
@@ -382,6 +394,8 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
inp = output.pop("input", None) inp = output.pop("input", None)
elapsed_time = output.pop("elapsed_time", None) elapsed_time = output.pop("elapsed_time", None)
token_usage = output.pop("token_usage", None) token_usage = output.pop("token_usage", None)
# execution_order 仅用于排序,不返回给前端
output.pop("execution_order", None)
result.append(AppLogNodeExecution( result.append(AppLogNodeExecution(
node_id=node_id, node_id=node_id,
node_type=node_type, node_type=node_type,