perf(workflow): enhance streaming output node activation performance
This commit is contained in:
@@ -333,7 +333,7 @@ class WorkflowExecutor:
|
||||
if not end_info or end_info.cursor >= len(end_info.outputs):
|
||||
continue
|
||||
current_output = end_info.outputs[end_info.cursor]
|
||||
if current_output.is_variable and node_id in current_output.literal:
|
||||
if current_output.is_variable and current_output.depends_on_node(node_id):
|
||||
if data.get("done"):
|
||||
end_info.cursor += 1
|
||||
else:
|
||||
|
||||
@@ -53,6 +53,44 @@ class OutputContent(BaseModel):
|
||||
)
|
||||
)
|
||||
|
||||
def depends_on_node(self, node_id: str) -> bool:
|
||||
"""
|
||||
Check if this output segment depends on a specific node's variable.
|
||||
|
||||
This method examines the `literal` of the output segment to see if it
|
||||
contains a variable placeholder referencing the given node in the form:
|
||||
|
||||
{{ node_id.field_name }}
|
||||
|
||||
It uses a regular expression to match the exact node ID, avoiding
|
||||
false positives from substring matches (e.g., 'node1' should not match 'node10').
|
||||
|
||||
Args:
|
||||
node_id (str): The ID of the node to check for in this segment's variable placeholders.
|
||||
|
||||
Returns:
|
||||
bool:
|
||||
- True if the segment contains a variable referencing the given node.
|
||||
- False otherwise.
|
||||
|
||||
Example:
|
||||
literal = "{{node1.name}}"
|
||||
|
||||
depends_on_node("node1") -> True
|
||||
depends_on_node("node2") -> False
|
||||
|
||||
Usage:
|
||||
This method is primarily used in stream mode to determine whether
|
||||
a particular variable output segment should be activated when a
|
||||
specific upstream node completes execution.
|
||||
"""
|
||||
variable_pattern = rf"\{{\{{\s*{re.escape(node_id)}\.[a-zA-Z0-9_]+\s*\}}\}}"
|
||||
pattern = re.compile(variable_pattern)
|
||||
match = pattern.search(self.literal)
|
||||
if match:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class StreamOutputConfig(BaseModel):
|
||||
"""
|
||||
@@ -122,7 +160,7 @@ class StreamOutputConfig(BaseModel):
|
||||
for i in range(len(self.outputs)):
|
||||
if (
|
||||
self.outputs[i].is_variable
|
||||
and node_id in self.outputs[i].literal
|
||||
and self.outputs[i].depends_on_node(node_id)
|
||||
):
|
||||
self.outputs[i].activate = True
|
||||
|
||||
|
||||
Reference in New Issue
Block a user