From 21eb5006809dd7ce5b9c489ba7d657e61cd0d40c Mon Sep 17 00:00:00 2001 From: wxy Date: Fri, 24 Apr 2026 18:20:14 +0800 Subject: [PATCH] refactor(workflow): streamline node execution handling and log service logic - Consolidate node data retrieval from workflow_executions.output_data to unify storage access. - Optimize the construction of messages and execution records to support opening suggestions. - Eliminate redundant queries and storage logic to simplify the overall codebase structure. --- .../workflow/nodes/cycle_graph/iteration.py | 1 + .../core/workflow/nodes/cycle_graph/loop.py | 1 + api/app/services/app_log_service.py | 157 ++++++++++++------ api/app/services/workflow_service.py | 34 +--- 4 files changed, 121 insertions(+), 72 deletions(-) diff --git a/api/app/core/workflow/nodes/cycle_graph/iteration.py b/api/app/core/workflow/nodes/cycle_graph/iteration.py index 1ac46180..3ee7774f 100644 --- a/api/app/core/workflow/nodes/cycle_graph/iteration.py +++ b/api/app/core/workflow/nodes/cycle_graph/iteration.py @@ -182,6 +182,7 @@ class IterationRuntime: "node_id": node_name, "node_type": node_type, "node_name": node_cfg.get("data", {}).get("label") if node_cfg else node_name, + "status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input") if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/core/workflow/nodes/cycle_graph/loop.py b/api/app/core/workflow/nodes/cycle_graph/loop.py index ca432a04..93f1a1e4 100644 --- a/api/app/core/workflow/nodes/cycle_graph/loop.py +++ b/api/app/core/workflow/nodes/cycle_graph/loop.py @@ -212,6 +212,7 @@ class LoopRuntime: "node_id": node_name, "node_type": node_type, "node_name": node_name, + "status": result.get("node_outputs", {}).get(node_name, {}).get("status", "completed"), "input": result.get("node_outputs", {}).get(node_name, {}).get("input") 
if not cycle_variable else cycle_variable, "output": result.get("node_outputs", {}).get(node_name, {}).get("output") diff --git a/api/app/services/app_log_service.py b/api/app/services/app_log_service.py index e8a80d40..7ca05d42 100644 --- a/api/app/services/app_log_service.py +++ b/api/app/services/app_log_service.py @@ -115,7 +115,7 @@ class AppLogService: messages = self.message_repository.get_messages_by_conversation( conversation_id=conversation_id ) - _, node_executions_map = self._get_workflow_node_executions_with_map( + node_executions_map = self._get_workflow_node_executions_with_map( conversation_id, messages ) @@ -139,9 +139,9 @@ class AppLogService: 工作流应用专用:从 workflow_executions 构建 messages 和节点日志。 每条 WorkflowExecution 对应一轮对话: - - user message:来自 execution.input_data + - user message:来自 execution.input_data(content 取 message 字段,files 放 meta_data) - assistant message:来自 execution.output_data(失败时内容为错误信息) - 节点日志以 execution id 为 key 分组。 + 开场白的 suggested_questions 合并到第一条 assistant message 的 meta_data 里。 Returns: (messages 列表, node_executions_map) @@ -156,9 +156,44 @@ class AppLogService: ) executions = list(self.db.scalars(stmt).all()) + # 查开场白:Message 表里 meta_data 含 suggested_questions 的第一条 assistant 消息 + opening_stmt = ( + select(Message) + .where( + Message.conversation_id == conversation_id, + Message.role == "assistant", + ) + .order_by(Message.created_at.asc()) + .limit(10) + ) + early_messages = list(self.db.scalars(opening_stmt).all()) + suggested_questions: list = [] + for m in early_messages: + if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data: + suggested_questions = m.meta_data.get("suggested_questions") or [] + break + messages: list[AppLogMessage] = [] node_executions_map: dict[str, list[AppLogNodeExecution]] = {} + # 如果有开场白,作为第一条 assistant 消息插入 + if suggested_questions or early_messages: + opening_msg = next( + (m for m in early_messages + if isinstance(m.meta_data, dict) and "suggested_questions" in m.meta_data), + None 
+ ) + if opening_msg: + messages.append(AppLogMessage( + id=opening_msg.id, + conversation_id=conversation_id, + role="assistant", + content=opening_msg.content, + status=None, + meta_data={"suggested_questions": suggested_questions}, + created_at=opening_msg.created_at, + )) + for execution in executions: started_at = execution.started_at or dt.datetime.now() completed_at = execution.completed_at or started_at @@ -167,13 +202,20 @@ class AppLogService: assistant_msg_id = uuid.uuid5(execution.id, "assistant") # --- user message(输入)--- - input_content = _extract_text(execution.input_data) + input_data = execution.input_data or {} + input_content = input_data.get("message") or _extract_text(input_data) + + # 跳过没有用户输入的 execution(如开场白触发的记录) + if not input_content or not input_content.strip(): + continue + + files = input_data.get("files") or [] user_msg = AppLogMessage( id=uuid.uuid5(execution.id, "user"), conversation_id=conversation_id, role="user", content=input_content, - meta_data=None, + meta_data={"files": files} if files else None, created_at=started_at, ) messages.append(user_msg) @@ -197,24 +239,8 @@ class AppLogService: ) messages.append(assistant_msg) - # --- 节点执行记录,key 与 assistant message id 一致 --- - execution_nodes = [] - for node_exec in execution.node_executions: - output_data = dict(node_exec.output_data or {}) - cycle_items = output_data.pop("cycle_items", None) - execution_nodes.append(AppLogNodeExecution( - node_id=node_exec.node_id, - node_type=node_exec.node_type, - node_name=node_exec.node_name, - status=node_exec.status, - error=node_exec.error_message, - input=node_exec.input_data, - process=None, - output=output_data, - cycle_items=cycle_items, - elapsed_time=node_exec.elapsed_time, - token_usage=node_exec.token_usage, - )) + # --- 节点执行记录,从 workflow_executions.output_data["node_outputs"] 读取 --- + execution_nodes = _build_nodes_from_output_data(execution.output_data) if execution_nodes: node_executions_map[str(assistant_msg_id)] = 
execution_nodes @@ -225,7 +251,7 @@ class AppLogService: self, conversation_id: uuid.UUID, messages: list[Message] - ) -> Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: + ) -> dict[str, list[AppLogNodeExecution]]: """ 从 workflow_executions 表中提取节点执行记录,并按 assistant message 分组 @@ -237,7 +263,6 @@ class AppLogService: Tuple[list[AppLogNodeExecution], dict[str, list[AppLogNodeExecution]]]: (所有节点执行记录列表, 按 message_id 分组的节点执行记录字典) """ - node_executions = [] node_executions_map: dict[str, list[AppLogNodeExecution]] = {} # 查询该会话关联的所有工作流执行记录(按时间正序) @@ -268,26 +293,8 @@ class AppLogService: used_message_ids: set[str] = set() for execution in executions: - # 构建节点执行记录列表 - execution_nodes = [] - for node_exec in execution.node_executions: - output_data = dict(node_exec.output_data or {}) - cycle_items = output_data.pop("cycle_items", None) - node_execution = AppLogNodeExecution( - node_id=node_exec.node_id, - node_type=node_exec.node_type, - node_name=node_exec.node_name, - status=node_exec.status, - error=node_exec.error_message, - input=node_exec.input_data, - process=None, - output=output_data, - cycle_items=cycle_items, - elapsed_time=node_exec.elapsed_time, - token_usage=node_exec.token_usage, - ) - node_executions.append(node_execution) - execution_nodes.append(node_execution) + # 构建节点执行记录列表,从 workflow_executions.output_data["node_outputs"] 读取 + execution_nodes = _build_nodes_from_output_data(execution.output_data) if not execution_nodes: continue @@ -318,7 +325,7 @@ class AppLogService: used_message_ids.add(msg_id_str) node_executions_map[msg_id_str] = execution_nodes - return node_executions, node_executions_map + return node_executions_map def _extract_text(data: Optional[dict]) -> str: @@ -328,8 +335,64 @@ def _extract_text(data: Optional[dict]) -> str: """ if not data: return "" - for key in ("text", "content", "output", "result", "answer"): + for key in ("message", "text", "content", "output", "result", "answer"): if key in data and 
isinstance(data[key], str): return data[key] import json return json.dumps(data, ensure_ascii=False) + + +def _build_nodes_from_output_data(output_data: Optional[dict]) -> list[AppLogNodeExecution]: + """从 workflow_executions.output_data["node_outputs"] 构建节点执行记录列表。 + + output_data 结构: + { + "node_outputs": { + "<node_id>": { + "node_type": ..., + "node_name": ..., + "status": ..., + "input": ..., + "output": ..., + "elapsed_time": ..., + "token_usage": ..., + "error": ..., + "cycle_items": [...], + ... + } + }, + "error": ..., + ... + } + """ + if not output_data: + return [] + node_outputs: dict = output_data.get("node_outputs") or {} + result = [] + for node_id, node_data in node_outputs.items(): + if not isinstance(node_data, dict): + continue + output = dict(node_data) + cycle_items = output.pop("cycle_items", None) + # 把已知的顶层字段剥离,剩余的作为 output + node_type = output.pop("node_type", "unknown") + node_name = output.pop("node_name", None) + status = output.pop("status", "completed") + error = output.pop("error", None) + inp = output.pop("input", None) + elapsed_time = output.pop("elapsed_time", None) + token_usage = output.pop("token_usage", None) + result.append(AppLogNodeExecution( + node_id=node_id, + node_type=node_type, + node_name=node_name, + status=status, + error=error, + input=inp, + process=None, + output=output if output else None, + cycle_items=cycle_items, + elapsed_time=elapsed_time, + token_usage=token_usage, + )) + return result diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index ea302a9c..aa91de89 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -910,7 +910,6 @@ class WorkflowService: input_data["conv_messages"] = conv_messages init_message_length = len(input_data.get("conv_messages", [])) message_id = uuid.uuid4() - _node_order_counter = 0 _cycle_items: dict[str, list] = {} # 新会话时写入开场白 @@ -1015,32 +1014,17 @@ class WorkflowService: ) else: logger.error(f"unexpect 
workflow run status, status: {status}") - # 把积累的 cycle_item 写入对应循环节点的 output_data - if _cycle_items: + # 把积累的 cycle_item 写入 workflow_executions.output_data["node_outputs"] + if _cycle_items and execution.output_data: + import copy + new_output_data = copy.deepcopy(execution.output_data) + node_outputs = new_output_data.setdefault("node_outputs", {}) for cycle_node_id, items in _cycle_items.items(): - node_exec = self.db.execute( - select(WorkflowNodeExecution).where( - WorkflowNodeExecution.execution_id == execution.id, - WorkflowNodeExecution.node_id == cycle_node_id - ) - ).scalar_one_or_none() - if node_exec: - node_exec.output_data = { - **(node_exec.output_data or {}), - "cycle_items": items - } + if cycle_node_id in node_outputs: + node_outputs[cycle_node_id]["cycle_items"] = items else: - node_cfg = next((n for n in config.nodes if n.get("id") == cycle_node_id), {}) - self.db.add(WorkflowNodeExecution( - execution_id=execution.id, - node_id=cycle_node_id, - node_type=node_cfg.get("type", "cycle"), - node_name=node_cfg.get("data", {}).get("label") or cycle_node_id, - execution_order=_node_order_counter, - status="completed", - output_data={"cycle_items": items}, - )) - _node_order_counter += 1 + node_outputs[cycle_node_id] = {"cycle_items": items} + execution.output_data = new_output_data self.db.commit() elif event.get("event") == "workflow_start": event["data"]["message_id"] = str(message_id)