diff --git a/api/app/core/workflow/engine/graph_builder.py b/api/app/core/workflow/engine/graph_builder.py index 61896574..c20fd0bb 100644 --- a/api/app/core/workflow/engine/graph_builder.py +++ b/api/app/core/workflow/engine/graph_builder.py @@ -59,7 +59,6 @@ class GraphBuilder: self.variable_pool = variable_pool else: self.variable_pool = VariablePool() - self._reverse_adj: dict[str, list[dict]] = defaultdict(list) self.graph = StateGraph(WorkflowState) self.add_nodes() @@ -138,8 +137,10 @@ class GraphBuilder: complete before this node activates. """ source_nodes = self._reverse_adj[target_node] - if not source_nodes or self.get_node_type(target_node) in [NodeType.START, NodeType.CYCLE_START]: - return tuple(), tuple() + if not source_nodes: + if self.get_node_type(target_node) in [NodeType.START, NodeType.CYCLE_START]: + return tuple(), tuple() + raise RuntimeError(f"Node {target_node} is not reachable from the Start node") branch_nodes = [] output_nodes = [] diff --git a/api/app/core/workflow/engine/result_builder.py b/api/app/core/workflow/engine/result_builder.py index 31bccf57..e5a03c1c 100644 --- a/api/app/core/workflow/engine/result_builder.py +++ b/api/app/core/workflow/engine/result_builder.py @@ -12,6 +12,7 @@ class WorkflowResultBuilder: variable_pool: VariablePool, elapsed_time: float, final_output: str, + success: bool ): """Construct the final standardized output of the workflow execution. @@ -29,6 +30,7 @@ class WorkflowResultBuilder: elapsed_time (float): Total execution time in seconds. final_output (Any): The aggregated or final output content of the workflow (e.g., combined messages from all End nodes). + success (bool): Whether the execution was successful. Returns: dict: A dictionary containing the final workflow execution result with keys: @@ -49,7 +51,7 @@ class WorkflowResultBuilder: conversation_id = variable_pool.get_value("sys.conversation_id") return { - "status": "completed", + "status": "completed" if success else "failed", "output": final_output, "variables": { "conv": variable_pool.get_all_conversation_vars(), diff --git a/api/app/core/workflow/engine/stream_output_coordinator.py b/api/app/core/workflow/engine/stream_output_coordinator.py index 8184545c..ceffc7dc 100644 --- a/api/app/core/workflow/engine/stream_output_coordinator.py +++ b/api/app/core/workflow/engine/stream_output_coordinator.py @@ -264,6 +264,9 @@ class StreamOutputCoordinator: end_node_map: dict[str, StreamOutputConfig] ): self.end_outputs = end_node_map + self.processed_outputs = [] + self.activate_end = None + self.output_queue = Queue() @property def current_activate_end_info(self): diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index ff979f2b..c9ed6e65 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -128,89 +128,100 @@ class WorkflowExecutor: - token_usage: aggregated token usage if available - error: error message if any """ - logger.info(f"Starting workflow execution: execution_id={self.execution_context.execution_id}") - - start_time = datetime.datetime.now() - - # Execute the workflow - try: - # Build the workflow graph - graph = self.build_graph() - - # Initialize the variable pool with input data - await self.variable_initializer.initialize( - variable_pool=self.variable_pool, - input_data=input_data, - execution_context=self.execution_context - ) - initial_state = self.state_manager.create_initial_state( - workflow_config=self.workflow_config, - input_data=input_data, - execution_context=self.execution_context, - start_node_id=self.start_node_id - ) - - result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config) - - # Aggregate output from all End nodes - full_content = '' - for end_id in self.stream_coordinator.end_outputs.keys(): - full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False) - - # Append messages for user and assistant - if input_data.get("files"): - result["messages"].extend( - [ - { - "role": "user", - "content": input_data.get("message", '') - }, - { - "role": "user", - "content": input_data.get("files") - }, - { - "role": "assistant", - "content": full_content - } - ] - ) - else: - result["messages"].extend( - [ - { - "role": "user", - "content": input_data.get("message", '') - }, - { - "role": "assistant", - "content": full_content - } - ] - ) - # Calculate elapsed time - end_time = datetime.datetime.now() - elapsed_time = (end_time - start_time).total_seconds() - - logger.info( - f"Workflow execution completed: execution_id={self.execution_context.execution_id}, elapsed_time={elapsed_time:.2f}ms") - - return self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content) - - except Exception as e: - end_time = datetime.datetime.now() - elapsed_time = (end_time - start_time).total_seconds() - - logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", - exc_info=True) - return { - "status": "failed", - "error": str(e), - "output": None, - "node_outputs": {}, - "elapsed_time": elapsed_time, - "token_usage": None - } + start = datetime.datetime.now() + async for event in self.execute_stream(input_data): + if event.get("event") == "workflow_end": + return event.get("data") + return self.result_builder.build_final_output( + {"error": "Workflow execution did not end as expected"}, + self.variable_pool, + (datetime.datetime.now() - start).total_seconds(), + "", + success=False + ) + # logger.info(f"Starting workflow execution: execution_id={self.execution_context.execution_id}") + # + # start_time = datetime.datetime.now() + # + # # Execute the workflow + # try: + # # Build the workflow graph + # graph = self.build_graph() + # + # # Initialize the variable pool with input data + # await self.variable_initializer.initialize( + # variable_pool=self.variable_pool, + # input_data=input_data, + # execution_context=self.execution_context + # ) + # initial_state = self.state_manager.create_initial_state( + # workflow_config=self.workflow_config, + # input_data=input_data, + # execution_context=self.execution_context, + # start_node_id=self.start_node_id + # ) + # + # result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config) + # + # # Aggregate output from all End nodes + # full_content = '' + # for end_id in self.stream_coordinator.end_outputs.keys(): + # full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False) + # + # # Append messages for user and assistant + # if input_data.get("files"): + # result["messages"].extend( + # [ + # { + # "role": "user", + # "content": input_data.get("message", '') + # }, + # { + # "role": "user", + # "content": input_data.get("files") + # }, + # { + # "role": "assistant", + # "content": full_content + # } + # ] + # ) + # else: + # result["messages"].extend( + # [ + # { + # "role": "user", + # "content": input_data.get("message", '') + # }, + # { + # "role": "assistant", + # "content": full_content + # } + # ] + # ) + # # Calculate elapsed time + # end_time = datetime.datetime.now() + # elapsed_time = (end_time - start_time).total_seconds() + # + # logger.info( + # f"Workflow execution completed: execution_id={self.execution_context.execution_id}, elapsed_time={elapsed_time:.2f}ms") + # + # return self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content) + # + # except Exception as e: + # end_time = datetime.datetime.now() + # elapsed_time = (end_time - start_time).total_seconds() + # + # logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", + # exc_info=True) + # return { + # "status": "failed", + # "error": str(e), + # "output": None, + # "node_outputs": {}, + # "elapsed_time": elapsed_time, + # "token_usage": None + # } async def execute_stream( self, @@ -248,7 +259,8 @@ class WorkflowExecutor: "timestamp": int(start_time.timestamp() * 1000) } } - + result = None + full_content = '' try: # Build the workflow graph in streaming mode graph = self.build_graph(stream=True) @@ -266,7 +278,6 @@ class WorkflowExecutor: start_node_id=self.start_node_id ) - full_content = '' self.stream_coordinator.update_scope_activation("sys") # Execute the workflow with streaming @@ -363,7 +374,12 @@ class WorkflowExecutor: yield { "event": "workflow_end", - "data": self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content) + "data": self.result_builder.build_final_output( + result, + self.variable_pool, + elapsed_time, + full_content, + success=True) } except Exception as e: @@ -372,16 +388,19 @@ class WorkflowExecutor: logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", exc_info=True) - + if result is None: + result = {"error": str(e)} + else: + result["error"] = str(e) yield { "event": "workflow_end", - "data": { - "execution_id": self.execution_context.execution_id, - "status": "failed", - "error": str(e), - "elapsed_time": elapsed_time, - "timestamp": end_time.isoformat() - } + "data": self.result_builder.build_final_output( + result, + self.variable_pool, + elapsed_time, + full_content, + success=False + ) } diff --git a/api/app/core/workflow/nodes/code/node.py b/api/app/core/workflow/nodes/code/node.py index 9303302d..1e055002 100644 --- a/api/app/core/workflow/nodes/code/node.py +++ b/api/app/core/workflow/nodes/code/node.py @@ -128,7 +128,7 @@ class CodeNode(BaseNode): else: raise ValueError(f"Unsupported language: {self.typed_config.language}") - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=60) as client: response = await client.post( "http://sandbox:8194/v1/sandbox/run", headers={