feat(workflow): use internal streaming execution for non-stream API and return aggregated result

This commit is contained in:
Eternity
2026-03-20 11:39:02 +08:00
parent fcc81ac025
commit 7c6e48b04e
5 changed files with 124 additions and 99 deletions

View File

@@ -59,7 +59,6 @@ class GraphBuilder:
self.variable_pool = variable_pool self.variable_pool = variable_pool
else: else:
self.variable_pool = VariablePool() self.variable_pool = VariablePool()
self._reverse_adj: dict[str, list[dict]] = defaultdict(list)
self.graph = StateGraph(WorkflowState) self.graph = StateGraph(WorkflowState)
self.add_nodes() self.add_nodes()
@@ -138,8 +137,10 @@ class GraphBuilder:
complete before this node activates. complete before this node activates.
""" """
source_nodes = self._reverse_adj[target_node] source_nodes = self._reverse_adj[target_node]
if not source_nodes or self.get_node_type(target_node) in [NodeType.START, NodeType.CYCLE_START]: if not source_nodes:
return tuple(), tuple() if self.get_node_type(target_node) in [NodeType.START, NodeType.CYCLE_START]:
return tuple(), tuple()
raise RuntimeError(f"Node {target_node} is not reachable from the Start node")
branch_nodes = [] branch_nodes = []
output_nodes = [] output_nodes = []

View File

@@ -12,6 +12,7 @@ class WorkflowResultBuilder:
variable_pool: VariablePool, variable_pool: VariablePool,
elapsed_time: float, elapsed_time: float,
final_output: str, final_output: str,
success: bool
): ):
"""Construct the final standardized output of the workflow execution. """Construct the final standardized output of the workflow execution.
@@ -29,6 +30,7 @@ class WorkflowResultBuilder:
elapsed_time (float): Total execution time in seconds. elapsed_time (float): Total execution time in seconds.
final_output (Any): The aggregated or final output content of the workflow final_output (Any): The aggregated or final output content of the workflow
(e.g., combined messages from all End nodes). (e.g., combined messages from all End nodes).
success (bool): Whether the execution was successful.
Returns: Returns:
dict: A dictionary containing the final workflow execution result with keys: dict: A dictionary containing the final workflow execution result with keys:
@@ -49,7 +51,7 @@ class WorkflowResultBuilder:
conversation_id = variable_pool.get_value("sys.conversation_id") conversation_id = variable_pool.get_value("sys.conversation_id")
return { return {
"status": "completed", "status": "completed" if success else "failed",
"output": final_output, "output": final_output,
"variables": { "variables": {
"conv": variable_pool.get_all_conversation_vars(), "conv": variable_pool.get_all_conversation_vars(),

View File

@@ -264,6 +264,9 @@ class StreamOutputCoordinator:
end_node_map: dict[str, StreamOutputConfig] end_node_map: dict[str, StreamOutputConfig]
): ):
self.end_outputs = end_node_map self.end_outputs = end_node_map
self.processed_outputs = []
self.activate_end = None
self.output_queue = Queue()
@property @property
def current_activate_end_info(self): def current_activate_end_info(self):

View File

@@ -128,89 +128,100 @@ class WorkflowExecutor:
- token_usage: aggregated token usage if available - token_usage: aggregated token usage if available
- error: error message if any - error: error message if any
""" """
logger.info(f"Starting workflow execution: execution_id={self.execution_context.execution_id}") start = datetime.datetime.now()
async for event in self.execute_stream(input_data):
start_time = datetime.datetime.now() if event.get("event") == "workflow_end":
return event.get("data")
# Execute the workflow return self.result_builder.build_final_output(
try: {"error": "Workflow execution did not end as expected"},
# Build the workflow graph self.variable_pool,
graph = self.build_graph() (datetime.datetime.now() - start).total_seconds(),
"",
# Initialize the variable pool with input data success=False
await self.variable_initializer.initialize( )
variable_pool=self.variable_pool, # logger.info(f"Starting workflow execution: execution_id={self.execution_context.execution_id}")
input_data=input_data, #
execution_context=self.execution_context # start_time = datetime.datetime.now()
) #
initial_state = self.state_manager.create_initial_state( # # Execute the workflow
workflow_config=self.workflow_config, # try:
input_data=input_data, # # Build the workflow graph
execution_context=self.execution_context, # graph = self.build_graph()
start_node_id=self.start_node_id #
) # # Initialize the variable pool with input data
# await self.variable_initializer.initialize(
result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config) # variable_pool=self.variable_pool,
# input_data=input_data,
# Aggregate output from all End nodes # execution_context=self.execution_context
full_content = '' # )
for end_id in self.stream_coordinator.end_outputs.keys(): # initial_state = self.state_manager.create_initial_state(
full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False) # workflow_config=self.workflow_config,
# input_data=input_data,
# Append messages for user and assistant # execution_context=self.execution_context,
if input_data.get("files"): # start_node_id=self.start_node_id
result["messages"].extend( # )
[ #
{ # result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config)
"role": "user", #
"content": input_data.get("message", '') # # Aggregate output from all End nodes
}, # full_content = ''
{ # for end_id in self.stream_coordinator.end_outputs.keys():
"role": "user", # full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False)
"content": input_data.get("files") #
}, # # Append messages for user and assistant
{ # if input_data.get("files"):
"role": "assistant", # result["messages"].extend(
"content": full_content # [
} # {
] # "role": "user",
) # "content": input_data.get("message", '')
else: # },
result["messages"].extend( # {
[ # "role": "user",
{ # "content": input_data.get("files")
"role": "user", # },
"content": input_data.get("message", '') # {
}, # "role": "assistant",
{ # "content": full_content
"role": "assistant", # }
"content": full_content # ]
} # )
] # else:
) # result["messages"].extend(
# Calculate elapsed time # [
end_time = datetime.datetime.now() # {
elapsed_time = (end_time - start_time).total_seconds() # "role": "user",
# "content": input_data.get("message", '')
logger.info( # },
f"Workflow execution completed: execution_id={self.execution_context.execution_id}, elapsed_time={elapsed_time:.2f}ms") # {
# "role": "assistant",
return self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content) # "content": full_content
# }
except Exception as e: # ]
end_time = datetime.datetime.now() # )
elapsed_time = (end_time - start_time).total_seconds() # # Calculate elapsed time
# end_time = datetime.datetime.now()
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", # elapsed_time = (end_time - start_time).total_seconds()
exc_info=True) #
return { # logger.info(
"status": "failed", # f"Workflow execution completed: execution_id={self.execution_context.execution_id}, elapsed_time={elapsed_time:.2f}ms")
"error": str(e), #
"output": None, # return self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content)
"node_outputs": {}, #
"elapsed_time": elapsed_time, # except Exception as e:
"token_usage": None # end_time = datetime.datetime.now()
} # elapsed_time = (end_time - start_time).total_seconds()
#
# logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
# exc_info=True)
# return {
# "status": "failed",
# "error": str(e),
# "output": None,
# "node_outputs": {},
# "elapsed_time": elapsed_time,
# "token_usage": None
# }
async def execute_stream( async def execute_stream(
self, self,
@@ -248,7 +259,8 @@ class WorkflowExecutor:
"timestamp": int(start_time.timestamp() * 1000) "timestamp": int(start_time.timestamp() * 1000)
} }
} }
result = None
full_content = ''
try: try:
# Build the workflow graph in streaming mode # Build the workflow graph in streaming mode
graph = self.build_graph(stream=True) graph = self.build_graph(stream=True)
@@ -266,7 +278,6 @@ class WorkflowExecutor:
start_node_id=self.start_node_id start_node_id=self.start_node_id
) )
full_content = ''
self.stream_coordinator.update_scope_activation("sys") self.stream_coordinator.update_scope_activation("sys")
# Execute the workflow with streaming # Execute the workflow with streaming
@@ -363,7 +374,12 @@ class WorkflowExecutor:
yield { yield {
"event": "workflow_end", "event": "workflow_end",
"data": self.result_builder.build_final_output(result, self.variable_pool, elapsed_time, full_content) "data": self.result_builder.build_final_output(
result,
self.variable_pool,
elapsed_time,
full_content,
success=True)
} }
except Exception as e: except Exception as e:
@@ -372,16 +388,19 @@ class WorkflowExecutor:
logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}", logger.error(f"Workflow execution failed: execution_id={self.execution_context.execution_id}, error={e}",
exc_info=True) exc_info=True)
if result is None:
result = {"error": str(e)}
else:
result["error"] = str(e)
yield { yield {
"event": "workflow_end", "event": "workflow_end",
"data": { "data": self.result_builder.build_final_output(
"execution_id": self.execution_context.execution_id, result,
"status": "failed", self.variable_pool,
"error": str(e), elapsed_time,
"elapsed_time": elapsed_time, full_content,
"timestamp": end_time.isoformat() success=False
} )
} }

View File

@@ -128,7 +128,7 @@ class CodeNode(BaseNode):
else: else:
raise ValueError(f"Unsupported language: {self.typed_config.language}") raise ValueError(f"Unsupported language: {self.typed_config.language}")
async with httpx.AsyncClient() as client: async with httpx.AsyncClient(timeout=60) as client:
response = await client.post( response = await client.post(
"http://sandbox:8194/v1/sandbox/run", "http://sandbox:8194/v1/sandbox/run",
headers={ headers={