feat(workflow): support multimodal context

This commit is contained in:
Eternity
2026-03-06 15:44:37 +08:00
parent e833db954a
commit ccc67df8df
4 changed files with 148 additions and 52 deletions

View File

@@ -158,18 +158,36 @@ class WorkflowExecutor:
full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False)
# Append messages for user and assistant
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
if input_data.get("files"):
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "user",
"content": input_data.get("files")
},
{
"role": "assistant",
"content": full_content
}
]
)
else:
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
# Calculate elapsed time
end_time = datetime.datetime.now()
elapsed_time = (end_time - start_time).total_seconds()
@@ -308,18 +326,36 @@ class WorkflowExecutor:
elapsed_time = (end_time - start_time).total_seconds()
# Append messages for user and assistant
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
if input_data.get("files"):
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "user",
"content": input_data.get("files")
},
{
"role": "assistant",
"content": full_content
}
]
)
else:
result["messages"].extend(
[
{
"role": "user",
"content": input_data.get("message", '')
},
{
"role": "assistant",
"content": full_content
}
]
)
logger.info(
f"Workflow execution completed (streaming), "
f"elapsed: {elapsed_time:.2f}ms, execution_id: {self.execution_context.execution_id}"

View File

@@ -617,10 +617,19 @@ class BaseNode(ABC):
return variable_pool.has(selector)
@staticmethod
async def process_message(provider: str, content: str | FileObject, enable_file=False) -> dict | str | None:
async def process_message(provider: str, content: str | dict | FileObject, enable_file=False) -> list | str | None:
if isinstance(content, dict):
content = FileObject(
type=content.get("type"),
url=content.get("url"),
transfer_method=content.get("transfer_method"),
origin_file_type=content.get("origin_file_type"),
file_id=content.get("file_id"),
is_file=True
)
if isinstance(content, str):
if enable_file:
return {"text": content}
return [{"text": content}]
return content
elif isinstance(content, FileObject):
@@ -639,8 +648,8 @@ class BaseNode(ABC):
)
if message:
content.content_cache[provider] = message[0]
return message[0]
content.content_cache[provider] = message
return message
return None
raise TypeError(f'Unexpect input value type - {type(content)}')

View File

@@ -151,23 +151,23 @@ class LLMNode(BaseNode):
if role == "system":
messages.append({
"role": "system",
"content": content
"content": await self.process_message(provider, content, self.typed_config.vision)
})
elif role in ["user", "human"]:
messages.append({
"role": "user",
"content": content
"content": await self.process_message(provider, content, self.typed_config.vision)
})
elif role in ["ai", "assistant"]:
messages.append({
"role": "assistant",
"content": content
"content": await self.process_message(provider, content, self.typed_config.vision)
})
else:
logger.warning(f"未知的消息角色: {role},默认使用 user")
messages.append({
"role": "user",
"content": content
"content": await self.process_message(provider, content, self.typed_config.vision)
})
if self.typed_config.vision_input and self.typed_config.vision:
@@ -176,14 +176,28 @@ class LLMNode(BaseNode):
for file in files.value:
content = await self.process_message(provider, file.value, self.typed_config.vision)
if content:
file_content.append(content)
file_content.extend(content)
if messages and messages[-1]["role"] == 'user':
messages[-1]['content'] = [messages[-1]["content"]] + file_content
messages[-1]['content'] = messages[-1]["content"] + file_content
else:
messages.append({"role": "user", "content": file_content})
if self.typed_config.memory.enable:
messages = messages[:-1] + state["messages"][-self.typed_config.memory.window_size:] + messages[-1:]
history_message = []
for message in state["messages"][-self.typed_config.memory.window_size:]:
if isinstance(message["content"], list):
file_content = []
for file in message["content"]:
content = await self.process_message(provider, file, self.typed_config.vision)
if content:
file_content.extend(content)
history_message.append(
{"role": message["role"], "content": file_content}
)
else:
message["content"] = await self.process_message(provider, message["content"], self.typed_config.vision)
history_message.append(message)
messages = messages[:-1] + history_message + messages[-1:]
self.messages = messages
else:
# 使用简单的 prompt 格式(向后兼容)

View File

@@ -25,7 +25,7 @@ from app.repositories.workflow_repository import (
WorkflowExecutionRepository,
WorkflowNodeExecutionRepository
)
from app.schemas import DraftRunRequest, FileInput
from app.schemas import DraftRunRequest, FileInput, FileType
from app.services.conversation_service import ConversationService
from app.services.multi_agent_service import convert_uuids_to_str
from app.services.multimodal_service import MultimodalService
@@ -601,6 +601,7 @@ class WorkflowService:
try:
files = await self._handle_file_input(payload.files)
input_data["files"] = files
message_id = uuid.uuid4()
# 更新状态为运行中
self.update_execution_status(execution.execution_id, "running")
@@ -630,15 +631,32 @@ class WorkflowService:
token_usage = result.get("token_usage", {}) or {}
final_messages = result.get("messages", [])[init_message_length:]
human_message = ""
assistant_message = ""
for message in final_messages:
message_obj = self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
if message["role"] != "user":
result["message_id"] = str(message_obj.id)
if message["role"] == "user":
if isinstance(message["content"], str):
human_message += message["content"]
elif isinstance(message["content"], dict):
if message["content"].get("type") == FileType.IMAGE:
human_message += f"![image]({message['content'].get('url', '')})"
else:
human_message += f"[{FileType}]({message['content'].get('url', '')})"
if message["role"] == "assistant":
assistant_message = message["content"]
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="user",
content=human_message,
meta_data=None
)
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage}
)
self.update_execution_status(
execution.execution_id,
"completed",
@@ -664,7 +682,7 @@ class WorkflowService:
# "messages": result.get("messages"),
"output": result.get("output"), # 最终输出(字符串)
"message": result.get("output"), # 最终输出(字符串)
"message_id": result.get("message_id"),
"message_id": str(message_id),
# "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
"conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID
"error_message": result.get("error"),
@@ -775,14 +793,33 @@ class WorkflowService:
token_usage = event.get("data", {}).get("token_usage", {}) or {}
if status == "completed":
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
human_message = ""
assistant_message = ""
for message in final_messages:
self.conversation_service.add_message(
message_id=message_id if message["role"] != "user" else uuid.uuid4(),
conversation_id=conversation_id_uuid,
role=message["role"],
content=message["content"],
meta_data=None if message["role"] == "user" else {"usage": token_usage}
)
if message["role"] == "user":
if isinstance(message["content"], str):
human_message += message["content"]
elif isinstance(message["content"], list):
for file in message["content"]:
if file.get("type") == FileType.IMAGE:
human_message += f"![image]({file.get('url', '')})"
else:
human_message += f"[{file.get("type")}]({file.get('url', '')})"
if message["role"] == "assistant":
assistant_message = message["content"]
self.conversation_service.add_message(
conversation_id=conversation_id_uuid,
role="user",
content=human_message,
meta_data=None
)
self.conversation_service.add_message(
message_id=message_id,
conversation_id=conversation_id_uuid,
role="assistant",
content=assistant_message,
meta_data={"usage": token_usage}
)
self.update_execution_status(
execution.execution_id,
"completed",