refactor(workflow): streamline node execution handling and log service logic
- Consolidate node data retrieval so that node records are read from workflow_executions.output_data, unifying storage access.
- Optimize the construction of messages and execution records to support opening statements and their suggested questions.
- Eliminate redundant queries and storage logic to simplify the overall codebase structure.
This commit is contained in:
@@ -182,6 +182,7 @@ class IterationRuntime:
|
||||
"node_id": node_name,
|
||||
"node_type": node_type,
|
||||
"node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name,
|
||||
"status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"),
|
||||
"input": result.get("node_outputs", {}).get(node_name, {}).get("input")
|
||||
if not cycle_variable else cycle_variable,
|
||||
"output": result.get("node_outputs", {}).get(node_name, {}).get("output")
|
||||
|
||||
@@ -212,6 +212,7 @@ class LoopRuntime:
|
||||
"node_id": node_name,
|
||||
"node_type": node_type,
|
||||
"node_name": node_name,
|
||||
"status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"),
|
||||
"input": result.get("node_outputs", {}).get(node_name, {}).get("input")
|
||||
if not cycle_variable else cycle_variable,
|
||||
"output": result.get("node_outputs", {}).get(node_name, {}).get("output")
|
||||
|
||||
@@ -115,7 +115,7 @@ class AppLogService:
|
||||
messages = self.message_repository.get_messages_by_conversation(
|
||||
conversation_id=conversation_id
|
||||
)
|
||||
_, node_executions_map = self._get_workflow_node_executions_with_map(
|
||||
node_executions_map = self._get_workflow_node_executions_with_map(
|
||||
conversation_id, messages
|
||||
)
|
||||
|
||||
@@ -139,9 +139,9 @@ class AppLogService:
|
||||
工作流应用专用:从 workflow_executions 构建 messages 和节点日志。
|
||||
|
||||
每条 WorkflowExecution 对应一轮对话:
|
||||
- user message:来自 execution.input_data
|
||||
- user message:来自 execution.input_data(content 取 message 字段,files 放 meta_data)
|
||||
- assistant message:来自 execution.output_data(失败时内容为错误信息)
|
||||
节点日志以 execution id 为 key 分组。
|
||||
开场白的 suggested_questions 合并到第一条 assistant message 的 meta_data 里。
|
||||
|
||||
Returns:
|
||||
(messages 列表, node_executions_map)
|
||||
@@ -156,9 +156,44 @@ class AppLogService:
|
||||
)
|
||||
executions = list(self.db.scalars(stmt).all())
|
||||
|
||||
# 查开场白:Message 表里 meta_data 含 suggested_questions 的第一条 assistant 消息
|
||||
opening_stmt = (
|
||||
select(Message)
|
||||
.where(
|
||||
Message.conversation_id == conversation_id,
|
||||
Message.role == "assistant",
|
||||
)
|
||||
.order_by(Message.created_at.asc())
|
||||
.limit(10)
|
||||
)
|
||||
early_messages = list(self.db.scalars(opening_stmt).all())
|
||||
suggested_questions: list = []
|
||||
for m in early_messages:
|
||||
if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data:
|
||||
suggested_questions = m.meta_data.get("suggested_questions") or []
|
||||
break
|
||||
|
||||
messages: list[AppLogMessage] = []
|
||||
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
|
||||
|
||||
# 如果有开场白,作为第一条 assistant 消息插入
|
||||
if suggested_questions or early_messages:
|
||||
opening_msg = next(
|
||||
(m for m in early_messages
|
||||
if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data),
|
||||
None
|
||||
)
|
||||
if opening_msg:
|
||||
messages.append(AppLogMessage(
|
||||
id=opening_msg.id,
|
||||
conversation_id=conversation_id,
|
||||
role="assistant",
|
||||
content=opening_msg.content,
|
||||
status=None,
|
||||
meta_data={"suggested_questions": suggested_questions},
|
||||
created_at=opening_msg.created_at,
|
||||
))
|
||||
|
||||
for execution in executions:
|
||||
started_at = execution.started_at or dt.datetime.now()
|
||||
completed_at = execution.completed_at or started_at
|
||||
@@ -167,13 +202,20 @@ class AppLogService:
|
||||
assistant_msg_id = uuid.uuid5(execution.id, "assistant")
|
||||
|
||||
# --- user message(输入)---
|
||||
input_content = _extract_text(execution.input_data)
|
||||
input_data = execution.input_data or {}
|
||||
input_content = input_data.get("message") or _extract_text(input_data)
|
||||
|
||||
# 跳过没有用户输入的 execution(如开场白触发的记录)
|
||||
if not input_content or not input_content.strip():
|
||||
continue
|
||||
|
||||
files = input_data.get("files") or []
|
||||
user_msg = AppLogMessage(
|
||||
id=uuid.uuid5(execution.id, "user"),
|
||||
conversation_id=conversation_id,
|
||||
role="user",
|
||||
content=input_content,
|
||||
meta_data=None,
|
||||
meta_data={"files": files} if files else None,
|
||||
created_at=started_at,
|
||||
)
|
||||
messages.append(user_msg)
|
||||
@@ -197,24 +239,8 @@ class AppLogService:
|
||||
)
|
||||
messages.append(assistant_msg)
|
||||
|
||||
# --- 节点执行记录,key 与 assistant message id 一致 ---
|
||||
execution_nodes = []
|
||||
for node_exec in execution.node_executions:
|
||||
output_data = dict(node_exec.output_data or {})
|
||||
cycle_items = output_data.pop("cycle_items", None)
|
||||
execution_nodes.append(AppLogNodeExecution(
|
||||
node_id=node_exec.node_id,
|
||||
node_type=node_exec.node_type,
|
||||
node_name=node_exec.node_name,
|
||||
status=node_exec.status,
|
||||
error=node_exec.error_message,
|
||||
input=node_exec.input_data,
|
||||
process=None,
|
||||
output=output_data,
|
||||
cycle_items=cycle_items,
|
||||
elapsed_time=node_exec.elapsed_time,
|
||||
token_usage=node_exec.token_usage,
|
||||
))
|
||||
# --- 节点执行记录,从 workflow_executions.output_data["node_outputs"] 读取 ---
|
||||
execution_nodes = _build_nodes_from_output_data(execution.output_data)
|
||||
|
||||
if execution_nodes:
|
||||
node_executions_map[str(assistant_msg_id)] = execution_nodes
|
||||
@@ -225,7 +251,7 @@ class AppLogService:
|
||||
self,
|
||||
conversation_id: uuid.UUID,
|
||||
messages: list[Message]
|
||||
) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
|
||||
) -> dict[str, list[AppLogNodeExecution]]:
|
||||
"""
|
||||
从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组
|
||||
|
||||
@@ -237,7 +263,6 @@ class AppLogService:
|
||||
Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]:
|
||||
(所有节点执行记录列表, 按 message_id 分组的节点执行记录字典)
|
||||
"""
|
||||
node_executions = []
|
||||
node_executions_map: dict[str, list[AppLogNodeExecution]] = {}
|
||||
|
||||
# 查询该会话关联的所有工作流执行记录(按时间正序)
|
||||
@@ -268,26 +293,8 @@ class AppLogService:
|
||||
used_message_ids: set[str] = set()
|
||||
|
||||
for execution in executions:
|
||||
# 构建节点执行记录列表
|
||||
execution_nodes = []
|
||||
for node_exec in execution.node_executions:
|
||||
output_data = dict(node_exec.output_data or {})
|
||||
cycle_items = output_data.pop("cycle_items", None)
|
||||
node_execution = AppLogNodeExecution(
|
||||
node_id=node_exec.node_id,
|
||||
node_type=node_exec.node_type,
|
||||
node_name=node_exec.node_name,
|
||||
status=node_exec.status,
|
||||
error=node_exec.error_message,
|
||||
input=node_exec.input_data,
|
||||
process=None,
|
||||
output=output_data,
|
||||
cycle_items=cycle_items,
|
||||
elapsed_time=node_exec.elapsed_time,
|
||||
token_usage=node_exec.token_usage,
|
||||
)
|
||||
node_executions.append(node_execution)
|
||||
execution_nodes.append(node_execution)
|
||||
# 构建节点执行记录列表,从 workflow_executions.output_data["node_outputs"] 读取
|
||||
execution_nodes = _build_nodes_from_output_data(execution.output_data)
|
||||
|
||||
if not execution_nodes:
|
||||
continue
|
||||
@@ -318,7 +325,7 @@ class AppLogService:
|
||||
used_message_ids.add(msg_id_str)
|
||||
node_executions_map[msg_id_str] = execution_nodes
|
||||
|
||||
return node_executions, node_executions_map
|
||||
return node_executions_map
|
||||
|
||||
|
||||
def _extract_text(data: Optional[dict]) -> str:
|
||||
@@ -328,8 +335,64 @@ def _extract_text(data: Optional[dict]) -> str:
|
||||
"""
|
||||
if not data:
|
||||
return ""
|
||||
for key in ("text", "content", "output", "result", "answer"):
|
||||
for key in ("message", "text", "content", "output", "result", "answer"):
|
||||
if key in data and isinstance(data[key], str):
|
||||
return data[key]
|
||||
import json
|
||||
return json.dumps(data, ensure_ascii=False)
|
||||
|
||||
|
||||
def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNodeExecution]:
|
||||
"""从 workflow_executions.output_data["node_outputs"] 构建节点执行记录列表。
|
||||
|
||||
output_data 结构:
|
||||
{
|
||||
"node_outputs": {
|
||||
"<node_id>": {
|
||||
"node_type": ...,
|
||||
"node_name": ...,
|
||||
"status": ...,
|
||||
"input": ...,
|
||||
"output": ...,
|
||||
"elapsed_time": ...,
|
||||
"token_usage": ...,
|
||||
"error": ...,
|
||||
"cycle_items": [...],
|
||||
...
|
||||
}
|
||||
},
|
||||
"error": ...,
|
||||
...
|
||||
}
|
||||
"""
|
||||
if not output_data:
|
||||
return []
|
||||
node_outputs: dict = output_data.get("node_outputs") or {}
|
||||
result = []
|
||||
for node_id, node_data in node_outputs.items():
|
||||
if not isinstance(node_data, dict):
|
||||
continue
|
||||
output = dict(node_data)
|
||||
cycle_items = output.pop("cycle_items", None)
|
||||
# 把已知的顶层字段剥离,剩余的作为 output
|
||||
node_type = output.pop("node_type", "unknown")
|
||||
node_name = output.pop("node_name", None)
|
||||
status = output.pop("status", "completed")
|
||||
error = output.pop("error", None)
|
||||
inp = output.pop("input", None)
|
||||
elapsed_time = output.pop("elapsed_time", None)
|
||||
token_usage = output.pop("token_usage", None)
|
||||
result.append(AppLogNodeExecution(
|
||||
node_id=node_id,
|
||||
node_type=node_type,
|
||||
node_name=node_name,
|
||||
status=status,
|
||||
error=error,
|
||||
input=inp,
|
||||
process=None,
|
||||
output=output if output else None,
|
||||
cycle_items=cycle_items,
|
||||
elapsed_time=elapsed_time,
|
||||
token_usage=token_usage,
|
||||
))
|
||||
return result
|
||||
|
||||
@@ -910,7 +910,6 @@ class WorkflowService:
|
||||
input_data["conv_messages"] = conv_messages
|
||||
init_message_length = len(input_data.get("conv_messages", []))
|
||||
message_id = uuid.uuid4()
|
||||
_node_order_counter = 0
|
||||
_cycle_items: dict[str, list] = {}
|
||||
|
||||
# 新会话时写入开场白
|
||||
@@ -1015,32 +1014,17 @@ class WorkflowService:
|
||||
)
|
||||
else:
|
||||
logger.error(f"unexpect workflow run status, status: {status}")
|
||||
# 把积累的 cycle_item 写入对应循环节点的 output_data
|
||||
if _cycle_items:
|
||||
# 把积累的 cycle_item 写入 workflow_executions.output_data["node_outputs"]
|
||||
if _cycle_items and execution.output_data:
|
||||
import copy
|
||||
new_output_data = copy.deepcopy(execution.output_data)
|
||||
node_outputs = new_output_data.setdefault("node_outputs", {})
|
||||
for cycle_node_id, items in _cycle_items.items():
|
||||
node_exec = self.db.execute(
|
||||
select(WorkflowNodeExecution).where(
|
||||
WorkflowNodeExecution.execution_id == execution.id,
|
||||
WorkflowNodeExecution.node_id == cycle_node_id
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if node_exec:
|
||||
node_exec.output_data = {
|
||||
**(node_exec.output_data or {}),
|
||||
"cycle_items": items
|
||||
}
|
||||
if cycle_node_id in node_outputs:
|
||||
node_outputs[cycle_node_id]["cycle_items"] = items
|
||||
else:
|
||||
node_cfg = next((n for n in config.nodes if n.get("id") == cycle_node_id), {})
|
||||
self.db.add(WorkflowNodeExecution(
|
||||
execution_id=execution.id,
|
||||
node_id=cycle_node_id,
|
||||
node_type=node_cfg.get("type", "cycle"),
|
||||
node_name=node_cfg.get("data", {}).get("label") or cycle_node_id,
|
||||
execution_order=_node_order_counter,
|
||||
status="completed",
|
||||
output_data={"cycle_items": items},
|
||||
))
|
||||
_node_order_counter += 1
|
||||
node_outputs[cycle_node_id] = {"cycle_items": items}
|
||||
execution.output_data = new_output_data
|
||||
self.db.commit()
|
||||
elif event.get("event") == "workflow_start":
|
||||
event["data"]["message_id"] = str(message_id)
|
||||
|
||||
Reference in New Issue
Block a user