From ccc67df8df72f91022f97253c950f1d238bd8eb2 Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Fri, 6 Mar 2026 15:44:37 +0800 Subject: [PATCH] feat(workflow): support multimodal context --- api/app/core/workflow/executor.py | 84 +++++++++++++++++------- api/app/core/workflow/nodes/base_node.py | 17 +++-- api/app/core/workflow/nodes/llm/node.py | 28 ++++++-- api/app/services/workflow_service.py | 71 +++++++++++++++----- 4 files changed, 148 insertions(+), 52 deletions(-) diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 78149e4c..ff979f2b 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -158,18 +158,36 @@ class WorkflowExecutor: full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False) # Append messages for user and assistant - result["messages"].extend( - [ - { - "role": "user", - "content": input_data.get("message", '') - }, - { - "role": "assistant", - "content": full_content - } - ] - ) + if input_data.get("files"): + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "user", + "content": input_data.get("files") + }, + { + "role": "assistant", + "content": full_content + } + ] + ) + else: + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "assistant", + "content": full_content + } + ] + ) # Calculate elapsed time end_time = datetime.datetime.now() elapsed_time = (end_time - start_time).total_seconds() @@ -308,18 +326,36 @@ class WorkflowExecutor: elapsed_time = (end_time - start_time).total_seconds() # Append messages for user and assistant - result["messages"].extend( - [ - { - "role": "user", - "content": input_data.get("message", '') - }, - { - "role": "assistant", - "content": full_content - } - ] - ) + if input_data.get("files"): + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "user", + "content": input_data.get("files") + }, + { + "role": "assistant", + "content": full_content + } + ] + ) + else: + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "assistant", + "content": full_content + } + ] + ) logger.info( f"Workflow execution completed (streaming), " f"elapsed: {elapsed_time:.2f}ms, execution_id: {self.execution_context.execution_id}" diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 3f30718c..dacbef85 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -617,10 +617,19 @@ class BaseNode(ABC): return variable_pool.has(selector) @staticmethod - async def process_message(provider: str, content: str | FileObject, enable_file=False) -> dict | str | None: + async def process_message(provider: str, content: str | dict | FileObject, enable_file=False) -> list | str | None: + if isinstance(content, dict): + content = FileObject( + type=content.get("type"), + url=content.get("url"), + transfer_method=content.get("transfer_method"), + origin_file_type=content.get("origin_file_type"), + file_id=content.get("file_id"), + is_file=True + ) if isinstance(content, str): if enable_file: - return {"text": content} + return [{"text": content}] return content elif isinstance(content, FileObject): @@ -639,8 +648,8 @@ class BaseNode(ABC): ) if message: - content.content_cache[provider] = message[0] - return message[0] + content.content_cache[provider] = message + return message return None raise TypeError(f'Unexpect input value type - {type(content)}') diff --git a/api/app/core/workflow/nodes/llm/node.py b/api/app/core/workflow/nodes/llm/node.py index c109d59b..4b63bc4e 100644 --- a/api/app/core/workflow/nodes/llm/node.py +++ b/api/app/core/workflow/nodes/llm/node.py @@ -151,23 +151,23 @@ class LLMNode(BaseNode): if role == "system": messages.append({ "role": "system", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) elif role in ["user", "human"]: messages.append({ "role": "user", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) elif role in ["ai", "assistant"]: messages.append({ "role": "assistant", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) else: logger.warning(f"未知的消息角色: {role},默认使用 user") messages.append({ "role": "user", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) if self.typed_config.vision_input and self.typed_config.vision: @@ -176,14 +176,28 @@ class LLMNode(BaseNode): for file in files.value: content = await self.process_message(provider, file.value, self.typed_config.vision) if content: - file_content.append(content) + file_content.extend(content) if messages and messages[-1]["role"] == 'user': - messages[-1]['content'] = [messages[-1]["content"]] + file_content + messages[-1]['content'] = messages[-1]["content"] + file_content else: messages.append({"role": "user", "content": file_content}) if self.typed_config.memory.enable: - messages = messages[:-1] + state["messages"][-self.typed_config.memory.window_size:] + messages[-1:] + history_message = [] + for message in state["messages"][-self.typed_config.memory.window_size:]: + if isinstance(message["content"], list): + file_content = [] + for file in message["content"]: + content = await self.process_message(provider, file, self.typed_config.vision) + if content: + file_content.extend(content) + history_message.append( + {"role": message["role"], "content": file_content} + ) + else: + message["content"] = await self.process_message(provider, message["content"], self.typed_config.vision) + history_message.append(message) + messages = messages[:-1] + history_message + messages[-1:] self.messages = messages else: # 使用简单的 prompt 格式(向后兼容) diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index ce27e276..bd940387 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -25,7 +25,7 @@ from app.repositories.workflow_repository import ( WorkflowExecutionRepository, WorkflowNodeExecutionRepository ) -from app.schemas import DraftRunRequest, FileInput +from app.schemas import DraftRunRequest, FileInput, FileType from app.services.conversation_service import ConversationService from app.services.multi_agent_service import convert_uuids_to_str from app.services.multimodal_service import MultimodalService @@ -601,6 +601,7 @@ class WorkflowService: try: files = await self._handle_file_input(payload.files) input_data["files"] = files + message_id = uuid.uuid4() # 更新状态为运行中 self.update_execution_status(execution.execution_id, "running") @@ -630,15 +631,32 @@ class WorkflowService: token_usage = result.get("token_usage", {}) or {} final_messages = result.get("messages", [])[init_message_length:] + human_message = "" + assistant_message = "" for message in final_messages: - message_obj = self.conversation_service.add_message( - conversation_id=conversation_id_uuid, - role=message["role"], - content=message["content"], - meta_data=None if message["role"] == "user" else {"usage": token_usage} - ) - if message["role"] != "user": - result["message_id"] = str(message_obj.id) + if message["role"] == "user": + if isinstance(message["content"], str): + human_message += message["content"] + elif isinstance(message["content"], dict): + if message["content"].get("type") == FileType.IMAGE: + human_message += f"![image]({message['content'].get('url', '')})" + else: + human_message += f"[{FileType}]({message['content'].get('url', '')})" + if message["role"] == "assistant": + assistant_message = message["content"] + self.conversation_service.add_message( + conversation_id=conversation_id_uuid, + role="user", + content=human_message, + meta_data=None + ) + self.conversation_service.add_message( + message_id=message_id, + conversation_id=conversation_id_uuid, + role="assistant", + content=assistant_message, + meta_data={"usage": token_usage} + ) self.update_execution_status( execution.execution_id, "completed", @@ -664,7 +682,7 @@ class WorkflowService: # "messages": result.get("messages"), "output": result.get("output"), # 最终输出(字符串) "message": result.get("output"), # 最终输出(字符串) - "message_id": result.get("message_id"), + "message_id": str(message_id), # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID "error_message": result.get("error"), @@ -775,14 +793,33 @@ class WorkflowService: token_usage = event.get("data", {}).get("token_usage", {}) or {} if status == "completed": final_messages = event.get("data", {}).get("messages", [])[init_message_length:] + human_message = "" + assistant_message = "" for message in final_messages: - self.conversation_service.add_message( - message_id=message_id if message["role"] != "user" else uuid.uuid4(), - conversation_id=conversation_id_uuid, - role=message["role"], - content=message["content"], - meta_data=None if message["role"] == "user" else {"usage": token_usage} - ) + if message["role"] == "user": + if isinstance(message["content"], str): + human_message += message["content"] + elif isinstance(message["content"], list): + for file in message["content"]: + if file.get("type") == FileType.IMAGE: + human_message += f"![image]({file.get('url', '')})" + else: + human_message += f"[{file.get("type")}]({file.get('url', '')})" + if message["role"] == "assistant": + assistant_message = message["content"] + self.conversation_service.add_message( + conversation_id=conversation_id_uuid, + role="user", + content=human_message, + meta_data=None + ) + self.conversation_service.add_message( + message_id=message_id, + conversation_id=conversation_id_uuid, + role="assistant", + content=assistant_message, + meta_data={"usage": token_usage} + ) self.update_execution_status( execution.execution_id, "completed",