From 163872be6eb3edb0089000f9371ca271517e3f2d Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Wed, 4 Mar 2026 12:12:52 +0800 Subject: [PATCH] fix(workflow): rename output message field --- .../workflow/engine/event_stream_handler.py | 2 +- .../engine/stream_output_coordinator.py | 2 +- api/app/core/workflow/executor.py | 2 +- api/app/services/workflow_service.py | 201 +++++++++--------- 4 files changed, 102 insertions(+), 105 deletions(-) diff --git a/api/app/core/workflow/engine/event_stream_handler.py b/api/app/core/workflow/engine/event_stream_handler.py index 5b7d8de2..dc3cd04d 100644 --- a/api/app/core/workflow/engine/event_stream_handler.py +++ b/api/app/core/workflow/engine/event_stream_handler.py @@ -127,7 +127,7 @@ class EventStreamHandler: yield { "event": "message", "data": { - "chunk": data.get("chunk") + "content": data.get("chunk") } } diff --git a/api/app/core/workflow/engine/stream_output_coordinator.py b/api/app/core/workflow/engine/stream_output_coordinator.py index ba6af156..c2885ab0 100644 --- a/api/app/core/workflow/engine/stream_output_coordinator.py +++ b/api/app/core/workflow/engine/stream_output_coordinator.py @@ -274,7 +274,7 @@ class StreamOutputCoordinator: yield { "event": "message", "data": { - "chunk": final_chunk + "content": final_chunk } } diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index e781b6c4..7e5bb0e4 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -272,7 +272,7 @@ class WorkflowExecutor: event_type = data.get("type", "node_chunk") # "message" or "node_chunk" if event_type == "node_chunk": async for msg_event in self.event_handler.handle_node_chunk_event(data): - full_content += msg_event["data"]["chunk"] + full_content += msg_event["data"]["content"] yield msg_event elif event_type == "node_error": diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index a388ca75..02819efb 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -13,6 +13,7 @@ from sqlalchemy.orm import Session from app.core.error_codes import BizCode from app.core.exceptions import BusinessException from app.core.workflow.adapters.registry import PlatformAdapterRegistry +from app.core.workflow.executor import execute_workflow, execute_workflow_stream from app.core.workflow.nodes.enums import NodeType from app.core.workflow.validator import validate_workflow_config from app.db import get_db @@ -23,7 +24,7 @@ from app.repositories.workflow_repository import ( WorkflowExecutionRepository, WorkflowNodeExecutionRepository ) -from app.schemas import DraftRunRequest +from app.schemas import DraftRunRequest, FileInput from app.services.conversation_service import ConversationService from app.services.multi_agent_service import convert_uuids_to_str from app.services.multimodal_service import MultimodalService @@ -445,6 +446,91 @@ class WorkflowService: "success_rate": completed / total if total > 0 else 0 } + async def _handle_file_input(self, files: list[FileInput]): + if not files: + return [] + + files_struct = [] + for file in files: + files_struct.append( + { + "type": file.type, + "url": await self.multimodal_service.get_file_url(file), + "__file": True + } + ) + return files_struct + + @staticmethod + def _map_public_event(event: dict) -> dict | None: + """ + Map internal workflow events to public-facing event formats. + + Purpose: + - Hide internal execution details + - Expose a stable and simplified public event schema + - Filter out non-public events + - Maintain backward compatibility when possible + + Args: + event (dict): Internal event object, e.g.: + { + "event": "workflow_start", + "data": {...} + } + + Returns: + dict | None: + - Returns the mapped public event + - Returns None if the event should not be exposed + """ + event_type = event.get("event") + payload = event.get("data") + match event_type: + case "workflow_start": + return { + "event": "start", + "data": { + "conversation_id": payload.get("conversation_id"), + } + } + case "workflow_end": + return { + "event": "end", + "data": { + "elapsed_time": payload.get("elapsed_time"), + "message_length": len(payload.get("output", "")), + "error": payload.get("error", "") + } + } + case "node_start" | "node_end" | "node_error" | "cycle_item": + return None + case _: + return event + + def _emit(self, public: bool, internal_event: dict): + """ + Unified event emission entry. + + Args: + public (bool): + - True -> Emit mapped public event + - False -> Emit raw internal event + + internal_event (dict): + The original internal event object + + Returns: + dict | None: + - The mapped event + - Or None if the event is filtered out + """ + if public: + mapped = self._map_public_event(internal_event) + else: + mapped = internal_event + return mapped + # ==================== 工作流执行 ==================== async def run( @@ -479,10 +565,11 @@ class WorkflowService: message=f"工作流配置不存在: app_id={app_id}" ) - input_data = {"message": payload.message, "variables": payload.variables, - "conversation_id": payload.conversation_id, - "files": [file.model_dump(mode='json') for file in payload.files] - } + input_data = { + "message": payload.message, "variables": payload.variables, + "conversation_id": payload.conversation_id, + "files": [file.model_dump(mode='json') for file in payload.files] + } # 转换 conversation_id 为 UUID conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None @@ -506,22 +593,8 @@ class WorkflowService: "execution_config": config.execution_config } - # 4. 获取工作空间 ID(从 app 获取) - - # 5. 执行工作流 - from app.core.workflow.executor import execute_workflow - try: - files = [] - if payload.files: - for file in payload.files: - files.append( - { - "type": file.type, - "url": await self.multimodal_service.get_file_url(file), - "__file": True - } - ) + files = await self._handle_file_input(payload.files) input_data["files"] = files # 更新状态为运行中 self.update_execution_status(execution.execution_id, "running") @@ -601,42 +674,6 @@ class WorkflowService: message=f"工作流执行失败: {str(e)}" ) - @staticmethod - def _map_public_event(event: dict) -> dict | None: - event_type = event.get("event") - payload = event.get("data") - match event_type: - case "workflow_start": - return { - "event": "start", - "data": { - "conversation_id": payload.get("conversation_id"), - } - } - case "workflow_end": - return { - "event": "end", - "data": { - "elapsed_time": payload.get("elapsed_time"), - "message_length": len(payload.get("output", "")), - "error": payload.get("error", "") - } - } - case "node_start" | "node_end" | "node_error" | "cycle_item": - return None - case _: - return event - - def _emit(self, public: bool, internal_event: dict): - """ - decide - """ - if public: - mapped = self._map_public_event(internal_event) - else: - mapped = internal_event - return mapped - async def run_stream( self, app_id: uuid.UUID, @@ -671,10 +708,11 @@ class WorkflowService: message=f"工作流配置不存在: app_id={app_id}" ) - input_data = {"message": payload.message, "variables": payload.variables, - "conversation_id": payload.conversation_id, - "files": [file.model_dump(mode='json') for file in payload.files] - } + input_data = { + "message": payload.message, "variables": payload.variables, + "conversation_id": payload.conversation_id, + "files": [file.model_dump(mode='json') for file in payload.files] + } # 转换 conversation_id 为 UUID conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None @@ -699,16 +737,7 @@ class WorkflowService: } try: - files = [] - if payload.files: - for file in payload.files: - files.append( - { - "type": file.type, - "url": await self.multimodal_service.get_file_url(file), - "__file": True - } - ) + files = await self._handle_file_input(payload.files) input_data["files"] = files self.update_execution_status(execution.execution_id, "running") executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) @@ -723,7 +752,6 @@ class WorkflowService: input_data["conv_messages"] = last_state.get("messages") or [] break init_message_length = len(input_data.get("conv_messages", [])) - from app.core.workflow.executor import execute_workflow_stream async for event in execute_workflow_stream( workflow_config=workflow_config_dict, @@ -789,37 +817,6 @@ class WorkflowService: return node.get("config", {}).get("variables", []) raise BusinessException("workflow config error - start node not found") - def _clean_event_for_json(self, event: dict[str, Any]) -> dict[str, Any]: - """清理事件数据,移除不可序列化的对象 - - Args: - event: 原始事件数据 - - Returns: - 可序列化的事件数据 - """ - from langchain_core.messages import BaseMessage - - def clean_value(value): - """递归清理值""" - if isinstance(value, BaseMessage): - # 将 Message 对象转换为字典 - return { - "type": value.__class__.__name__, - "content": value.content, - } - elif isinstance(value, dict): - return {k: clean_value(v) for k, v in value.items()} - elif isinstance(value, list): - return [clean_value(item) for item in value] - elif isinstance(value, (str, int, float, bool, type(None))): - return value - else: - # 其他不可序列化的对象转换为字符串 - return str(value) - - return clean_value(event) - # ==================== 依赖注入函数 ====================