fix(workflow): fix loop variable type check, control node streaming output, and variable pool initialization

- Correct loop variable type detection to handle actual Python types
- Update StreamOutput control_nodes to support list of branches and fix upstream control node analysis
- Fix full_content aggregation in WorkflowExecutor for streaming outputs
- Initialize VariablePool with default "sys" and "conv" scopes
This commit is contained in:
Eternity
2026-02-09 14:40:12 +08:00
parent 03bc8c8280
commit d5c46574cc
4 changed files with 22 additions and 15 deletions

View File

@@ -337,6 +337,7 @@ class WorkflowExecutor:
logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}")
if final_chunk:
logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}")
yield {
"event": "message",
"data": {
@@ -701,7 +702,8 @@ class WorkflowExecutor:
end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds()
logger.info(f"Workflow execution completed: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s")
logger.info(
f"Workflow execution completed: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s")
return self._build_final_output(result, elapsed_time, full_content)
@@ -763,7 +765,6 @@ class WorkflowExecutor:
await self.__init_variable_pool(input_data)
initial_state = self._prepare_initial_state(input_data)
try:
full_content = ''
self._update_scope_activate("sys")
@@ -789,7 +790,7 @@ class WorkflowExecutor:
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
if event_type == "node_chunk":
async for msg_event in self._handle_node_chunk_event(data):
full_content += data.get("chunk")
full_content += msg_event["data"]["chunk"]
yield msg_event
elif event_type == "node_error":

View File

@@ -100,7 +100,7 @@ class StreamOutputConfig(BaseModel):
)
)
control_nodes: dict[str, str] = Field(
control_nodes: dict[str, list[str]] = Field(
...,
description=(
"Control branch conditions for this End node output.\n"
@@ -161,7 +161,7 @@ class StreamOutputConfig(BaseModel):
if scope in self.control_nodes.keys():
if status is None:
raise RuntimeError("[Stream Output] Control node activation status not provided")
if status == self.control_nodes[scope]:
if status in self.control_nodes[scope]:
self.activate = True
# Case 2: activate variable segments related to this node
@@ -229,6 +229,13 @@ class GraphBuilder:
except KeyError:
raise RuntimeError(f"Node not found: Id={node_id}")
@staticmethod
def _merge_control_nodes(control_nodes: list[tuple[str, str]]) -> dict[str, list]:
result = defaultdict(list)
for node in control_nodes:
result[node[0]].append(node[1])
return result
def _find_upstream_branch_node(self, target_node: str) -> tuple[bool, tuple[tuple[str, str]]]:
"""
Recursively find all upstream branch (control) nodes that influence the execution
@@ -372,7 +379,7 @@ class GraphBuilder:
activate=not has_branch,
# Branch nodes that control activation of this End node
control_nodes=dict(control_nodes),
control_nodes=self._merge_control_nodes(control_nodes),
# Convert output segments into OutputContent objects
outputs=list(

View File

@@ -39,18 +39,17 @@ class VariableType(StrEnum):
Raises:
TypeError: If the type of the input value is not supported.
"""
var_type = type(var)
if isinstance(var_type, str):
if isinstance(var, str):
return cls.STRING
elif isinstance(var_type, (int, float)):
elif isinstance(var, (int, float)):
return cls.NUMBER
elif isinstance(var_type, bool):
elif isinstance(var, bool):
return cls.BOOLEAN
elif isinstance(var_type, FileObject) or (isinstance(var, dict) and var.get('__file')):
elif isinstance(var, FileObject) or (isinstance(var, dict) and var.get('__file')):
return cls.FILE
elif isinstance(var_type, dict):
elif isinstance(var, dict):
return cls.OBJECT
elif isinstance(var_type, list):
elif isinstance(var, list):
if len(var) == 0:
return cls.ARRAY_STRING
else:
@@ -67,7 +66,7 @@ class VariableType(StrEnum):
return cls.NESTED_ARRAY
else:
raise TypeError(f"Unsupported array child type - {child_type}")
raise TypeError(f"Unsupported type - {var_type}")
raise TypeError(f"Unsupported type - {type(var)}")
def DEFAULT_VALUE(var_type: VariableType) -> Any:

View File

@@ -119,7 +119,7 @@ class VariablePool:
Storage for all variables managed by the pool.
"""
self.locks = defaultdict(Lock)
self.variables: dict[str, dict[str, VariableStruct[Any]]] = {}
self.variables: dict[str, dict[str, VariableStruct[Any]]] = {"sys": {}, "conv": {}}
@staticmethod
def transform_selector(selector):