From 1c49e3c16721dc72f39e6f887b4dde2e05c992cf Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Tue, 24 Mar 2026 17:12:37 +0800 Subject: [PATCH] feat(workflow): use lightweight deque for streaming scheduler output queue to reduce read/write overhead --- .../engine/stream_output_coordinator.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/api/app/core/workflow/engine/stream_output_coordinator.py b/api/app/core/workflow/engine/stream_output_coordinator.py index 6685a49e..dcc92fdb 100644 --- a/api/app/core/workflow/engine/stream_output_coordinator.py +++ b/api/app/core/workflow/engine/stream_output_coordinator.py @@ -3,7 +3,7 @@ # @Email: 1533512157@qq.com # @Time : 2026/2/9 15:11 import re -from queue import Queue +from collections import deque from typing import AsyncGenerator from pydantic import BaseModel, Field, PrivateAttr @@ -256,7 +256,7 @@ class StreamOutputCoordinator: def __init__(self): self.end_outputs: dict[str, StreamOutputConfig] = {} self.activate_end: str | None = None - self.output_queue: Queue = Queue() + self.output_queue: deque[str] = deque() self.processed_outputs = [] def initialize_end_outputs( @@ -266,7 +266,7 @@ class StreamOutputCoordinator: self.end_outputs = end_node_map self.processed_outputs = [] self.activate_end = None - self.output_queue = Queue() + self.output_queue = deque() @property def current_activate_end_info(self): @@ -296,13 +296,13 @@ class StreamOutputCoordinator: scope (str): The node ID or scope that has completed execution. status (str | None): Optional status of the node (used for branch/control nodes). """ - for node in self.end_outputs.keys(): + for node in self.end_outputs: self.end_outputs[node].update_activate(scope, status) if self.end_outputs[node].activate and node not in self.processed_outputs: - self.output_queue.put(node) + self.output_queue.append(node) self.processed_outputs.append(node) - if self.activate_end is None and not self.output_queue.empty(): - self.activate_end = self.output_queue.get_nowait() + if self.activate_end is None and self.output_queue: + self.activate_end = self.output_queue.popleft() async def emit_activate_chunk( self, @@ -414,8 +414,8 @@ class StreamOutputCoordinator: async for msg_event in self.emit_activate_chunk(variable_pool, force=True): yield msg_event - if not self.output_queue.empty(): - self.activate_end = self.output_queue.get_nowait() + if self.output_queue: + self.activate_end = self.output_queue.popleft() # Move to next active End node if current one is done if not self.activate_end and self.end_outputs: self.activate_end = list(self.end_outputs.keys())[0]