Merge pull request #679 from SuanmoSuanyangTechnology/pref/workflow-engine
pref(workflow): use lightweight deque for streaming scheduler output queue to reduce read/write overhead
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
# @Email: 1533512157@qq.com
|
||||
# @Time : 2026/2/9 15:11
|
||||
import re
|
||||
from queue import Queue
|
||||
from collections import deque
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pydantic import BaseModel, Field, PrivateAttr
|
||||
@@ -256,7 +256,7 @@ class StreamOutputCoordinator:
|
||||
def __init__(self):
|
||||
self.end_outputs: dict[str, StreamOutputConfig] = {}
|
||||
self.activate_end: str | None = None
|
||||
self.output_queue: Queue = Queue()
|
||||
self.output_queue: deque[str] = deque()
|
||||
self.processed_outputs = []
|
||||
|
||||
def initialize_end_outputs(
|
||||
@@ -266,7 +266,7 @@ class StreamOutputCoordinator:
|
||||
self.end_outputs = end_node_map
|
||||
self.processed_outputs = []
|
||||
self.activate_end = None
|
||||
self.output_queue = Queue()
|
||||
self.output_queue = deque()
|
||||
|
||||
@property
|
||||
def current_activate_end_info(self):
|
||||
@@ -296,13 +296,13 @@ class StreamOutputCoordinator:
|
||||
scope (str): The node ID or scope that has completed execution.
|
||||
status (str | None): Optional status of the node (used for branch/control nodes).
|
||||
"""
|
||||
for node in self.end_outputs.keys():
|
||||
for node in self.end_outputs:
|
||||
self.end_outputs[node].update_activate(scope, status)
|
||||
if self.end_outputs[node].activate and node not in self.processed_outputs:
|
||||
self.output_queue.put(node)
|
||||
self.output_queue.append(node)
|
||||
self.processed_outputs.append(node)
|
||||
if self.activate_end is None and not self.output_queue.empty():
|
||||
self.activate_end = self.output_queue.get_nowait()
|
||||
if self.activate_end is None and self.output_queue:
|
||||
self.activate_end = self.output_queue.popleft()
|
||||
|
||||
async def emit_activate_chunk(
|
||||
self,
|
||||
@@ -414,8 +414,8 @@ class StreamOutputCoordinator:
|
||||
async for msg_event in self.emit_activate_chunk(variable_pool, force=True):
|
||||
yield msg_event
|
||||
|
||||
if not self.output_queue.empty():
|
||||
self.activate_end = self.output_queue.get_nowait()
|
||||
if self.output_queue:
|
||||
self.activate_end = self.output_queue.popleft()
|
||||
# Move to next active End node if current one is done
|
||||
if not self.activate_end and self.end_outputs:
|
||||
self.activate_end = list(self.end_outputs.keys())[0]
|
||||
|
||||
@@ -348,7 +348,7 @@ class MemoryAgentService:
|
||||
perceptual_serivce = MemoryPerceptualService(db)
|
||||
for message in messages:
|
||||
message["file_content"] = []
|
||||
for file in message["files"]:
|
||||
for file in (message.get("files") or []):
|
||||
file_object = await perceptual_serivce.generate_perceptual_memory(
|
||||
end_user_id=end_user_id,
|
||||
memory_config=memory_config,
|
||||
|
||||
@@ -278,7 +278,7 @@ class MemoryPerceptualService:
|
||||
files=[file]
|
||||
)
|
||||
if not file_message:
|
||||
logger.warning(f"Unsupport file type {file}, model capability: {model_config.capability}")
|
||||
business_logger.warning(f"Unsupported file type {file}, model capability: {model_config.capability}")
|
||||
return None
|
||||
file_message = file_message[0]
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user