Merge pull request #497 from SuanmoSuanyangTechnology/fix/bug-patch

feat(workflow,chat): support multimodal context and add message_id to chat API response; fix Dify compatibility issues
This commit is contained in:
Ke Sun
2026-03-06 17:28:36 +08:00
committed by GitHub
12 changed files with 216 additions and 89 deletions

View File

@@ -111,7 +111,7 @@ async def Split_The_Problem(state: ReadState) -> ReadState:
"error_type": type(e).__name__, "error_type": type(e).__name__,
"error_message": str(e), "error_message": str(e),
"content_length": len(content), "content_length": len(content),
"llm_model_id": memory_config.llm_model_id if memory_config else None "llm_model_id": str(memory_config.llm_model_id) if memory_config else None
} }
logger.error(f"Split_The_Problem error details: {error_details}") logger.error(f"Split_The_Problem error details: {error_details}")
@@ -221,7 +221,7 @@ async def Problem_Extension(state: ReadState) -> ReadState:
"error_type": type(e).__name__, "error_type": type(e).__name__,
"error_message": str(e), "error_message": str(e),
"questions_count": len(databasets), "questions_count": len(databasets),
"llm_model_id": memory_config.llm_model_id if memory_config else None "llm_model_id": str(memory_config.llm_model_id) if memory_config else None
} }
logger.error(f"Problem_Extension error details: {error_details}") logger.error(f"Problem_Extension error details: {error_details}")

View File

@@ -129,11 +129,11 @@ class DifyConverter(BaseConverter):
@staticmethod @staticmethod
def _convert_file(var): def _convert_file(var):
pass return None
@staticmethod @staticmethod
def _convert_array_file(var): def _convert_array_file(var):
pass return []
@staticmethod @staticmethod
def variable_type_map(source_type) -> VariableType | None: def variable_type_map(source_type) -> VariableType | None:
@@ -198,7 +198,7 @@ class DifyConverter(BaseConverter):
"over-write": AssignmentOperator.COVER, "over-write": AssignmentOperator.COVER,
"remove-last": AssignmentOperator.REMOVE_LAST, "remove-last": AssignmentOperator.REMOVE_LAST,
"remove-first": AssignmentOperator.REMOVE_FIRST, "remove-first": AssignmentOperator.REMOVE_FIRST,
"set": AssignmentOperator.ASSIGN,
} }
return operator_map.get(operator, operator) return operator_map.get(operator, operator)
@@ -267,10 +267,10 @@ class DifyConverter(BaseConverter):
type=var_type, type=var_type,
required=var["required"], required=var["required"],
default=self.convert_variable_type( default=self.convert_variable_type(
var_type, var["default"] var_type, var.get("default")
), ),
description=var["label"], description=var["label"],
max_length=var.get("max_length"), max_length=var.get("max_length", 50),
) )
start_vars.append(var_def) start_vars.append(var_def)
result = StartNodeConfig.model_construct( result = StartNodeConfig.model_construct(
@@ -333,7 +333,7 @@ class DifyConverter(BaseConverter):
MessageConfig( MessageConfig(
role="user", role="user",
content=self.trans_variable_format( content=self.trans_variable_format(
node_data["memory"].get("query_prompt_template", "{{#sys.query#}}") node_data["memory"].get("query_prompt_template") or "{{#sys.query#}}"
) )
) )
) )
@@ -612,7 +612,7 @@ class DifyConverter(BaseConverter):
), ),
headers=headers, headers=headers,
params=params, params=params,
verify_ssl=node_data["ssl_verify"], verify_ssl=node_data.get("ssl_verify", False),
timeouts=HttpTimeOutConfig.model_construct( timeouts=HttpTimeOutConfig.model_construct(
connect_timeout=node_data["timeout"]["max_connect_timeout"] or 5, connect_timeout=node_data["timeout"]["max_connect_timeout"] or 5,
read_timeout=node_data["timeout"]["max_read_timeout"] or 5, read_timeout=node_data["timeout"]["max_read_timeout"] or 5,
@@ -696,7 +696,7 @@ class DifyConverter(BaseConverter):
group_variables = {} group_variables = {}
group_type = {} group_type = {}
if not advanced_settings or not advanced_settings["group_enabled"]: if not advanced_settings or not advanced_settings["group_enabled"]:
group_variables["output"] = [ group_variables = [
self._process_list_variable_litearl(variable) self._process_list_variable_litearl(variable)
for variable in node_data["variables"] for variable in node_data["variables"]
] ]

View File

@@ -83,6 +83,12 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter):
require_fields = frozenset({'app', 'kind', 'version', 'workflow'}) require_fields = frozenset({'app', 'kind', 'version', 'workflow'})
if not all(field in self.config for field in require_fields): if not all(field in self.config for field in require_fields):
return False return False
if self.config.get("app",{}).get("mode") == "workflow":
self.errors.append(ExceptionDefineition(
type=ExceptionType.PLATFORM,
detail="workflow mode is not supported"
))
return False
for node in self.origin_nodes: for node in self.origin_nodes:
if not self._valid_nodes(node): if not self._valid_nodes(node):
@@ -134,6 +140,8 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter):
for node in self.origin_nodes: for node in self.origin_nodes:
if self.map_node_type(node["data"]["type"]) == NodeType.LLM: if self.map_node_type(node["data"]["type"]) == NodeType.LLM:
self.node_output_map[f"{node['id']}.text"] = f"{node['id']}.output" self.node_output_map[f"{node['id']}.text"] = f"{node['id']}.output"
elif self.map_node_type(node["data"]["type"]) == NodeType.KNOWLEDGE_RETRIEVAL:
self.node_output_map[f"{node['id']}.result"] = f"{node['id']}.output"
def _convert_cycle_node_position(self, node_id: str, position: dict): def _convert_cycle_node_position(self, node_id: str, position: dict):
for node in self.origin_nodes: for node in self.origin_nodes:
@@ -184,7 +192,7 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter):
type=ExceptionType.NODE, type=ExceptionType.NODE,
node_id=node["id"], node_id=node["id"],
node_name=node["data"]["title"], node_name=node["data"]["title"],
detail=f"node type {node_type} is unsupported", detail=f"node type {node_type if node_type else 'notes'} is unsupported",
)) ))
return converter(node) return converter(node)
except Exception as e: except Exception as e:

View File

@@ -320,7 +320,7 @@ class GraphBuilder:
# Used later to determine which branch to take based on the node's output # Used later to determine which branch to take based on the node's output
# Assumes node output `node.<node_id>.output` matches the edge's label # Assumes node output `node.<node_id>.output` matches the edge's label
# For example, if node.123.output == 'CASE1', take the branch labeled 'CASE1' # For example, if node.123.output == 'CASE1', take the branch labeled 'CASE1'
related_edge[idx]['condition'] = f"node.{node_id}.output == '{related_edge[idx]['label']}'" related_edge[idx]['condition'] = f"node['{node_id}']['output'] == '{related_edge[idx]['label']}'"
if node_instance: if node_instance:
# Wrap node's run method to avoid closure issues # Wrap node's run method to avoid closure issues

View File

@@ -158,18 +158,36 @@ class WorkflowExecutor:
full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False) full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False)
# Append messages for user and assistant # Append messages for user and assistant
result["messages"].extend( if input_data.get("files"):
[ result["messages"].extend(
{ [
"role": "user", {
"content": input_data.get("message", '') "role": "user",
}, "content": input_data.get("message", '')
{ },
"role": "assistant", {
"content": full_content "role": "user",
} "content": input_data.get("files")
] },
) {
"role": "assistant",
"content": full_content
}
]
)
else:
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
# Calculate elapsed time # Calculate elapsed time
end_time = datetime.datetime.now() end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds() elapsed_time = (end_time - start_time).total_seconds()
@@ -308,18 +326,36 @@ class WorkflowExecutor:
elapsed_time = (end_time - start_time).total_seconds() elapsed_time = (end_time - start_time).total_seconds()
# Append messages for user and assistant # Append messages for user and assistant
result["messages"].extend( if input_data.get("files"):
[ result["messages"].extend(
{ [
"role": "user", {
"content": input_data.get("message", '') "role": "user",
}, "content": input_data.get("message", '')
{ },
"role": "assistant", {
"content": full_content "role": "user",
} "content": input_data.get("files")
] },
) {
"role": "assistant",
"content": full_content
}
]
)
else:
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
logger.info( logger.info(
f"Workflow execution completed (streaming), " f"Workflow execution completed (streaming), "
f"elapsed: {elapsed_time:.2f}ms, execution_id: {self.execution_context.execution_id}" f"elapsed: {elapsed_time:.2f}ms, execution_id: {self.execution_context.execution_id}"

View File

@@ -85,20 +85,20 @@ class BaseNodeConfig(BaseModel):
- tags: 节点标签(用于分类和搜索) - tags: 节点标签(用于分类和搜索)
""" """
name: str | None = Field( # name: str | None = Field(
default=None, # default=None,
description="节点名称(显示名称),如果不设置则使用节点 ID" # description="节点名称(显示名称),如果不设置则使用节点 ID"
) # )
#
description: str | None = Field( # description: str | None = Field(
default=None, # default=None,
description="节点描述,说明节点的作用" # description="节点描述,说明节点的作用"
) # )
#
tags: list[str] = Field( # tags: list[str] = Field(
default_factory=list, # default_factory=list,
description="节点标签,用于分类和搜索" # description="节点标签,用于分类和搜索"
) # )
class Config: class Config:
"""Pydantic 配置""" """Pydantic 配置"""

View File

@@ -617,10 +617,19 @@ class BaseNode(ABC):
return variable_pool.has(selector) return variable_pool.has(selector)
@staticmethod @staticmethod
async def process_message(provider: str, content: str | FileObject, enable_file=False) -> dict | str | None: async def process_message(provider: str, content: str | dict | FileObject, enable_file=False) -> list | str | None:
if isinstance(content, dict):
content = FileObject(
type=content.get("type"),
url=content.get("url"),
transfer_method=content.get("transfer_method"),
origin_file_type=content.get("origin_file_type"),
file_id=content.get("file_id"),
is_file=True
)
if isinstance(content, str): if isinstance(content, str):
if enable_file: if enable_file:
return {"text": content} return [{"text": content}]
return content return content
elif isinstance(content, FileObject): elif isinstance(content, FileObject):
@@ -639,8 +648,8 @@ class BaseNode(ABC):
) )
if message: if message:
content.content_cache[provider] = message[0] content.content_cache[provider] = message
return message[0] return message
return None return None
raise TypeError(f'Unexpect input value type - {type(content)}') raise TypeError(f'Unexpect input value type - {type(content)}')

View File

@@ -151,23 +151,23 @@ class LLMNode(BaseNode):
if role == "system": if role == "system":
messages.append({ messages.append({
"role": "system", "role": "system",
"content": content "content": await self.process_message(provider, content, self.typed_config.vision)
}) })
elif role in ["user", "human"]: elif role in ["user", "human"]:
messages.append({ messages.append({
"role": "user", "role": "user",
"content": content "content": await self.process_message(provider, content, self.typed_config.vision)
}) })
elif role in ["ai", "assistant"]: elif role in ["ai", "assistant"]:
messages.append({ messages.append({
"role": "assistant", "role": "assistant",
"content": content "content": await self.process_message(provider, content, self.typed_config.vision)
}) })
else: else:
logger.warning(f"未知的消息角色: {role},默认使用 user") logger.warning(f"未知的消息角色: {role},默认使用 user")
messages.append({ messages.append({
"role": "user", "role": "user",
"content": content "content": await self.process_message(provider, content, self.typed_config.vision)
}) })
if self.typed_config.vision_input and self.typed_config.vision: if self.typed_config.vision_input and self.typed_config.vision:
@@ -176,14 +176,28 @@ class LLMNode(BaseNode):
for file in files.value: for file in files.value:
content = await self.process_message(provider, file.value, self.typed_config.vision) content = await self.process_message(provider, file.value, self.typed_config.vision)
if content: if content:
file_content.append(content) file_content.extend(content)
if messages and messages[-1]["role"] == 'user': if messages and messages[-1]["role"] == 'user':
messages[-1]['content'] = [messages[-1]["content"]] + file_content messages[-1]['content'] = messages[-1]["content"] + file_content
else: else:
messages.append({"role": "user", "content": file_content}) messages.append({"role": "user", "content": file_content})
if self.typed_config.memory.enable: if self.typed_config.memory.enable:
messages = messages[:-1] + state["messages"][-self.typed_config.memory.window_size:] + messages[-1:] history_message = []
for message in state["messages"][-self.typed_config.memory.window_size:]:
if isinstance(message["content"], list):
file_content = []
for file in message["content"]:
content = await self.process_message(provider, file, self.typed_config.vision)
if content:
file_content.extend(content)
history_message.append(
{"role": message["role"], "content": file_content}
)
else:
message["content"] = await self.process_message(provider, message["content"], self.typed_config.vision)
history_message.append(message)
messages = messages[:-1] + history_message + messages[-1:]
self.messages = messages self.messages = messages
else: else:
# 使用简单的 prompt 格式(向后兼容) # 使用简单的 prompt 格式(向后兼容)

View File

@@ -144,7 +144,7 @@ class AppChatService:
) )
# 保存消息 # 保存消息
self.conversation_service.save_conversation_messages( message_id = self.conversation_service.save_conversation_messages(
conversation_id=conversation_id, conversation_id=conversation_id,
user_message=message, user_message=message,
assistant_message=result["content"], assistant_message=result["content"],
@@ -163,6 +163,7 @@ class AppChatService:
return { return {
"conversation_id": conversation_id, "conversation_id": conversation_id,
"message_id": str(message_id),
"message": result["content"], "message": result["content"],
"usage": result.get("usage", { "usage": result.get("usage", {
"prompt_tokens": 0, "prompt_tokens": 0,
@@ -191,7 +192,11 @@ class AppChatService:
try: try:
start_time = time.time() start_time = time.time()
config_id = None config_id = None
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" message_id = uuid.uuid4()
yield f"event: start\ndata: {json.dumps({
'conversation_id': str(conversation_id),
"message_id": str(message_id)
}, ensure_ascii=False)}\n\n"
variables = self.agent_service.prepare_variables(variables, config.variables) variables = self.agent_service.prepare_variables(variables, config.variables)
# 获取模型配置ID # 获取模型配置ID
@@ -296,6 +301,7 @@ class AppChatService:
) )
self.conversation_service.add_message( self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=full_content, content=full_content,
@@ -373,7 +379,7 @@ class AppChatService:
content=message content=message
) )
self.conversation_service.add_message( ai_message = self.conversation_service.add_message(
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=result.get("message", ""), content=result.get("message", ""),
@@ -391,6 +397,7 @@ class AppChatService:
return { return {
"conversation_id": conversation_id, "conversation_id": conversation_id,
"message": result.get("message", ""), "message": result.get("message", ""),
"message_id": str(ai_message.id),
"usage": { "usage": {
"prompt_tokens": 0, "prompt_tokens": 0,
"completion_tokens": 0, "completion_tokens": 0,
@@ -419,9 +426,9 @@ class AppChatService:
variables = {} variables = {}
try: try:
message_id = uuid.uuid4()
# 发送开始事件 # 发送开始事件
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id), "message_id": str(message_id)}, ensure_ascii=False)}\n\n"
full_content = "" full_content = ""
total_tokens = 0 total_tokens = 0
@@ -429,6 +436,7 @@ class AppChatService:
# 2. 创建编排器 # 2. 创建编排器
orchestrator = MultiAgentOrchestrator(self.db, config) orchestrator = MultiAgentOrchestrator(self.db, config)
# 3. 流式执行任务 # 3. 流式执行任务
async for event in orchestrator.execute_stream( async for event in orchestrator.execute_stream(
message=message, message=message,
@@ -472,6 +480,7 @@ class AppChatService:
) )
self.conversation_service.add_message( self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=full_content, content=full_content,

View File

@@ -178,7 +178,8 @@ class ConversationService:
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
role: str, role: str,
content: str, content: str,
meta_data: Optional[dict] = None meta_data: Optional[dict] = None,
message_id: Optional[uuid.UUID] = None,
) -> Message: ) -> Message:
""" """
Add a message to a conversation using UnitOfWork. Add a message to a conversation using UnitOfWork.
@@ -188,6 +189,7 @@ class ConversationService:
role (str): Role of the message sender ('user' or 'assistant'). role (str): Role of the message sender ('user' or 'assistant').
content (str): Message content. content (str): Message content.
meta_data (Optional[dict]): Optional metadata. meta_data (Optional[dict]): Optional metadata.
message_id (Optional[uuid.UUID]): Optional custom message UUID.
Returns: Returns:
Message: Newly created Message instance. Message: Newly created Message instance.
@@ -198,6 +200,7 @@ class ConversationService:
) )
message = Message( message = Message(
id=message_id if message_id else uuid.uuid4(),
conversation_id=conversation_id, conversation_id=conversation_id,
role=role, role=role,
content=content, content=content,
@@ -317,7 +320,7 @@ class ConversationService:
content=user_message content=user_message
) )
self.add_message( ai_message = self.add_message(
conversation_id=conversation_id, conversation_id=conversation_id,
role="assistant", role="assistant",
content=assistant_message, content=assistant_message,
@@ -332,6 +335,7 @@ class ConversationService:
"assistant_message_length": len(assistant_message) "assistant_message_length": len(assistant_message)
} }
) )
return ai_message.id
def delete_conversation( def delete_conversation(
self, self,

View File

@@ -56,7 +56,7 @@ class WorkflowImportService:
success=False, success=False,
temp_id=None, temp_id=None,
workflow_id=None, workflow_id=None,
errors=[InvalidConfiguration()] errors=[InvalidConfiguration()] + adapter.errors
) )
workflow_config = adapter.parse_workflow() workflow_config = adapter.parse_workflow()

View File

@@ -25,7 +25,7 @@ from app.repositories.workflow_repository import (
WorkflowExecutionRepository, WorkflowExecutionRepository,
WorkflowNodeExecutionRepository WorkflowNodeExecutionRepository
) )
from app.schemas import DraftRunRequest, FileInput from app.schemas import DraftRunRequest, FileInput, FileType
from app.services.conversation_service import ConversationService from app.services.conversation_service import ConversationService
from app.services.multi_agent_service import convert_uuids_to_str from app.services.multi_agent_service import convert_uuids_to_str
from app.services.multimodal_service import MultimodalService from app.services.multimodal_service import MultimodalService
@@ -496,6 +496,7 @@ class WorkflowService:
"event": "start", "event": "start",
"data": { "data": {
"conversation_id": payload.get("conversation_id"), "conversation_id": payload.get("conversation_id"),
"message_id": payload.get("message_id")
} }
} }
case "workflow_end": case "workflow_end":
@@ -600,6 +601,7 @@ class WorkflowService:
try: try:
files = await self._handle_file_input(payload.files) files = await self._handle_file_input(payload.files)
input_data["files"] = files input_data["files"] = files
message_id = uuid.uuid4()
# 更新状态为运行中 # 更新状态为运行中
self.update_execution_status(execution.execution_id, "running") self.update_execution_status(execution.execution_id, "running")
@@ -624,24 +626,45 @@ class WorkflowService:
workspace_id=str(workspace_id), workspace_id=str(workspace_id),
user_id=payload.user_id user_id=payload.user_id
) )
# 更新执行结果 # 更新执行结果
if result.get("status") == "completed": if result.get("status") == "completed":
token_usage = result.get("token_usage", {}) or {} token_usage = result.get("token_usage", {}) or {}
final_messages = result.get("messages", [])[init_message_length:]
human_message = ""
assistant_message = ""
for message in final_messages:
if message["role"] == "user":
if isinstance(message["content"], str):
human_message += message["content"]
elif isinstance(message["content"], list):
for file in message["content"]:
if file.get("type") == FileType.IMAGE:
human_message += f"![image]({file.get('url', '')})"
else:
human_message += f"[{file.get('type')}]({file.get('url', '')})"
if message["role"] == "assistant":
assistant_message = message["content"]
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="user",
content=human_message,
meta_data=None
)
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage}
)
self.update_execution_status( self.update_execution_status(
execution.execution_id, execution.execution_id,
"completed", "completed",
output_data=result, output_data=result,
token_usage=token_usage.get("total_tokens", None) token_usage=token_usage.get("total_tokens", None)
) )
final_messages = result.get("messages", [])[init_message_length:]
for message in final_messages:
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
logger.info(f"Workflow Run Success, " logger.info(f"Workflow Run Success, "
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
else: else:
@@ -650,6 +673,8 @@ class WorkflowService:
"failed", "failed",
error_message=result.get("error") error_message=result.get("error")
) )
logger.error(f"Workflow Run Failed, execution_id: {execution.execution_id},"
f" error: {result.get('error')}")
# 返回增强的响应结构 # 返回增强的响应结构
return { return {
@@ -659,6 +684,7 @@ class WorkflowService:
# "messages": result.get("messages"), # "messages": result.get("messages"),
"output": result.get("output"), # 最终输出(字符串) "output": result.get("output"), # 最终输出(字符串)
"message": result.get("output"), # 最终输出(字符串) "message": result.get("output"), # 最终输出(字符串)
"message_id": str(message_id),
# "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
"conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID "conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID
"error_message": result.get("error"), "error_message": result.get("error"),
@@ -756,7 +782,7 @@ class WorkflowService:
input_data["conv_messages"] = last_state.get("messages") or [] input_data["conv_messages"] = last_state.get("messages") or []
break break
init_message_length = len(input_data.get("conv_messages", [])) init_message_length = len(input_data.get("conv_messages", []))
message_id = uuid.uuid4()
async for event in execute_workflow_stream( async for event in execute_workflow_stream(
workflow_config=workflow_config_dict, workflow_config=workflow_config_dict,
input_data=input_data, input_data=input_data,
@@ -765,24 +791,43 @@ class WorkflowService:
user_id=payload.user_id, user_id=payload.user_id,
): ):
if event.get("event") == "workflow_end": if event.get("event") == "workflow_end":
status = event.get("data", {}).get("status") status = event.get("data", {}).get("status")
token_usage = event.get("data", {}).get("token_usage", {}) or {} token_usage = event.get("data", {}).get("token_usage", {}) or {}
if status == "completed": if status == "completed":
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
human_message = ""
assistant_message = ""
for message in final_messages:
if message["role"] == "user":
if isinstance(message["content"], str):
human_message += message["content"]
elif isinstance(message["content"], list):
for file in message["content"]:
if file.get("type") == FileType.IMAGE:
human_message += f"![image]({file.get('url', '')})"
else:
human_message += f"[{file.get('type')}]({file.get('url', '')})"
if message["role"] == "assistant":
assistant_message = message["content"]
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="user",
content=human_message,
meta_data=None
)
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage}
)
self.update_execution_status( self.update_execution_status(
execution.execution_id, execution.execution_id,
"completed", "completed",
output_data=event.get("data"), output_data=event.get("data"),
token_usage=token_usage.get("total_tokens", None) token_usage=token_usage.get("total_tokens", None)
) )
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
for message in final_messages:
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
logger.info(f"Workflow Run Success, " logger.info(f"Workflow Run Success, "
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
elif status == "failed": elif status == "failed":
@@ -793,6 +838,8 @@ class WorkflowService:
) )
else: else:
logger.error(f"unexpect workflow run status, status: {status}") logger.error(f"unexpect workflow run status, status: {status}")
elif event.get("event") == "workflow_start":
event["data"]["message_id"] = str(message_id)
event = self._emit(public, event) event = self._emit(public, event)
if event: if event:
yield event yield event