Merge pull request #365 from SuanmoSuanyangTechnology/fix/workflow-exception
fix(workflow): improve streaming output, control branches and file JSON
This commit is contained in:
@@ -337,6 +337,7 @@ class WorkflowExecutor:
|
|||||||
logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}")
|
logger.warning(f"[STREAM] Failed to evaluate segment: {current_segment.literal}")
|
||||||
|
|
||||||
if final_chunk:
|
if final_chunk:
|
||||||
|
logger.info(f"[STREAM] StreamOutput Node:{self.activate_end}, chunk:{final_chunk}")
|
||||||
yield {
|
yield {
|
||||||
"event": "message",
|
"event": "message",
|
||||||
"data": {
|
"data": {
|
||||||
@@ -701,7 +702,8 @@ class WorkflowExecutor:
|
|||||||
end_time = datetime.datetime.now()
|
end_time = datetime.datetime.now()
|
||||||
elapsed_time = (end_time - start_time).total_seconds()
|
elapsed_time = (end_time - start_time).total_seconds()
|
||||||
|
|
||||||
logger.info(f"Workflow execution completed: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s")
|
logger.info(
|
||||||
|
f"Workflow execution completed: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s")
|
||||||
|
|
||||||
return self._build_final_output(result, elapsed_time, full_content)
|
return self._build_final_output(result, elapsed_time, full_content)
|
||||||
|
|
||||||
@@ -763,7 +765,6 @@ class WorkflowExecutor:
|
|||||||
await self.__init_variable_pool(input_data)
|
await self.__init_variable_pool(input_data)
|
||||||
initial_state = self._prepare_initial_state(input_data)
|
initial_state = self._prepare_initial_state(input_data)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
full_content = ''
|
full_content = ''
|
||||||
self._update_scope_activate("sys")
|
self._update_scope_activate("sys")
|
||||||
@@ -789,7 +790,7 @@ class WorkflowExecutor:
|
|||||||
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
|
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
|
||||||
if event_type == "node_chunk":
|
if event_type == "node_chunk":
|
||||||
async for msg_event in self._handle_node_chunk_event(data):
|
async for msg_event in self._handle_node_chunk_event(data):
|
||||||
full_content += data.get("chunk")
|
full_content += msg_event["data"]["chunk"]
|
||||||
yield msg_event
|
yield msg_event
|
||||||
|
|
||||||
elif event_type == "node_error":
|
elif event_type == "node_error":
|
||||||
|
|||||||
@@ -100,7 +100,7 @@ class StreamOutputConfig(BaseModel):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
control_nodes: dict[str, str] = Field(
|
control_nodes: dict[str, list[str]] = Field(
|
||||||
...,
|
...,
|
||||||
description=(
|
description=(
|
||||||
"Control branch conditions for this End node output.\n"
|
"Control branch conditions for this End node output.\n"
|
||||||
@@ -161,7 +161,7 @@ class StreamOutputConfig(BaseModel):
|
|||||||
if scope in self.control_nodes.keys():
|
if scope in self.control_nodes.keys():
|
||||||
if status is None:
|
if status is None:
|
||||||
raise RuntimeError("[Stream Output] Control node activation status not provided")
|
raise RuntimeError("[Stream Output] Control node activation status not provided")
|
||||||
if status == self.control_nodes[scope]:
|
if status in self.control_nodes[scope]:
|
||||||
self.activate = True
|
self.activate = True
|
||||||
|
|
||||||
# Case 2: activate variable segments related to this node
|
# Case 2: activate variable segments related to this node
|
||||||
@@ -229,6 +229,13 @@ class GraphBuilder:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
raise RuntimeError(f"Node not found: Id={node_id}")
|
raise RuntimeError(f"Node not found: Id={node_id}")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _merge_control_nodes(control_nodes: list[tuple[str, str]]) -> dict[str, list]:
|
||||||
|
result = defaultdict(list)
|
||||||
|
for node in control_nodes:
|
||||||
|
result[node[0]].append(node[1])
|
||||||
|
return result
|
||||||
|
|
||||||
def _find_upstream_branch_node(self, target_node: str) -> tuple[bool, tuple[tuple[str, str]]]:
|
def _find_upstream_branch_node(self, target_node: str) -> tuple[bool, tuple[tuple[str, str]]]:
|
||||||
"""
|
"""
|
||||||
Recursively find all upstream branch (control) nodes that influence the execution
|
Recursively find all upstream branch (control) nodes that influence the execution
|
||||||
@@ -372,7 +379,7 @@ class GraphBuilder:
|
|||||||
activate=not has_branch,
|
activate=not has_branch,
|
||||||
|
|
||||||
# Branch nodes that control activation of this End node
|
# Branch nodes that control activation of this End node
|
||||||
control_nodes=dict(control_nodes),
|
control_nodes=self._merge_control_nodes(control_nodes),
|
||||||
|
|
||||||
# Convert output segments into OutputContent objects
|
# Convert output segments into OutputContent objects
|
||||||
outputs=list(
|
outputs=list(
|
||||||
|
|||||||
@@ -68,12 +68,13 @@ class LLMNode(BaseNode):
|
|||||||
- ai/assistant: AI 消息(AIMessage)
|
- ai/assistant: AI 消息(AIMessage)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _output_types(self) -> dict[str, VariableType]:
|
|
||||||
return {"output": VariableType.STRING}
|
|
||||||
|
|
||||||
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]):
|
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]):
|
||||||
super().__init__(node_config, workflow_config)
|
super().__init__(node_config, workflow_config)
|
||||||
self.typed_config: LLMNodeConfig | None = None
|
self.typed_config: LLMNodeConfig | None = None
|
||||||
|
self.messages = []
|
||||||
|
|
||||||
|
def _output_types(self) -> dict[str, VariableType]:
|
||||||
|
return {"output": VariableType.STRING}
|
||||||
|
|
||||||
def _render_context(self, message: str, variable_pool: VariablePool):
|
def _render_context(self, message: str, variable_pool: VariablePool):
|
||||||
context = f"<context>{self._render_template(self.typed_config.context, variable_pool)}</context>"
|
context = f"<context>{self._render_template(self.typed_config.context, variable_pool)}</context>"
|
||||||
|
|||||||
@@ -39,18 +39,17 @@ class VariableType(StrEnum):
|
|||||||
Raises:
|
Raises:
|
||||||
TypeError: If the type of the input value is not supported.
|
TypeError: If the type of the input value is not supported.
|
||||||
"""
|
"""
|
||||||
var_type = type(var)
|
if isinstance(var, str):
|
||||||
if isinstance(var_type, str):
|
|
||||||
return cls.STRING
|
return cls.STRING
|
||||||
elif isinstance(var_type, (int, float)):
|
elif isinstance(var, (int, float)):
|
||||||
return cls.NUMBER
|
return cls.NUMBER
|
||||||
elif isinstance(var_type, bool):
|
elif isinstance(var, bool):
|
||||||
return cls.BOOLEAN
|
return cls.BOOLEAN
|
||||||
elif isinstance(var_type, FileObject) or (isinstance(var, dict) and var.get('__file')):
|
elif isinstance(var, FileObject) or (isinstance(var, dict) and var.get('__file')):
|
||||||
return cls.FILE
|
return cls.FILE
|
||||||
elif isinstance(var_type, dict):
|
elif isinstance(var, dict):
|
||||||
return cls.OBJECT
|
return cls.OBJECT
|
||||||
elif isinstance(var_type, list):
|
elif isinstance(var, list):
|
||||||
if len(var) == 0:
|
if len(var) == 0:
|
||||||
return cls.ARRAY_STRING
|
return cls.ARRAY_STRING
|
||||||
else:
|
else:
|
||||||
@@ -67,7 +66,7 @@ class VariableType(StrEnum):
|
|||||||
return cls.NESTED_ARRAY
|
return cls.NESTED_ARRAY
|
||||||
else:
|
else:
|
||||||
raise TypeError(f"Unsupported array child type - {child_type}")
|
raise TypeError(f"Unsupported array child type - {child_type}")
|
||||||
raise TypeError(f"Unsupported type - {var_type}")
|
raise TypeError(f"Unsupported type - {type(var)}")
|
||||||
|
|
||||||
|
|
||||||
def DEFAULT_VALUE(var_type: VariableType) -> Any:
|
def DEFAULT_VALUE(var_type: VariableType) -> Any:
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ class VariablePool:
|
|||||||
Storage for all variables managed by the pool.
|
Storage for all variables managed by the pool.
|
||||||
"""
|
"""
|
||||||
self.locks = defaultdict(Lock)
|
self.locks = defaultdict(Lock)
|
||||||
self.variables: dict[str, dict[str, VariableStruct[Any]]] = {}
|
self.variables: dict[str, dict[str, VariableStruct[Any]]] = {"sys": {}, "conv": {}}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def transform_selector(selector):
|
def transform_selector(selector):
|
||||||
|
|||||||
@@ -2,7 +2,6 @@
|
|||||||
工作流服务层
|
工作流服务层
|
||||||
"""
|
"""
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Any, Annotated, Optional
|
from typing import Any, Annotated, Optional
|
||||||
@@ -448,13 +447,9 @@ class WorkflowService:
|
|||||||
message=f"工作流配置不存在: app_id={app_id}"
|
message=f"工作流配置不存在: app_id={app_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
json_files = []
|
|
||||||
for file in payload.files:
|
|
||||||
file_json = json.loads(file.model_dump_json())
|
|
||||||
json_files.append(file_json)
|
|
||||||
input_data = {"message": payload.message, "variables": payload.variables,
|
input_data = {"message": payload.message, "variables": payload.variables,
|
||||||
"conversation_id": payload.conversation_id,
|
"conversation_id": payload.conversation_id,
|
||||||
"files": json_files
|
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||||
}
|
}
|
||||||
|
|
||||||
# 转换 conversation_id 为 UUID
|
# 转换 conversation_id 为 UUID
|
||||||
@@ -642,13 +637,9 @@ class WorkflowService:
|
|||||||
message=f"工作流配置不存在: app_id={app_id}"
|
message=f"工作流配置不存在: app_id={app_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
json_files = []
|
|
||||||
for file in payload.files:
|
|
||||||
file_json = json.loads(file.model_dump_json())
|
|
||||||
json_files.append(file_json)
|
|
||||||
input_data = {"message": payload.message, "variables": payload.variables,
|
input_data = {"message": payload.message, "variables": payload.variables,
|
||||||
"conversation_id": payload.conversation_id,
|
"conversation_id": payload.conversation_id,
|
||||||
"files": json_files
|
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||||
}
|
}
|
||||||
|
|
||||||
# 转换 conversation_id 为 UUID
|
# 转换 conversation_id 为 UUID
|
||||||
|
|||||||
Reference in New Issue
Block a user