From 1c49e3c16721dc72f39e6f887b4dde2e05c992cf Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Tue, 24 Mar 2026 17:12:37 +0800 Subject: [PATCH 1/2] feat(workflow): use lightweight deque for streaming scheduler output queue to reduce read/write overhead --- .../engine/stream_output_coordinator.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/api/app/core/workflow/engine/stream_output_coordinator.py b/api/app/core/workflow/engine/stream_output_coordinator.py index 6685a49e..dcc92fdb 100644 --- a/api/app/core/workflow/engine/stream_output_coordinator.py +++ b/api/app/core/workflow/engine/stream_output_coordinator.py @@ -3,7 +3,7 @@ # @Email: 1533512157@qq.com # @Time : 2026/2/9 15:11 import re -from queue import Queue +from collections import deque from typing import AsyncGenerator from pydantic import BaseModel, Field, PrivateAttr @@ -256,7 +256,7 @@ class StreamOutputCoordinator: def __init__(self): self.end_outputs: dict[str, StreamOutputConfig] = {} self.activate_end: str | None = None - self.output_queue: Queue = Queue() + self.output_queue: deque[str] = deque() self.processed_outputs = [] def initialize_end_outputs( @@ -266,7 +266,7 @@ class StreamOutputCoordinator: self.end_outputs = end_node_map self.processed_outputs = [] self.activate_end = None - self.output_queue = Queue() + self.output_queue = deque() @property def current_activate_end_info(self): @@ -296,13 +296,13 @@ class StreamOutputCoordinator: scope (str): The node ID or scope that has completed execution. status (str | None): Optional status of the node (used for branch/control nodes). """ - for node in self.end_outputs.keys(): + for node in self.end_outputs: self.end_outputs[node].update_activate(scope, status) if self.end_outputs[node].activate and node not in self.processed_outputs: - self.output_queue.put(node) + self.output_queue.append(node) self.processed_outputs.append(node) - if self.activate_end is None and not self.output_queue.empty(): - self.activate_end = self.output_queue.get_nowait() + if self.activate_end is None and self.output_queue: + self.activate_end = self.output_queue.popleft() async def emit_activate_chunk( self, @@ -414,8 +414,8 @@ class StreamOutputCoordinator: async for msg_event in self.emit_activate_chunk(variable_pool, force=True): yield msg_event - if not self.output_queue.empty(): - self.activate_end = self.output_queue.get_nowait() + if self.output_queue: + self.activate_end = self.output_queue.popleft() # Move to next active End node if current one is done if not self.activate_end and self.end_outputs: self.activate_end = list(self.end_outputs.keys())[0] From 522eb569f15dcc197fa25d0b79e7a123ba358458 Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Tue, 24 Mar 2026 18:10:45 +0800 Subject: [PATCH 2/2] fix(memory): fix undefined logger causing logging errors in memory module --- api/app/services/memory_agent_service.py | 2 +- api/app/services/memory_perceptual_service.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/services/memory_agent_service.py b/api/app/services/memory_agent_service.py index 3cfcc1d6..e5c34492 100644 --- a/api/app/services/memory_agent_service.py +++ b/api/app/services/memory_agent_service.py @@ -348,7 +348,7 @@ class MemoryAgentService: perceptual_serivce = MemoryPerceptualService(db) for message in messages: message["file_content"] = [] - for file in message["files"]: + for file in (message.get("files") or []): file_object = await perceptual_serivce.generate_perceptual_memory( end_user_id=end_user_id, memory_config=memory_config, diff --git a/api/app/services/memory_perceptual_service.py b/api/app/services/memory_perceptual_service.py index effceda7..3ee238e2 100644 --- a/api/app/services/memory_perceptual_service.py +++ b/api/app/services/memory_perceptual_service.py @@ -278,7 +278,7 @@ class MemoryPerceptualService: files=[file] ) if not file_message: - logger.warning(f"Unsupport file type {file}, model capability: {model_config.capability}") + business_logger.warning(f"Unsupported file type {file}, model capability: {model_config.capability}") return None file_message = file_message[0] try: