diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 537058a0..bebb67fc 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -337,6 +337,7 @@ class WorkflowExecutor: logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}") if final_chunk: + logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}") yield { "event": "message", "data": { @@ -701,7 +702,8 @@ class WorkflowExecutor: end_time = datetime.datetime.now() elapsed_time = (end_time - start_time).total_seconds() - logger.info(f"Workflow execution completed: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s") + logger.info( + f"Workflow execution completed: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s") return self._build_final_output(result, elapsed_time, full_content) @@ -763,7 +765,6 @@ class WorkflowExecutor: await self.__init_variable_pool(input_data) initial_state = self._prepare_initial_state(input_data) - try: full_content = '' self._update_scope_activate("sys") @@ -789,7 +790,7 @@ class WorkflowExecutor: event_type = data.get("type", "node_chunk") # "message" or "node_chunk" if event_type == "node_chunk": async for msg_event in self._handle_node_chunk_event(data): - full_content += data.get("chunk") + full_content += msg_event["data"]["chunk"] yield msg_event elif event_type == "node_error": diff --git a/api/app/core/workflow/graph_builder.py b/api/app/core/workflow/graph_builder.py index 46a594d7..8620bb9a 100644 --- a/api/app/core/workflow/graph_builder.py +++ b/api/app/core/workflow/graph_builder.py @@ -100,7 +100,7 @@ class StreamOutputConfig(BaseModel): ) ) - control_nodes: dict[str, str] = Field( + control_nodes: dict[str, list[str]] = Field( ..., description=( "Control branch conditions for this End node output.\n" @@ -161,7 +161,7 @@ class StreamOutputConfig(BaseModel): if scope in self.control_nodes.keys(): if status is None: raise RuntimeError("[Stream Output] Control node activation status not provided") - if status == self.control_nodes[scope]: + if status in self.control_nodes[scope]: self.activate = True # Case 2: activate variable segments related to this node @@ -229,6 +229,13 @@ class GraphBuilder: except KeyError: raise RuntimeError(f"Node not found: Id={node_id}") + @staticmethod + def _merge_control_nodes(control_nodes: list[tuple[str, str]]) -> dict[str, list]: + result = defaultdict(list) + for node in control_nodes: + result[node[0]].append(node[1]) + return result + def _find_upstream_branch_node(self, target_node: str) -> tuple[bool, tuple[tuple[str, str]]]: """ Recursively find all upstream branch (control) nodes that influence the execution @@ -372,7 +379,7 @@ class GraphBuilder: activate=not has_branch, # Branch nodes that control activation of this End node - control_nodes=dict(control_nodes), + control_nodes=self._merge_control_nodes(control_nodes), # Convert output segments into OutputContent objects outputs=list( diff --git a/api/app/core/workflow/nodes/llm/node.py b/api/app/core/workflow/nodes/llm/node.py index 14bcb8ed..761a2e22 100644 --- a/api/app/core/workflow/nodes/llm/node.py +++ b/api/app/core/workflow/nodes/llm/node.py @@ -68,12 +68,13 @@ class LLMNode(BaseNode): - ai/assistant: AI 消息(AIMessage) """ - def _output_types(self) -> dict[str, VariableType]: - return {"output": VariableType.STRING} - def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) self.typed_config: LLMNodeConfig | None = None + self.messages = [] + + def _output_types(self) -> dict[str, VariableType]: + return {"output": VariableType.STRING} def _render_context(self, message: str, variable_pool: VariablePool): context = f"{self._render_template(self.typed_config.context, variable_pool)}" diff --git a/api/app/core/workflow/variable/base_variable.py b/api/app/core/workflow/variable/base_variable.py index 6a2e84d2..19cbdc74 100644 --- a/api/app/core/workflow/variable/base_variable.py +++ b/api/app/core/workflow/variable/base_variable.py @@ -39,18 +39,17 @@ class VariableType(StrEnum): Raises: TypeError: If the type of the input value is not supported. """ - var_type = type(var) - if isinstance(var_type, str): + if isinstance(var, str): return cls.STRING - elif isinstance(var_type, (int, float)): + elif isinstance(var, (int, float)): return cls.NUMBER - elif isinstance(var_type, bool): + elif isinstance(var, bool): return cls.BOOLEAN - elif isinstance(var_type, FileObject) or (isinstance(var, dict) and var.get('__file')): + elif isinstance(var, FileObject) or (isinstance(var, dict) and var.get('__file')): return cls.FILE - elif isinstance(var_type, dict): + elif isinstance(var, dict): return cls.OBJECT - elif isinstance(var_type, list): + elif isinstance(var, list): if len(var) == 0: return cls.ARRAY_STRING else: @@ -67,7 +66,7 @@ class VariableType(StrEnum): return cls.NESTED_ARRAY else: raise TypeError(f"Unsupported array child type - {child_type}") - raise TypeError(f"Unsupported type - {var_type}") + raise TypeError(f"Unsupported type - {type(var)}") def DEFAULT_VALUE(var_type: VariableType) -> Any: diff --git a/api/app/core/workflow/variable_pool.py b/api/app/core/workflow/variable_pool.py index 96495ce8..ae56bcb4 100644 --- a/api/app/core/workflow/variable_pool.py +++ b/api/app/core/workflow/variable_pool.py @@ -119,7 +119,7 @@ class VariablePool: Storage for all variables managed by the pool. """ self.locks = defaultdict(Lock) - self.variables: dict[str, dict[str, VariableStruct[Any]]] = {} + self.variables: dict[str, dict[str, VariableStruct[Any]]] = {"sys": {}, "conv": {}} @staticmethod def transform_selector(selector): diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index db50ce88..fb88f804 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -2,7 +2,6 @@ 工作流服务层 """ import datetime -import json import logging import uuid from typing import Any, Annotated, Optional @@ -448,13 +447,9 @@ class WorkflowService: message=f"工作流配置不存在: app_id={app_id}" ) - json_files = [] - for file in payload.files: - file_json = json.loads(file.model_dump_json()) - json_files.append(file_json) input_data = {"message": payload.message, "variables": payload.variables, "conversation_id": payload.conversation_id, - "files": json_files + "files": [file.model_dump(mode='json') for file in payload.files] } # 转换 conversation_id 为 UUID @@ -642,13 +637,9 @@ class WorkflowService: message=f"工作流配置不存在: app_id={app_id}" ) - json_files = [] - for file in payload.files: - file_json = json.loads(file.model_dump_json()) - json_files.append(file_json) input_data = {"message": payload.message, "variables": payload.variables, "conversation_id": payload.conversation_id, - "files": json_files + "files": [file.model_dump(mode='json') for file in payload.files] } # 转换 conversation_id 为 UUID