diff --git a/api/app/core/workflow/engine/graph_builder.py b/api/app/core/workflow/engine/graph_builder.py index 0d0035ab..7b5c059c 100644 --- a/api/app/core/workflow/engine/graph_builder.py +++ b/api/app/core/workflow/engine/graph_builder.py @@ -7,7 +7,7 @@ import re import uuid from collections import defaultdict from functools import lru_cache -from typing import Any +from typing import Any, Iterable from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import START, END @@ -81,7 +81,7 @@ class GraphBuilder: raise RuntimeError(f"Node not found: Id={node_id}") @staticmethod - def _merge_control_nodes(control_nodes: tuple[tuple[str, str]]) -> dict[str, list]: + def _merge_control_nodes(control_nodes: Iterable[tuple[str, str]]) -> dict[str, list]: result = defaultdict(list) for node in control_nodes: result[node[0]].append(node[1]) diff --git a/api/app/core/workflow/engine/result_builder.py b/api/app/core/workflow/engine/result_builder.py index dbaf8fa6..31bccf57 100644 --- a/api/app/core/workflow/engine/result_builder.py +++ b/api/app/core/workflow/engine/result_builder.py @@ -5,7 +5,7 @@ from app.core.workflow.engine.variable_pool import VariablePool -class WorkflowResultBuiler: +class WorkflowResultBuilder: def build_final_output( self, result: dict, diff --git a/api/app/core/workflow/engine/stream_output_coordinator.py b/api/app/core/workflow/engine/stream_output_coordinator.py index 778c6acf..5155a76f 100644 --- a/api/app/core/workflow/engine/stream_output_coordinator.py +++ b/api/app/core/workflow/engine/stream_output_coordinator.py @@ -263,7 +263,6 @@ class StreamOutputCoordinator: else: # Variable segment: evaluate and transform try: - # Simulate evaluation (replace with actual logic) chunk = variable_pool.get_literal(current_segment.literal) final_chunk += chunk except Exception as e: @@ -271,7 +270,7 @@ class StreamOutputCoordinator: logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}, error: {e}") if final_chunk: - logger.warning(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}") + logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}") yield { "event": "message", "data": { @@ -317,7 +316,7 @@ class StreamOutputCoordinator: if node_info.activate } - if self.activate_end or self.activate_end: + if self.end_outputs or self.activate_end: while self.activate_end: # Force emit all remaining chunks of the active End node async for msg_event in self.emit_activate_chunk(variable_pool, force=True): diff --git a/api/app/core/workflow/engine/variable_pool.py b/api/app/core/workflow/engine/variable_pool.py index 55966ed6..22be08c8 100644 --- a/api/app/core/workflow/engine/variable_pool.py +++ b/api/app/core/workflow/engine/variable_pool.py @@ -401,10 +401,13 @@ class VariablePoolInitializer: ): user_message = input_data.get("message") or "" user_files = input_data.get("files") or [] + conversations = input_data.get("conv_messages", []) + conversation_index = len(conversations) // 2 input_variables = input_data.get("variables") or {} sys_vars = { "message": (user_message, VariableType.STRING), + "conversation_index": (conversation_index, VariableType.NUMBER), "conversation_id": (input_data.get("conversation_id"), VariableType.STRING), "execution_id": (context.execution_id, VariableType.STRING), "workspace_id": (context.workspace_id, VariableType.STRING), diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 2ec7992b..ff48fb07 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -10,7 +10,7 @@ from langgraph.graph.state import CompiledStateGraph from app.core.workflow.engine.event_stream_handler import EventStreamHandler from app.core.workflow.engine.graph_builder import GraphBuilder -from app.core.workflow.engine.result_builder import WorkflowResultBuiler +from app.core.workflow.engine.result_builder import WorkflowResultBuilder from app.core.workflow.engine.runtime_schema import ExecutionContext from app.core.workflow.engine.state_manager import WorkflowStateManager from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator @@ -60,7 +60,7 @@ class WorkflowExecutor: self.variable_initializer = VariablePoolInitializer(workflow_config) self.state_manager = WorkflowStateManager() - self.result_builder = WorkflowResultBuiler() + self.result_builder = WorkflowResultBuilder() self.stream_coordinator = StreamOutputCoordinator() self.event_handler: EventStreamHandler | None = None