Merge pull request #446 from SuanmoSuanyangTechnology/feature/agent-variable
fix(workflow): rename output message field
This commit is contained in:
@@ -127,7 +127,7 @@ class EventStreamHandler:
|
||||
yield {
|
||||
"event": "message",
|
||||
"data": {
|
||||
"chunk": data.get("chunk")
|
||||
"content": data.get("chunk")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -274,7 +274,7 @@ class StreamOutputCoordinator:
|
||||
yield {
|
||||
"event": "message",
|
||||
"data": {
|
||||
"chunk": final_chunk
|
||||
"content": final_chunk
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -272,7 +272,7 @@ class WorkflowExecutor:
|
||||
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
|
||||
if event_type == "node_chunk":
|
||||
async for msg_event in self.event_handler.handle_node_chunk_event(data):
|
||||
full_content += msg_event["data"]["chunk"]
|
||||
full_content += msg_event["data"]["content"]
|
||||
yield msg_event
|
||||
|
||||
elif event_type == "node_error":
|
||||
|
||||
@@ -13,6 +13,7 @@ from sqlalchemy.orm import Session
|
||||
from app.core.error_codes import BizCode
|
||||
from app.core.exceptions import BusinessException
|
||||
from app.core.workflow.adapters.registry import PlatformAdapterRegistry
|
||||
from app.core.workflow.executor import execute_workflow, execute_workflow_stream
|
||||
from app.core.workflow.nodes.enums import NodeType
|
||||
from app.core.workflow.validator import validate_workflow_config
|
||||
from app.db import get_db
|
||||
@@ -23,7 +24,7 @@ from app.repositories.workflow_repository import (
|
||||
WorkflowExecutionRepository,
|
||||
WorkflowNodeExecutionRepository
|
||||
)
|
||||
from app.schemas import DraftRunRequest
|
||||
from app.schemas import DraftRunRequest, FileInput
|
||||
from app.services.conversation_service import ConversationService
|
||||
from app.services.multi_agent_service import convert_uuids_to_str
|
||||
from app.services.multimodal_service import MultimodalService
|
||||
@@ -445,6 +446,91 @@ class WorkflowService:
|
||||
"success_rate": completed / total if total > 0 else 0
|
||||
}
|
||||
|
||||
async def _handle_file_input(self, files: list[FileInput]):
|
||||
if not files:
|
||||
return []
|
||||
|
||||
files_struct = []
|
||||
for file in files:
|
||||
files_struct.append(
|
||||
{
|
||||
"type": file.type,
|
||||
"url": await self.multimodal_service.get_file_url(file),
|
||||
"__file": True
|
||||
}
|
||||
)
|
||||
return files_struct
|
||||
|
||||
@staticmethod
|
||||
def _map_public_event(event: dict) -> dict | None:
|
||||
"""
|
||||
Map internal workflow events to public-facing event formats.
|
||||
|
||||
Purpose:
|
||||
- Hide internal execution details
|
||||
- Expose a stable and simplified public event schema
|
||||
- Filter out non-public events
|
||||
- Maintain backward compatibility when possible
|
||||
|
||||
Args:
|
||||
event (dict): Internal event object, e.g.:
|
||||
{
|
||||
"event": "workflow_start",
|
||||
"data": {...}
|
||||
}
|
||||
|
||||
Returns:
|
||||
dict | None:
|
||||
- Returns the mapped public event
|
||||
- Returns None if the event should not be exposed
|
||||
"""
|
||||
event_type = event.get("event")
|
||||
payload = event.get("data")
|
||||
match event_type:
|
||||
case "workflow_start":
|
||||
return {
|
||||
"event": "start",
|
||||
"data": {
|
||||
"conversation_id": payload.get("conversation_id"),
|
||||
}
|
||||
}
|
||||
case "workflow_end":
|
||||
return {
|
||||
"event": "end",
|
||||
"data": {
|
||||
"elapsed_time": payload.get("elapsed_time"),
|
||||
"message_length": len(payload.get("output", "")),
|
||||
"error": payload.get("error", "")
|
||||
}
|
||||
}
|
||||
case "node_start" | "node_end" | "node_error" | "cycle_item":
|
||||
return None
|
||||
case _:
|
||||
return event
|
||||
|
||||
def _emit(self, public: bool, internal_event: dict):
|
||||
"""
|
||||
Unified event emission entry.
|
||||
|
||||
Args:
|
||||
public (bool):
|
||||
- True -> Emit mapped public event
|
||||
- False -> Emit raw internal event
|
||||
|
||||
internal_event (dict):
|
||||
The original internal event object
|
||||
|
||||
Returns:
|
||||
dict | None:
|
||||
- The mapped event
|
||||
- Or None if the event is filtered out
|
||||
"""
|
||||
if public:
|
||||
mapped = self._map_public_event(internal_event)
|
||||
else:
|
||||
mapped = internal_event
|
||||
return mapped
|
||||
|
||||
# ==================== 工作流执行 ====================
|
||||
|
||||
async def run(
|
||||
@@ -479,10 +565,11 @@ class WorkflowService:
|
||||
message=f"工作流配置不存在: app_id={app_id}"
|
||||
)
|
||||
|
||||
input_data = {"message": payload.message, "variables": payload.variables,
|
||||
"conversation_id": payload.conversation_id,
|
||||
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||
}
|
||||
input_data = {
|
||||
"message": payload.message, "variables": payload.variables,
|
||||
"conversation_id": payload.conversation_id,
|
||||
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||
}
|
||||
|
||||
# 转换 conversation_id 为 UUID
|
||||
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
|
||||
@@ -506,22 +593,8 @@ class WorkflowService:
|
||||
"execution_config": config.execution_config
|
||||
}
|
||||
|
||||
# 4. 获取工作空间 ID(从 app 获取)
|
||||
|
||||
# 5. 执行工作流
|
||||
from app.core.workflow.executor import execute_workflow
|
||||
|
||||
try:
|
||||
files = []
|
||||
if payload.files:
|
||||
for file in payload.files:
|
||||
files.append(
|
||||
{
|
||||
"type": file.type,
|
||||
"url": await self.multimodal_service.get_file_url(file),
|
||||
"__file": True
|
||||
}
|
||||
)
|
||||
files = await self._handle_file_input(payload.files)
|
||||
input_data["files"] = files
|
||||
# 更新状态为运行中
|
||||
self.update_execution_status(execution.execution_id, "running")
|
||||
@@ -601,42 +674,6 @@ class WorkflowService:
|
||||
message=f"工作流执行失败: {str(e)}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _map_public_event(event: dict) -> dict | None:
|
||||
event_type = event.get("event")
|
||||
payload = event.get("data")
|
||||
match event_type:
|
||||
case "workflow_start":
|
||||
return {
|
||||
"event": "start",
|
||||
"data": {
|
||||
"conversation_id": payload.get("conversation_id"),
|
||||
}
|
||||
}
|
||||
case "workflow_end":
|
||||
return {
|
||||
"event": "end",
|
||||
"data": {
|
||||
"elapsed_time": payload.get("elapsed_time"),
|
||||
"message_length": len(payload.get("output", "")),
|
||||
"error": payload.get("error", "")
|
||||
}
|
||||
}
|
||||
case "node_start" | "node_end" | "node_error" | "cycle_item":
|
||||
return None
|
||||
case _:
|
||||
return event
|
||||
|
||||
def _emit(self, public: bool, internal_event: dict):
|
||||
"""
|
||||
decide
|
||||
"""
|
||||
if public:
|
||||
mapped = self._map_public_event(internal_event)
|
||||
else:
|
||||
mapped = internal_event
|
||||
return mapped
|
||||
|
||||
async def run_stream(
|
||||
self,
|
||||
app_id: uuid.UUID,
|
||||
@@ -671,10 +708,11 @@ class WorkflowService:
|
||||
message=f"工作流配置不存在: app_id={app_id}"
|
||||
)
|
||||
|
||||
input_data = {"message": payload.message, "variables": payload.variables,
|
||||
"conversation_id": payload.conversation_id,
|
||||
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||
}
|
||||
input_data = {
|
||||
"message": payload.message, "variables": payload.variables,
|
||||
"conversation_id": payload.conversation_id,
|
||||
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||
}
|
||||
|
||||
# 转换 conversation_id 为 UUID
|
||||
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
|
||||
@@ -699,16 +737,7 @@ class WorkflowService:
|
||||
}
|
||||
|
||||
try:
|
||||
files = []
|
||||
if payload.files:
|
||||
for file in payload.files:
|
||||
files.append(
|
||||
{
|
||||
"type": file.type,
|
||||
"url": await self.multimodal_service.get_file_url(file),
|
||||
"__file": True
|
||||
}
|
||||
)
|
||||
files = await self._handle_file_input(payload.files)
|
||||
input_data["files"] = files
|
||||
self.update_execution_status(execution.execution_id, "running")
|
||||
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
|
||||
@@ -723,7 +752,6 @@ class WorkflowService:
|
||||
input_data["conv_messages"] = last_state.get("messages") or []
|
||||
break
|
||||
init_message_length = len(input_data.get("conv_messages", []))
|
||||
from app.core.workflow.executor import execute_workflow_stream
|
||||
|
||||
async for event in execute_workflow_stream(
|
||||
workflow_config=workflow_config_dict,
|
||||
@@ -789,37 +817,6 @@ class WorkflowService:
|
||||
return node.get("config", {}).get("variables", [])
|
||||
raise BusinessException("workflow config error - start node not found")
|
||||
|
||||
def _clean_event_for_json(self, event: dict[str, Any]) -> dict[str, Any]:
|
||||
"""清理事件数据,移除不可序列化的对象
|
||||
|
||||
Args:
|
||||
event: 原始事件数据
|
||||
|
||||
Returns:
|
||||
可序列化的事件数据
|
||||
"""
|
||||
from langchain_core.messages import BaseMessage
|
||||
|
||||
def clean_value(value):
|
||||
"""递归清理值"""
|
||||
if isinstance(value, BaseMessage):
|
||||
# 将 Message 对象转换为字典
|
||||
return {
|
||||
"type": value.__class__.__name__,
|
||||
"content": value.content,
|
||||
}
|
||||
elif isinstance(value, dict):
|
||||
return {k: clean_value(v) for k, v in value.items()}
|
||||
elif isinstance(value, list):
|
||||
return [clean_value(item) for item in value]
|
||||
elif isinstance(value, (str, int, float, bool, type(None))):
|
||||
return value
|
||||
else:
|
||||
# 其他不可序列化的对象转换为字符串
|
||||
return str(value)
|
||||
|
||||
return clean_value(event)
|
||||
|
||||
|
||||
# ==================== 依赖注入函数 ====================
|
||||
|
||||
|
||||
Reference in New Issue
Block a user