feat(workflow): augment logging capabilities with execution status and loop support

- Augment workflow logs with execution status fields and loop node information.
- Refactor log service to handle distinct processing logic for workflows and agents.
- Construct message and node logs derived from workflow_executions data.
This commit is contained in:
wwq
2026-04-24 17:02:03 +08:00
parent cedf47b3bc
commit cf8db47389
5 changed files with 158 additions and 40 deletions

View File

@@ -9,7 +9,7 @@ from app.core.logging_config import get_business_logger
from app.core.response_utils import success from app.core.response_utils import success
from app.db import get_db from app.db import get_db
from app.dependencies import get_current_user, cur_workspace_access_guard from app.dependencies import get_current_user, cur_workspace_access_guard
from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail from app.schemas.app_log_schema import AppLogConversation, AppLogConversationDetail, AppLogMessage
from app.schemas.response_schema import PageData, PageMeta from app.schemas.response_schema import PageData, PageMeta
from app.services.app_service import AppService from app.services.app_service import AppService
from app.services.app_log_service import AppLogService from app.services.app_log_service import AppLogService
@@ -78,17 +78,32 @@ def get_app_log_detail(
# 验证应用访问权限 # 验证应用访问权限
app_service = AppService(db) app_service = AppService(db)
app_service.get_app(app_id, workspace_id) app = app_service.get_app(app_id, workspace_id)
# 使用 Service 层查询 # 使用 Service 层查询
log_service = AppLogService(db) log_service = AppLogService(db)
conversation, node_executions_map = log_service.get_conversation_detail( conversation, messages, node_executions_map = log_service.get_conversation_detail(
app_id=app_id, app_id=app_id,
conversation_id=conversation_id, conversation_id=conversation_id,
workspace_id=workspace_id workspace_id=workspace_id,
app_type=app.type
) )
detail = AppLogConversationDetail.model_validate(conversation) # 构建基础会话信息(不经过 ORM relationship
detail.node_executions_map = node_executions_map base = AppLogConversation.model_validate(conversation)
# 单独处理 messages避免触发 SQLAlchemy relationship 校验
if messages and isinstance(messages[0], AppLogMessage):
# 工作流:已经是 AppLogMessage 实例
msg_list = messages
else:
# AgentORM Message 对象逐个转换
msg_list = [AppLogMessage.model_validate(m) for m in messages]
detail = AppLogConversationDetail(
**base.model_dump(),
messages=msg_list,
node_executions_map=node_executions_map,
)
return success(data=detail) return success(data=detail)

View File

@@ -180,6 +180,8 @@ class IterationRuntime:
"cycle_id": self.node_id, "cycle_id": self.node_id,
"cycle_idx": idx, "cycle_idx": idx,
"node_id": node_name, "node_id": node_name,
"node_type": node_type,
"node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name,
"input": result.get("node_outputs", {}).get(node_name, {}).get("input") "input": result.get("node_outputs", {}).get(node_name, {}).get("input")
if not cycle_variable else cycle_variable, if not cycle_variable else cycle_variable,
"output": result.get("node_outputs", {}).get(node_name, {}).get("output") "output": result.get("node_outputs", {}).get(node_name, {}).get("output")

View File

@@ -210,6 +210,8 @@ class LoopRuntime:
"cycle_id": self.node_id, "cycle_id": self.node_id,
"cycle_idx": idx, "cycle_idx": idx,
"node_id": node_name, "node_id": node_name,
"node_type": node_type,
"node_name": node_name,
"input": result.get("node_outputs", {}).get(node_name, {}).get("input") "input": result.get("node_outputs", {}).get(node_name, {}).get("input")
if not cycle_variable else cycle_variable, if not cycle_variable else cycle_variable,
"output": result.get("node_outputs", {}).get(node_name, {}).get("output") "output": result.get("node_outputs", {}).get(node_name, {}).get("output")

View File

@@ -14,6 +14,7 @@ class AppLogMessage(BaseModel):
conversation_id: uuid.UUID conversation_id: uuid.UUID
role: str = Field(description="角色: user / assistant / system") role: str = Field(description="角色: user / assistant / system")
content: str content: str
status: Optional[str] = Field(default=None, description="执行状态(工作流专用): completed / failed")
meta_data: Optional[Dict[str, Any]] = None meta_data: Optional[Dict[str, Any]] = None
created_at: datetime.datetime created_at: datetime.datetime
@@ -58,6 +59,7 @@ class AppLogNodeExecution(BaseModel):
input: Optional[Any] = None input: Optional[Any] = None
process: Optional[Any] = None process: Optional[Any] = None
output: Optional[Any] = None output: Optional[Any] = None
cycle_items: Optional[List[Any]] = None
elapsed_time: Optional[float] = None elapsed_time: Optional[float] = None
token_usage: Optional[Dict[str, Any]] = None token_usage: Optional[Dict[str, Any]] = None

View File

@@ -1,16 +1,17 @@
"""应用日志服务层""" """应用日志服务层"""
import uuid import uuid
import datetime as dt
from typing import Optional, Tuple from typing import Optional, Tuple
from datetime import datetime
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.models.app_model import AppType
from app.models.conversation_model import Conversation, Message from app.models.conversation_model import Conversation, Message
from app.models.workflow_model import WorkflowExecution from app.models.workflow_model import WorkflowExecution
from app.repositories.conversation_repository import ConversationRepository, MessageRepository from app.repositories.conversation_repository import ConversationRepository, MessageRepository
from app.schemas.app_log_schema import AppLogNodeExecution from app.schemas.app_log_schema import AppLogMessage, AppLogNodeExecution
logger = get_business_logger() logger = get_business_logger()
@@ -83,51 +84,40 @@ class AppLogService:
self, self,
app_id: uuid.UUID, app_id: uuid.UUID,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
workspace_id: uuid.UUID workspace_id: uuid.UUID,
) -> Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: app_type: str = AppType.AGENT
) -> Tuple[Conversation, list, dict[str, list[AppLogNodeExecution]]]:
""" """
查询会话详情(包含消息和工作流节点执行记录) 查询会话详情
Args:
app_id: 应用 ID
conversation_id: 会话 ID
workspace_id: 工作空间 ID
Returns: Returns:
Tuple[Conversation, dict[str, list[AppLogNodeExecution]]]: Tuple[Conversation, list[AppLogMessage|Message], dict[str, list[AppLogNodeExecution]]]
(包含消息的会话对象, 按消息ID分组的节点执行记录)
Raises:
ResourceNotFoundException: 当会话不存在时
""" """
logger.info( logger.info(
"查询应用日志会话详情", "查询应用日志会话详情",
extra={ extra={
"app_id": str(app_id), "app_id": str(app_id),
"conversation_id": str(conversation_id), "conversation_id": str(conversation_id),
"workspace_id": str(workspace_id) "workspace_id": str(workspace_id),
"app_type": app_type
} }
) )
# 查询会话
conversation = self.conversation_repository.get_conversation_for_app_log( conversation = self.conversation_repository.get_conversation_for_app_log(
conversation_id=conversation_id, conversation_id=conversation_id,
app_id=app_id, app_id=app_id,
workspace_id=workspace_id workspace_id=workspace_id
) )
# 查询消息(按时间正序) if app_type == AppType.WORKFLOW:
messages = self.message_repository.get_messages_by_conversation( messages, node_executions_map = self._get_workflow_messages_and_nodes(conversation_id)
conversation_id=conversation_id else:
) messages = self.message_repository.get_messages_by_conversation(
conversation_id=conversation_id
# 将消息附加到会话对象 )
conversation.messages = messages _, node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
# 查询工作流节点执行记录(按消息分组) )
_, node_executions_map = self._get_workflow_node_executions_with_map(
conversation_id, messages
)
logger.info( logger.info(
"查询应用日志会话详情成功", "查询应用日志会话详情成功",
@@ -139,7 +129,97 @@ class AppLogService:
} }
) )
return conversation, node_executions_map return conversation, messages, node_executions_map
def _get_workflow_messages_and_nodes(
self,
conversation_id: uuid.UUID,
) -> Tuple[list[AppLogMessage], dict[str, list[AppLogNodeExecution]]]:
"""
工作流应用专用:从 workflow_executions 构建 messages 和节点日志。
每条 WorkflowExecution 对应一轮对话:
- user message来自 execution.input_data
- assistant message来自 execution.output_data失败时内容为错误信息
节点日志以 execution id 为 key 分组。
Returns:
(messages 列表, node_executions_map)
"""
stmt = (
select(WorkflowExecution)
.where(
WorkflowExecution.conversation_id == conversation_id,
WorkflowExecution.status.in_(["completed", "failed"])
)
.order_by(WorkflowExecution.started_at.asc())
)
executions = list(self.db.scalars(stmt).all())
messages: list[AppLogMessage] = []
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
for execution in executions:
started_at = execution.started_at or dt.datetime.now()
completed_at = execution.completed_at or started_at
# assistant message 的 id同时作为 node_executions_map 的 key
assistant_msg_id = uuid.uuid5(execution.id, "assistant")
# --- user message输入---
input_content = _extract_text(execution.input_data)
user_msg = AppLogMessage(
id=uuid.uuid5(execution.id, "user"),
conversation_id=conversation_id,
role="user",
content=input_content,
meta_data=None,
created_at=started_at,
)
messages.append(user_msg)
# --- assistant message输出---
if execution.status == "completed":
output_content = _extract_text(execution.output_data)
meta = {"usage": execution.token_usage or {}, "elapsed_time": execution.elapsed_time}
else:
output_content = _extract_text(execution.output_data) or ""
meta = {"error": execution.error_message, "error_node_id": execution.error_node_id}
assistant_msg = AppLogMessage(
id=assistant_msg_id,
conversation_id=conversation_id,
role="assistant",
content=output_content,
status=execution.status,
meta_data=meta,
created_at=completed_at,
)
messages.append(assistant_msg)
# --- 节点执行记录key 与 assistant message id 一致 ---
execution_nodes = []
for node_exec in execution.node_executions:
output_data = dict(node_exec.output_data or {})
cycle_items = output_data.pop("cycle_items", None)
execution_nodes.append(AppLogNodeExecution(
node_id=node_exec.node_id,
node_type=node_exec.node_type,
node_name=node_exec.node_name,
status=node_exec.status,
error=node_exec.error_message,
input=node_exec.input_data,
process=None,
output=output_data,
cycle_items=cycle_items,
elapsed_time=node_exec.elapsed_time,
token_usage=node_exec.token_usage,
))
if execution_nodes:
node_executions_map[str(assistant_msg_id)] = execution_nodes
return messages, node_executions_map
def _get_workflow_node_executions_with_map( def _get_workflow_node_executions_with_map(
self, self,
@@ -191,6 +271,8 @@ class AppLogService:
# 构建节点执行记录列表 # 构建节点执行记录列表
execution_nodes = [] execution_nodes = []
for node_exec in execution.node_executions: for node_exec in execution.node_executions:
output_data = dict(node_exec.output_data or {})
cycle_items = output_data.pop("cycle_items", None)
node_execution = AppLogNodeExecution( node_execution = AppLogNodeExecution(
node_id=node_exec.node_id, node_id=node_exec.node_id,
node_type=node_exec.node_type, node_type=node_exec.node_type,
@@ -199,7 +281,8 @@ class AppLogService:
error=node_exec.error_message, error=node_exec.error_message,
input=node_exec.input_data, input=node_exec.input_data,
process=None, process=None,
output=node_exec.output_data, output=output_data,
cycle_items=cycle_items,
elapsed_time=node_exec.elapsed_time, elapsed_time=node_exec.elapsed_time,
token_usage=node_exec.token_usage, token_usage=node_exec.token_usage,
) )
@@ -223,9 +306,9 @@ class AppLogService:
if msg_id_str in used_message_ids: if msg_id_str in used_message_ids:
continue continue
if msg.created_at and msg.created_at >= execution.started_at: if msg.created_at and msg.created_at >= execution.started_at:
dt = (msg.created_at - execution.started_at).total_seconds() delta = (msg.created_at - execution.started_at).total_seconds()
if best_dt is None or dt < best_dt: if best_dt is None or delta < best_dt:
best_dt = dt best_dt = delta
best_msg = msg best_msg = msg
if not best_msg: if not best_msg:
@@ -236,3 +319,17 @@ class AppLogService:
node_executions_map[msg_id_str] = execution_nodes node_executions_map[msg_id_str] = execution_nodes
return node_executions, node_executions_map return node_executions, node_executions_map
def _extract_text(data: Optional[dict]) -> str:
"""从 workflow execution 的 input_data / output_data 中提取可读文本。
优先取 'text''content''output' 字段;若都没有则 JSON 序列化整个 dict。
"""
if not data:
return ""
for key in ("text", "content", "output", "result", "answer"):
if key in data and isinstance(data[key], str):
return data[key]
import json
return json.dumps(data, ensure_ascii=False)