feat(workflow): use lightweight deque for streaming scheduler output queue to reduce read/write overhead
This commit is contained in:
@@ -3,7 +3,7 @@
|
|||||||
# @Email: 1533512157@qq.com
|
# @Email: 1533512157@qq.com
|
||||||
# @Time : 2026/2/9 15:11
|
# @Time : 2026/2/9 15:11
|
||||||
import re
|
import re
|
||||||
from queue import Queue
|
from collections import deque
|
||||||
from typing import AsyncGenerator
|
from typing import AsyncGenerator
|
||||||
|
|
||||||
from pydantic import BaseModel, Field, PrivateAttr
|
from pydantic import BaseModel, Field, PrivateAttr
|
||||||
@@ -256,7 +256,7 @@ class StreamOutputCoordinator:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.end_outputs: dict[str, StreamOutputConfig] = {}
|
self.end_outputs: dict[str, StreamOutputConfig] = {}
|
||||||
self.activate_end: str | None = None
|
self.activate_end: str | None = None
|
||||||
self.output_queue: Queue = Queue()
|
self.output_queue: deque[str] = deque()
|
||||||
self.processed_outputs = []
|
self.processed_outputs = []
|
||||||
|
|
||||||
def initialize_end_outputs(
|
def initialize_end_outputs(
|
||||||
@@ -266,7 +266,7 @@ class StreamOutputCoordinator:
|
|||||||
self.end_outputs = end_node_map
|
self.end_outputs = end_node_map
|
||||||
self.processed_outputs = []
|
self.processed_outputs = []
|
||||||
self.activate_end = None
|
self.activate_end = None
|
||||||
self.output_queue = Queue()
|
self.output_queue = deque()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def current_activate_end_info(self):
|
def current_activate_end_info(self):
|
||||||
@@ -296,13 +296,13 @@ class StreamOutputCoordinator:
|
|||||||
scope (str): The node ID or scope that has completed execution.
|
scope (str): The node ID or scope that has completed execution.
|
||||||
status (str | None): Optional status of the node (used for branch/control nodes).
|
status (str | None): Optional status of the node (used for branch/control nodes).
|
||||||
"""
|
"""
|
||||||
for node in self.end_outputs.keys():
|
for node in self.end_outputs:
|
||||||
self.end_outputs[node].update_activate(scope, status)
|
self.end_outputs[node].update_activate(scope, status)
|
||||||
if self.end_outputs[node].activate and node not in self.processed_outputs:
|
if self.end_outputs[node].activate and node not in self.processed_outputs:
|
||||||
self.output_queue.put(node)
|
self.output_queue.append(node)
|
||||||
self.processed_outputs.append(node)
|
self.processed_outputs.append(node)
|
||||||
if self.activate_end is None and not self.output_queue.empty():
|
if self.activate_end is None and self.output_queue:
|
||||||
self.activate_end = self.output_queue.get_nowait()
|
self.activate_end = self.output_queue.popleft()
|
||||||
|
|
||||||
async def emit_activate_chunk(
|
async def emit_activate_chunk(
|
||||||
self,
|
self,
|
||||||
@@ -414,8 +414,8 @@ class StreamOutputCoordinator:
|
|||||||
async for msg_event in self.emit_activate_chunk(variable_pool, force=True):
|
async for msg_event in self.emit_activate_chunk(variable_pool, force=True):
|
||||||
yield msg_event
|
yield msg_event
|
||||||
|
|
||||||
if not self.output_queue.empty():
|
if self.output_queue:
|
||||||
self.activate_end = self.output_queue.get_nowait()
|
self.activate_end = self.output_queue.popleft()
|
||||||
# Move to next active End node if current one is done
|
# Move to next active End node if current one is done
|
||||||
if not self.activate_end and self.end_outputs:
|
if not self.activate_end and self.end_outputs:
|
||||||
self.activate_end = list(self.end_outputs.keys())[0]
|
self.activate_end = list(self.end_outputs.keys())[0]
|
||||||
|
|||||||
Reference in New Issue
Block a user