fix(workflow): rename output message field
This commit is contained in:
@@ -127,7 +127,7 @@ class EventStreamHandler:
|
|||||||
yield {
|
yield {
|
||||||
"event": "message",
|
"event": "message",
|
||||||
"data": {
|
"data": {
|
||||||
"chunk": data.get("chunk")
|
"content": data.get("chunk")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -274,7 +274,7 @@ class StreamOutputCoordinator:
|
|||||||
yield {
|
yield {
|
||||||
"event": "message",
|
"event": "message",
|
||||||
"data": {
|
"data": {
|
||||||
"chunk": final_chunk
|
"content": final_chunk
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -272,7 +272,7 @@ class WorkflowExecutor:
|
|||||||
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
|
event_type = data.get("type", "node_chunk") # "message" or "node_chunk"
|
||||||
if event_type == "node_chunk":
|
if event_type == "node_chunk":
|
||||||
async for msg_event in self.event_handler.handle_node_chunk_event(data):
|
async for msg_event in self.event_handler.handle_node_chunk_event(data):
|
||||||
full_content += msg_event["data"]["chunk"]
|
full_content += msg_event["data"]["content"]
|
||||||
yield msg_event
|
yield msg_event
|
||||||
|
|
||||||
elif event_type == "node_error":
|
elif event_type == "node_error":
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ from sqlalchemy.orm import Session
|
|||||||
from app.core.error_codes import BizCode
|
from app.core.error_codes import BizCode
|
||||||
from app.core.exceptions import BusinessException
|
from app.core.exceptions import BusinessException
|
||||||
from app.core.workflow.adapters.registry import PlatformAdapterRegistry
|
from app.core.workflow.adapters.registry import PlatformAdapterRegistry
|
||||||
|
from app.core.workflow.executor import execute_workflow, execute_workflow_stream
|
||||||
from app.core.workflow.nodes.enums import NodeType
|
from app.core.workflow.nodes.enums import NodeType
|
||||||
from app.core.workflow.validator import validate_workflow_config
|
from app.core.workflow.validator import validate_workflow_config
|
||||||
from app.db import get_db
|
from app.db import get_db
|
||||||
@@ -23,7 +24,7 @@ from app.repositories.workflow_repository import (
|
|||||||
WorkflowExecutionRepository,
|
WorkflowExecutionRepository,
|
||||||
WorkflowNodeExecutionRepository
|
WorkflowNodeExecutionRepository
|
||||||
)
|
)
|
||||||
from app.schemas import DraftRunRequest
|
from app.schemas import DraftRunRequest, FileInput
|
||||||
from app.services.conversation_service import ConversationService
|
from app.services.conversation_service import ConversationService
|
||||||
from app.services.multi_agent_service import convert_uuids_to_str
|
from app.services.multi_agent_service import convert_uuids_to_str
|
||||||
from app.services.multimodal_service import MultimodalService
|
from app.services.multimodal_service import MultimodalService
|
||||||
@@ -445,6 +446,91 @@ class WorkflowService:
|
|||||||
"success_rate": completed / total if total > 0 else 0
|
"success_rate": completed / total if total > 0 else 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def _handle_file_input(self, files: list[FileInput]):
|
||||||
|
if not files:
|
||||||
|
return []
|
||||||
|
|
||||||
|
files_struct = []
|
||||||
|
for file in files:
|
||||||
|
files_struct.append(
|
||||||
|
{
|
||||||
|
"type": file.type,
|
||||||
|
"url": await self.multimodal_service.get_file_url(file),
|
||||||
|
"__file": True
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return files_struct
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _map_public_event(event: dict) -> dict | None:
|
||||||
|
"""
|
||||||
|
Map internal workflow events to public-facing event formats.
|
||||||
|
|
||||||
|
Purpose:
|
||||||
|
- Hide internal execution details
|
||||||
|
- Expose a stable and simplified public event schema
|
||||||
|
- Filter out non-public events
|
||||||
|
- Maintain backward compatibility when possible
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event (dict): Internal event object, e.g.:
|
||||||
|
{
|
||||||
|
"event": "workflow_start",
|
||||||
|
"data": {...}
|
||||||
|
}
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict | None:
|
||||||
|
- Returns the mapped public event
|
||||||
|
- Returns None if the event should not be exposed
|
||||||
|
"""
|
||||||
|
event_type = event.get("event")
|
||||||
|
payload = event.get("data")
|
||||||
|
match event_type:
|
||||||
|
case "workflow_start":
|
||||||
|
return {
|
||||||
|
"event": "start",
|
||||||
|
"data": {
|
||||||
|
"conversation_id": payload.get("conversation_id"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "workflow_end":
|
||||||
|
return {
|
||||||
|
"event": "end",
|
||||||
|
"data": {
|
||||||
|
"elapsed_time": payload.get("elapsed_time"),
|
||||||
|
"message_length": len(payload.get("output", "")),
|
||||||
|
"error": payload.get("error", "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "node_start" | "node_end" | "node_error" | "cycle_item":
|
||||||
|
return None
|
||||||
|
case _:
|
||||||
|
return event
|
||||||
|
|
||||||
|
def _emit(self, public: bool, internal_event: dict):
|
||||||
|
"""
|
||||||
|
Unified event emission entry.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
public (bool):
|
||||||
|
- True -> Emit mapped public event
|
||||||
|
- False -> Emit raw internal event
|
||||||
|
|
||||||
|
internal_event (dict):
|
||||||
|
The original internal event object
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict | None:
|
||||||
|
- The mapped event
|
||||||
|
- Or None if the event is filtered out
|
||||||
|
"""
|
||||||
|
if public:
|
||||||
|
mapped = self._map_public_event(internal_event)
|
||||||
|
else:
|
||||||
|
mapped = internal_event
|
||||||
|
return mapped
|
||||||
|
|
||||||
# ==================== 工作流执行 ====================
|
# ==================== 工作流执行 ====================
|
||||||
|
|
||||||
async def run(
|
async def run(
|
||||||
@@ -479,10 +565,11 @@ class WorkflowService:
|
|||||||
message=f"工作流配置不存在: app_id={app_id}"
|
message=f"工作流配置不存在: app_id={app_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
input_data = {"message": payload.message, "variables": payload.variables,
|
input_data = {
|
||||||
"conversation_id": payload.conversation_id,
|
"message": payload.message, "variables": payload.variables,
|
||||||
"files": [file.model_dump(mode='json') for file in payload.files]
|
"conversation_id": payload.conversation_id,
|
||||||
}
|
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||||
|
}
|
||||||
|
|
||||||
# 转换 conversation_id 为 UUID
|
# 转换 conversation_id 为 UUID
|
||||||
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
|
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
|
||||||
@@ -506,22 +593,8 @@ class WorkflowService:
|
|||||||
"execution_config": config.execution_config
|
"execution_config": config.execution_config
|
||||||
}
|
}
|
||||||
|
|
||||||
# 4. 获取工作空间 ID(从 app 获取)
|
|
||||||
|
|
||||||
# 5. 执行工作流
|
|
||||||
from app.core.workflow.executor import execute_workflow
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
files = []
|
files = await self._handle_file_input(payload.files)
|
||||||
if payload.files:
|
|
||||||
for file in payload.files:
|
|
||||||
files.append(
|
|
||||||
{
|
|
||||||
"type": file.type,
|
|
||||||
"url": await self.multimodal_service.get_file_url(file),
|
|
||||||
"__file": True
|
|
||||||
}
|
|
||||||
)
|
|
||||||
input_data["files"] = files
|
input_data["files"] = files
|
||||||
# 更新状态为运行中
|
# 更新状态为运行中
|
||||||
self.update_execution_status(execution.execution_id, "running")
|
self.update_execution_status(execution.execution_id, "running")
|
||||||
@@ -601,42 +674,6 @@ class WorkflowService:
|
|||||||
message=f"工作流执行失败: {str(e)}"
|
message=f"工作流执行失败: {str(e)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _map_public_event(event: dict) -> dict | None:
|
|
||||||
event_type = event.get("event")
|
|
||||||
payload = event.get("data")
|
|
||||||
match event_type:
|
|
||||||
case "workflow_start":
|
|
||||||
return {
|
|
||||||
"event": "start",
|
|
||||||
"data": {
|
|
||||||
"conversation_id": payload.get("conversation_id"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "workflow_end":
|
|
||||||
return {
|
|
||||||
"event": "end",
|
|
||||||
"data": {
|
|
||||||
"elapsed_time": payload.get("elapsed_time"),
|
|
||||||
"message_length": len(payload.get("output", "")),
|
|
||||||
"error": payload.get("error", "")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "node_start" | "node_end" | "node_error" | "cycle_item":
|
|
||||||
return None
|
|
||||||
case _:
|
|
||||||
return event
|
|
||||||
|
|
||||||
def _emit(self, public: bool, internal_event: dict):
|
|
||||||
"""
|
|
||||||
decide
|
|
||||||
"""
|
|
||||||
if public:
|
|
||||||
mapped = self._map_public_event(internal_event)
|
|
||||||
else:
|
|
||||||
mapped = internal_event
|
|
||||||
return mapped
|
|
||||||
|
|
||||||
async def run_stream(
|
async def run_stream(
|
||||||
self,
|
self,
|
||||||
app_id: uuid.UUID,
|
app_id: uuid.UUID,
|
||||||
@@ -671,10 +708,11 @@ class WorkflowService:
|
|||||||
message=f"工作流配置不存在: app_id={app_id}"
|
message=f"工作流配置不存在: app_id={app_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
input_data = {"message": payload.message, "variables": payload.variables,
|
input_data = {
|
||||||
"conversation_id": payload.conversation_id,
|
"message": payload.message, "variables": payload.variables,
|
||||||
"files": [file.model_dump(mode='json') for file in payload.files]
|
"conversation_id": payload.conversation_id,
|
||||||
}
|
"files": [file.model_dump(mode='json') for file in payload.files]
|
||||||
|
}
|
||||||
|
|
||||||
# 转换 conversation_id 为 UUID
|
# 转换 conversation_id 为 UUID
|
||||||
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
|
conversation_id_uuid = uuid.UUID(payload.conversation_id) if payload.conversation_id else None
|
||||||
@@ -699,16 +737,7 @@ class WorkflowService:
|
|||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
files = []
|
files = await self._handle_file_input(payload.files)
|
||||||
if payload.files:
|
|
||||||
for file in payload.files:
|
|
||||||
files.append(
|
|
||||||
{
|
|
||||||
"type": file.type,
|
|
||||||
"url": await self.multimodal_service.get_file_url(file),
|
|
||||||
"__file": True
|
|
||||||
}
|
|
||||||
)
|
|
||||||
input_data["files"] = files
|
input_data["files"] = files
|
||||||
self.update_execution_status(execution.execution_id, "running")
|
self.update_execution_status(execution.execution_id, "running")
|
||||||
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
|
executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid)
|
||||||
@@ -723,7 +752,6 @@ class WorkflowService:
|
|||||||
input_data["conv_messages"] = last_state.get("messages") or []
|
input_data["conv_messages"] = last_state.get("messages") or []
|
||||||
break
|
break
|
||||||
init_message_length = len(input_data.get("conv_messages", []))
|
init_message_length = len(input_data.get("conv_messages", []))
|
||||||
from app.core.workflow.executor import execute_workflow_stream
|
|
||||||
|
|
||||||
async for event in execute_workflow_stream(
|
async for event in execute_workflow_stream(
|
||||||
workflow_config=workflow_config_dict,
|
workflow_config=workflow_config_dict,
|
||||||
@@ -789,37 +817,6 @@ class WorkflowService:
|
|||||||
return node.get("config", {}).get("variables", [])
|
return node.get("config", {}).get("variables", [])
|
||||||
raise BusinessException("workflow config error - start node not found")
|
raise BusinessException("workflow config error - start node not found")
|
||||||
|
|
||||||
def _clean_event_for_json(self, event: dict[str, Any]) -> dict[str, Any]:
|
|
||||||
"""清理事件数据,移除不可序列化的对象
|
|
||||||
|
|
||||||
Args:
|
|
||||||
event: 原始事件数据
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
可序列化的事件数据
|
|
||||||
"""
|
|
||||||
from langchain_core.messages import BaseMessage
|
|
||||||
|
|
||||||
def clean_value(value):
|
|
||||||
"""递归清理值"""
|
|
||||||
if isinstance(value, BaseMessage):
|
|
||||||
# 将 Message 对象转换为字典
|
|
||||||
return {
|
|
||||||
"type": value.__class__.__name__,
|
|
||||||
"content": value.content,
|
|
||||||
}
|
|
||||||
elif isinstance(value, dict):
|
|
||||||
return {k: clean_value(v) for k, v in value.items()}
|
|
||||||
elif isinstance(value, list):
|
|
||||||
return [clean_value(item) for item in value]
|
|
||||||
elif isinstance(value, (str, int, float, bool, type(None))):
|
|
||||||
return value
|
|
||||||
else:
|
|
||||||
# 其他不可序列化的对象转换为字符串
|
|
||||||
return str(value)
|
|
||||||
|
|
||||||
return clean_value(event)
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== 依赖注入函数 ====================
|
# ==================== 依赖注入函数 ====================
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user