Merge remote-tracking branch 'origin/fix/wxy-032' into fix/wxy-032

This commit is contained in:
wxy
2026-04-27 15:29:54 +08:00
7 changed files with 126 additions and 19 deletions

View File

@@ -41,7 +41,7 @@ def list_app_logs(
# 验证应用访问权限
app_service = AppService(db)
app_service.get_app(app_id, workspace_id)
app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询
log_service = AppLogService(db)
@@ -51,7 +51,8 @@ def list_app_logs(
page=page,
pagesize=pagesize,
is_draft=is_draft,
keyword=keyword
keyword=keyword,
app_type=app.type,
)
items = [AppLogConversation.model_validate(c) for c in conversations]

View File

@@ -16,6 +16,7 @@ from app.core.workflow.engine.runtime_schema import ExecutionContext
from app.core.workflow.engine.state_manager import WorkflowStateManager
from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator
from app.core.workflow.engine.variable_pool import VariablePool, VariablePoolInitializer
from app.core.workflow.nodes.base_node import NodeExecutionError
logger = logging.getLogger(__name__)
@@ -326,10 +327,43 @@ class WorkflowExecutor:
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True)
# 1) 尝试从 checkpoint 回补已成功节点的 node_outputs
recovered: dict[str, Any] = {}
try:
if self.graph is not None:
recovered = self.graph.get_state(
self.execution_context.checkpoint_config
).values or {}
except Exception as recover_err:
logger.warning(
f"Recover state on failure failed: {recover_err}, "
f"execution_id={self.execution_context.execution_id}"
)
if result is None:
result = {"error": str(e)}
result = dict(recovered) if recovered else {}
else:
result["error"] = str(e)
# 已有 result 与 recovered 合并node_outputs 深度合并
for k, v in recovered.items():
if k == "node_outputs" and isinstance(v, dict):
existing = result.get("node_outputs") or {}
result["node_outputs"] = {**v, **existing}
else:
result.setdefault(k, v)
# 2) 如果是节点抛出的 NodeExecutionError把失败节点的 node_output 注入 node_outputs
failed_node_id: str | None = None
if isinstance(e, NodeExecutionError):
failed_node_id = e.node_id
node_outputs = result.setdefault("node_outputs", {})
# 不覆盖已有(理论上不会有),保底写入失败节点记录
node_outputs.setdefault(e.node_id, e.node_output)
result["error"] = str(e)
if failed_node_id:
result["error_node"] = failed_node_id
yield {
"event": "workflow_end",
"data": self.result_builder.build_final_output(

View File

@@ -1,5 +1,6 @@
import asyncio
import logging
import time
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
@@ -22,6 +23,20 @@ from app.services.multimodal_service import MultimodalService
logger = logging.getLogger(__name__)
class NodeExecutionError(Exception):
"""节点执行失败异常。
携带失败节点的完整 node_output供 executor 兜底注入 node_outputs
保证 workflow_executions.output_data 里能看到失败节点的日志记录。
"""
def __init__(self, node_id: str, node_output: dict[str, Any], error_message: str):
super().__init__(f"Node {node_id} execution failed: {error_message}")
self.node_id = node_id
self.node_output = node_output
self.error_message = error_message
class BaseNode(ABC):
"""Base class for workflow nodes.
@@ -396,6 +411,8 @@ class BaseNode(ABC):
"elapsed_time": elapsed_time,
"token_usage": token_usage,
"error": None,
# 单调递增序号用于日志按执行顺序排序JSONB 不保证 key 顺序)
"execution_order": time.monotonic_ns(),
**self._extract_extra_fields(business_result),
}
final_output = {
@@ -444,7 +461,9 @@ class BaseNode(ABC):
"output": None,
"elapsed_time": elapsed_time,
"token_usage": None,
"error": error_message
"error": error_message,
# 单调递增序号,用于日志按执行顺序排序
"execution_order": time.monotonic_ns(),
}
# if error_edge:
@@ -466,7 +485,12 @@ class BaseNode(ABC):
**node_output
})
logger.error(f"Node {self.node_id} execution failed, stopping workflow: {error_message}")
raise Exception(f"Node {self.node_id} execution failed: {error_message}")
# 抛出自定义异常,把 node_output 带给 executor供其写入 node_outputs
raise NodeExecutionError(
node_id=self.node_id,
node_output=node_output,
error_message=error_message,
)
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
"""Extracts the input data for this node (used for logging or audit).

View File

@@ -174,6 +174,9 @@ class IterationRuntime:
continue
node_type = result.get("node_outputs", {}).get(node_name, {}).get("node_type")
cycle_variable = {"item": item} if node_type == NodeType.CYCLE_START else None
node_cfg = next(
(n for n in self.cycle_nodes if n.get("id") == node_name), None
)
self.event_write({
"type": "cycle_item",
"data": {

View File

@@ -255,9 +255,18 @@ class HttpRequestNode(BaseNode):
case HttpContentType.NONE:
return {}
case HttpContentType.JSON:
content["json"] = json.loads(self._render_template(
rendered = self._render_template(
self.typed_config.body.data, variable_pool
))
)
if not rendered or not rendered.strip():
# 第三方导入的工作流可能出现 content_type=json 但 data 为空的情况,视为无 body
return {}
try:
content["json"] = json.loads(rendered)
except json.JSONDecodeError as e:
raise RuntimeError(
f"Invalid JSON body for HTTP request node: {e.msg} (data={rendered!r})"
)
case HttpContentType.FROM_DATA:
data = {}
files = []

View File

@@ -1,13 +1,15 @@
import uuid
from typing import Optional
from sqlalchemy import select, desc, func
from sqlalchemy import select, desc, func, or_, cast, Text
from sqlalchemy.orm import Session
from app.core.exceptions import ResourceNotFoundException
from app.core.logging_config import get_db_logger
from app.models import Conversation, Message
from app.models.app_model import AppType
from app.models.conversation_model import ConversationDetail
from app.models.workflow_model import WorkflowExecution
logger = get_db_logger()
@@ -206,7 +208,8 @@ class ConversationRepository:
is_draft: Optional[bool] = None,
keyword: Optional[str] = None,
page: int = 1,
pagesize: int = 20
pagesize: int = 20,
app_type: Optional[str] = None,
) -> tuple[list[Conversation], int]:
"""
查询应用日志会话列表(带分页和过滤)
@@ -218,6 +221,9 @@ class ConversationRepository:
keyword: 搜索关键词(匹配消息内容)
page: 页码(从 1 开始)
pagesize: 每页数量
app_type: 应用类型。WORKFLOW 类型改用 workflow_executions 的
input_data/output_data 做关键词过滤(因为失败的工作流不会写入 messages 表);
其他类型仍走 messages 表。
Returns:
Tuple[List[Conversation], int]: (会话列表,总数)
@@ -234,12 +240,28 @@ class ConversationRepository:
# 如果有关键词搜索,通过子查询过滤包含该关键词的 conversation
if keyword:
# 查找包含关键词的 conversation_id 列表
keyword_stmt = (
select(Message.conversation_id)
.where(Message.content.ilike(f"%{keyword}%"))
.distinct()
)
kw_pattern = f"%{keyword}%"
if app_type == AppType.WORKFLOW:
# 工作流:从 workflow_executions 的 input_data / output_data 匹配
# messages 表只存开场白 assistant 消息,失败的工作流也不会写入)
keyword_stmt = (
select(WorkflowExecution.conversation_id)
.where(
WorkflowExecution.conversation_id.is_not(None),
or_(
cast(WorkflowExecution.input_data, Text).ilike(kw_pattern),
cast(WorkflowExecution.output_data, Text).ilike(kw_pattern),
),
)
.distinct()
)
else:
# Agent 等其他类型:仍走 messages 表user + assistant 内容)
keyword_stmt = (
select(Message.conversation_id)
.where(Message.content.ilike(kw_pattern))
.distinct()
)
base_stmt = base_stmt.where(Conversation.id.in_(keyword_stmt))
# Calculate total number of records

View File

@@ -32,6 +32,7 @@ class AppLogService:
pagesize: int = 20,
is_draft: Optional[bool] = None,
keyword: Optional[str] = None,
app_type: Optional[str] = None,
) -> Tuple[list[Conversation], int]:
"""
查询应用日志会话列表
@@ -43,6 +44,7 @@ class AppLogService:
pagesize: 每页数量
is_draft: 是否草稿会话None表示返回全部
keyword: 搜索关键词(匹配消息内容)
app_type: 应用类型WORKFLOW 时关键词将从 workflow_executions 搜索)
Returns:
Tuple[list[Conversation], int]: (会话列表,总数)
@@ -55,7 +57,8 @@ class AppLogService:
"page": page,
"pagesize": pagesize,
"is_draft": is_draft,
"keyword": keyword
"keyword": keyword,
"app_type": app_type,
}
)
@@ -66,7 +69,8 @@ class AppLogService:
is_draft=is_draft,
keyword=keyword,
page=page,
pagesize=pagesize
pagesize=pagesize,
app_type=app_type,
)
logger.info(
@@ -368,8 +372,16 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
if not output_data:
return []
node_outputs: dict = output_data.get("node_outputs") or {}
# 按 execution_order节点执行时写入的单调递增序号排序。
# PostgreSQL JSONB 不保证 key 顺序,不能依赖 dict 插入顺序;
# 缺失 execution_order 的历史数据退化到 0保持在最前。
ordered_items = sorted(
node_outputs.items(),
key=lambda kv: (kv[1] or {}).get("execution_order", 0)
if isinstance(kv[1], dict) else 0
)
result = []
for node_id, node_data in node_outputs.items():
for node_id, node_data in ordered_items:
if not isinstance(node_data, dict):
continue
output = dict(node_data)
@@ -382,6 +394,8 @@ def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNod
inp = output.pop("input", None)
elapsed_time = output.pop("elapsed_time", None)
token_usage = output.pop("token_usage", None)
# execution_order 仅用于排序,不返回给前端
output.pop("execution_order", None)
result.append(AppLogNodeExecution(
node_id=node_id,
node_type=node_type,