fix(workflow): fix function cache not taking effect and potential list index overflow
This commit is contained in:
@@ -320,7 +320,9 @@ class WorkflowExecutor:
|
|||||||
if event_type == "node_chunk":
|
if event_type == "node_chunk":
|
||||||
node_id = data.get("node_id")
|
node_id = data.get("node_id")
|
||||||
if self.activate_end:
|
if self.activate_end:
|
||||||
end_info = self.end_outputs[self.activate_end]
|
end_info = self.end_outputs.get(self.activate_end)
|
||||||
|
if not end_info or end_info.cursor >= len(end_info.outputs):
|
||||||
|
continue
|
||||||
current_output = end_info.outputs[end_info.cursor]
|
current_output = end_info.outputs[end_info.cursor]
|
||||||
if current_output.is_variable and node_id in current_output.literal:
|
if current_output.is_variable and node_id in current_output.literal:
|
||||||
if data.get("done"):
|
if data.get("done"):
|
||||||
|
|||||||
@@ -145,7 +145,7 @@ class GraphBuilder:
|
|||||||
self.end_node_map: dict[str, StreamOutputConfig] = {}
|
self.end_node_map: dict[str, StreamOutputConfig] = {}
|
||||||
self._find_upstream_branch_node = lru_cache(
|
self._find_upstream_branch_node = lru_cache(
|
||||||
maxsize=len(self.nodes) * 2
|
maxsize=len(self.nodes) * 2
|
||||||
)(self._find_upstream_branche_node)
|
)(self._find_upstream_branch_node)
|
||||||
self._analyze_end_node_output()
|
self._analyze_end_node_output()
|
||||||
|
|
||||||
self.graph = StateGraph(WorkflowState)
|
self.graph = StateGraph(WorkflowState)
|
||||||
@@ -178,7 +178,7 @@ class GraphBuilder:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
raise RuntimeError(f"Node not found: Id={node_id}")
|
raise RuntimeError(f"Node not found: Id={node_id}")
|
||||||
|
|
||||||
def _find_upstream_branche_node(self, target_node: str) -> tuple[bool, tuple[str]]:
|
def _find_upstream_branch_node(self, target_node: str) -> tuple[bool, tuple[str]]:
|
||||||
"""Find upstream branch nodes for a given target node in the workflow graph.
|
"""Find upstream branch nodes for a given target node in the workflow graph.
|
||||||
|
|
||||||
This method identifies all upstream control (branch) nodes that can affect
|
This method identifies all upstream control (branch) nodes that can affect
|
||||||
@@ -219,7 +219,7 @@ class GraphBuilder:
|
|||||||
|
|
||||||
has_branch = True
|
has_branch = True
|
||||||
for node_id in non_branch_nodes:
|
for node_id in non_branch_nodes:
|
||||||
node_has_branch, nodes = self._find_upstream_branche_node(node_id)
|
node_has_branch, nodes = self._find_upstream_branch_node(node_id)
|
||||||
has_branch = has_branch and node_has_branch
|
has_branch = has_branch and node_has_branch
|
||||||
if not has_branch:
|
if not has_branch:
|
||||||
break
|
break
|
||||||
@@ -288,7 +288,7 @@ class GraphBuilder:
|
|||||||
# Stream mode: output activation depends on upstream branch nodes
|
# Stream mode: output activation depends on upstream branch nodes
|
||||||
if self.stream:
|
if self.stream:
|
||||||
# Find upstream branch nodes that can control this End node
|
# Find upstream branch nodes that can control this End node
|
||||||
has_branch, control_nodes = self._find_upstream_branche_node(end_node_id)
|
has_branch, control_nodes = self._find_upstream_branch_node(end_node_id)
|
||||||
|
|
||||||
# Build StreamOutputConfig for this End node
|
# Build StreamOutputConfig for this End node
|
||||||
self.end_node_map[end_node_id] = StreamOutputConfig(
|
self.end_node_map[end_node_id] = StreamOutputConfig(
|
||||||
|
|||||||
Reference in New Issue
Block a user