feat(workflow): add system-level workflow variable for dialogue turns and fix bug

This commit is contained in:
Eternity
2026-02-10 15:47:34 +08:00
parent d477e24e34
commit dc2ea5c007
5 changed files with 10 additions and 8 deletions

View File

@@ -7,7 +7,7 @@ import re
import uuid import uuid
from collections import defaultdict from collections import defaultdict
from functools import lru_cache from functools import lru_cache
from typing import Any from typing import Any, Iterable
from langgraph.checkpoint.memory import InMemorySaver from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END from langgraph.graph import START, END
@@ -81,7 +81,7 @@ class GraphBuilder:
raise RuntimeError(f"Node not found: Id={node_id}") raise RuntimeError(f"Node not found: Id={node_id}")
@staticmethod @staticmethod
def _merge_control_nodes(control_nodes: tuple[tuple[str, str]]) -> dict[str, list]: def _merge_control_nodes(control_nodes: Iterable[tuple[str, str]]) -> dict[str, list]:
result = defaultdict(list) result = defaultdict(list)
for node in control_nodes: for node in control_nodes:
result[node[0]].append(node[1]) result[node[0]].append(node[1])

View File

@@ -5,7 +5,7 @@
from app.core.workflow.engine.variable_pool import VariablePool from app.core.workflow.engine.variable_pool import VariablePool
class WorkflowResultBuiler: class WorkflowResultBuilder:
def build_final_output( def build_final_output(
self, self,
result: dict, result: dict,

View File

@@ -263,7 +263,6 @@ class StreamOutputCoordinator:
else: else:
# Variable segment: evaluate and transform # Variable segment: evaluate and transform
try: try:
# Simulate evaluation (replace with actual logic)
chunk = variable_pool.get_literal(current_segment.literal) chunk = variable_pool.get_literal(current_segment.literal)
final_chunk += chunk final_chunk += chunk
except Exception as e: except Exception as e:
@@ -271,7 +270,7 @@ class StreamOutputCoordinator:
logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}, error: {e}") logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}, error: {e}")
if final_chunk: if final_chunk:
logger.warning(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}") logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}")
yield { yield {
"event": "message", "event": "message",
"data": { "data": {
@@ -317,7 +316,7 @@ class StreamOutputCoordinator:
if node_info.activate if node_info.activate
} }
if self.activate_end or self.activate_end: if self.end_outputs or self.activate_end:
while self.activate_end: while self.activate_end:
# Force emit all remaining chunks of the active End node # Force emit all remaining chunks of the active End node
async for msg_event in self.emit_activate_chunk(variable_pool, force=True): async for msg_event in self.emit_activate_chunk(variable_pool, force=True):

View File

@@ -401,10 +401,13 @@ class VariablePoolInitializer:
): ):
user_message = input_data.get("message") or "" user_message = input_data.get("message") or ""
user_files = input_data.get("files") or [] user_files = input_data.get("files") or []
conversations = input_data.get("conv_messages", [])
conversation_index = len(conversations) // 2
input_variables = input_data.get("variables") or {} input_variables = input_data.get("variables") or {}
sys_vars = { sys_vars = {
"message": (user_message, VariableType.STRING), "message": (user_message, VariableType.STRING),
"conversation_index": (conversation_index, VariableType.NUMBER),
"conversation_id": (input_data.get("conversation_id"), VariableType.STRING), "conversation_id": (input_data.get("conversation_id"), VariableType.STRING),
"execution_id": (context.execution_id, VariableType.STRING), "execution_id": (context.execution_id, VariableType.STRING),
"workspace_id": (context.workspace_id, VariableType.STRING), "workspace_id": (context.workspace_id, VariableType.STRING),

View File

@@ -10,7 +10,7 @@ from langgraph.graph.state import CompiledStateGraph
from app.core.workflow.engine.event_stream_handler import EventStreamHandler from app.core.workflow.engine.event_stream_handler import EventStreamHandler
from app.core.workflow.engine.graph_builder import GraphBuilder from app.core.workflow.engine.graph_builder import GraphBuilder
from app.core.workflow.engine.result_builder import WorkflowResultBuiler from app.core.workflow.engine.result_builder import WorkflowResultBuilder
from app.core.workflow.engine.runtime_schema import ExecutionContext from app.core.workflow.engine.runtime_schema import ExecutionContext
from app.core.workflow.engine.state_manager import WorkflowStateManager from app.core.workflow.engine.state_manager import WorkflowStateManager
from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator from app.core.workflow.engine.stream_output_coordinator import StreamOutputCoordinator
@@ -60,7 +60,7 @@ class WorkflowExecutor:
self.variable_initializer = VariablePoolInitializer(workflow_config) self.variable_initializer = VariablePoolInitializer(workflow_config)
self.state_manager = WorkflowStateManager() self.state_manager = WorkflowStateManager()
self.result_builder = WorkflowResultBuiler() self.result_builder = WorkflowResultBuilder()
self.stream_coordinator = StreamOutputCoordinator() self.stream_coordinator = StreamOutputCoordinator()
self.event_handler: EventStreamHandler | None = None self.event_handler: EventStreamHandler | None = None