diff --git a/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py b/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py index c8cc0460..784e5802 100644 --- a/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py +++ b/api/app/core/memory/agent/langgraph_graph/nodes/problem_nodes.py @@ -111,7 +111,7 @@ async def Split_The_Problem(state: ReadState) -> ReadState: "error_type": type(e).__name__, "error_message": str(e), "content_length": len(content), - "llm_model_id": memory_config.llm_model_id if memory_config else None + "llm_model_id": str(memory_config.llm_model_id) if memory_config else None } logger.error(f"Split_The_Problem error details: {error_details}") @@ -221,7 +221,7 @@ async def Problem_Extension(state: ReadState) -> ReadState: "error_type": type(e).__name__, "error_message": str(e), "questions_count": len(databasets), - "llm_model_id": memory_config.llm_model_id if memory_config else None + "llm_model_id": str(memory_config.llm_model_id) if memory_config else None } logger.error(f"Problem_Extension error details: {error_details}") diff --git a/api/app/core/workflow/adapters/dify/converter.py b/api/app/core/workflow/adapters/dify/converter.py index 06c988d3..3c9348c7 100644 --- a/api/app/core/workflow/adapters/dify/converter.py +++ b/api/app/core/workflow/adapters/dify/converter.py @@ -129,11 +129,11 @@ class DifyConverter(BaseConverter): @staticmethod def _convert_file(var): - pass + return None @staticmethod def _convert_array_file(var): - pass + return [] @staticmethod def variable_type_map(source_type) -> VariableType | None: @@ -198,7 +198,7 @@ class DifyConverter(BaseConverter): "over-write": AssignmentOperator.COVER, "remove-last": AssignmentOperator.REMOVE_LAST, "remove-first": AssignmentOperator.REMOVE_FIRST, - + "set": AssignmentOperator.ASSIGN, } return operator_map.get(operator, operator) @@ -267,10 +267,10 @@ class DifyConverter(BaseConverter): type=var_type, required=var["required"], default=self.convert_variable_type( - var_type, var["default"] + var_type, var.get("default") ), description=var["label"], - max_length=var.get("max_length"), + max_length=var.get("max_length", 50), ) start_vars.append(var_def) result = StartNodeConfig.model_construct( @@ -333,7 +333,7 @@ class DifyConverter(BaseConverter): MessageConfig( role="user", content=self.trans_variable_format( - node_data["memory"].get("query_prompt_template", "{{#sys.query#}}") + node_data["memory"].get("query_prompt_template") or "{{#sys.query#}}" ) ) ) @@ -612,7 +612,7 @@ class DifyConverter(BaseConverter): ), headers=headers, params=params, - verify_ssl=node_data["ssl_verify"], + verify_ssl=node_data.get("ssl_verify", False), timeouts=HttpTimeOutConfig.model_construct( connect_timeout=node_data["timeout"]["max_connect_timeout"] or 5, read_timeout=node_data["timeout"]["max_read_timeout"] or 5, @@ -696,7 +696,7 @@ class DifyConverter(BaseConverter): group_variables = {} group_type = {} if not advanced_settings or not advanced_settings["group_enabled"]: - group_variables["output"] = [ + group_variables = [ self._process_list_variable_litearl(variable) for variable in node_data["variables"] ] diff --git a/api/app/core/workflow/adapters/dify/dify_adapter.py b/api/app/core/workflow/adapters/dify/dify_adapter.py index 6336b1f9..5b506d16 100644 --- a/api/app/core/workflow/adapters/dify/dify_adapter.py +++ b/api/app/core/workflow/adapters/dify/dify_adapter.py @@ -83,6 +83,12 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter): require_fields = frozenset({'app', 'kind', 'version', 'workflow'}) if not all(field in self.config for field in require_fields): return False + if self.config.get("app",{}).get("mode") == "workflow": + self.errors.append(ExceptionDefineition( + type=ExceptionType.PLATFORM, + detail="workflow mode is not supported" + )) + return False for node in self.origin_nodes: if not self._valid_nodes(node): @@ -134,6 +140,8 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter): for node in self.origin_nodes: if self.map_node_type(node["data"]["type"]) == NodeType.LLM: self.node_output_map[f"{node['id']}.text"] = f"{node['id']}.output" + elif self.map_node_type(node["data"]["type"]) == NodeType.KNOWLEDGE_RETRIEVAL: + self.node_output_map[f"{node['id']}.result"] = f"{node['id']}.output" def _convert_cycle_node_position(self, node_id: str, position: dict): for node in self.origin_nodes: @@ -184,7 +192,7 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter): type=ExceptionType.NODE, node_id=node["id"], node_name=node["data"]["title"], - detail=f"node type {node_type} is unsupported", + detail=f"node type {node_type if node_type else 'notes'} is unsupported", )) return converter(node) except Exception as e: diff --git a/api/app/core/workflow/engine/graph_builder.py b/api/app/core/workflow/engine/graph_builder.py index 7b5c059c..5e4569ad 100644 --- a/api/app/core/workflow/engine/graph_builder.py +++ b/api/app/core/workflow/engine/graph_builder.py @@ -320,7 +320,7 @@ class GraphBuilder: # Used later to determine which branch to take based on the node's output # Assumes node output `node..output` matches the edge's label # For example, if node.123.output == 'CASE1', take the branch labeled 'CASE1' - related_edge[idx]['condition'] = f"node.{node_id}.output == '{related_edge[idx]['label']}'" + related_edge[idx]['condition'] = f"node['{node_id}']['output'] == '{related_edge[idx]['label']}'" if node_instance: # Wrap node's run method to avoid closure issues diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 78149e4c..ff979f2b 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -158,18 +158,36 @@ class WorkflowExecutor: full_content += self.variable_pool.get_value(f"{end_id}.output", default="", strict=False) # Append messages for user and assistant - result["messages"].extend( - [ - { - "role": "user", - "content": input_data.get("message", '') - }, - { - "role": "assistant", - "content": full_content - } - ] - ) + if input_data.get("files"): + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "user", + "content": input_data.get("files") + }, + { + "role": "assistant", + "content": full_content + } + ] + ) + else: + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "assistant", + "content": full_content + } + ] + ) # Calculate elapsed time end_time = datetime.datetime.now() elapsed_time = (end_time - start_time).total_seconds() @@ -308,18 +326,36 @@ class WorkflowExecutor: elapsed_time = (end_time - start_time).total_seconds() # Append messages for user and assistant - result["messages"].extend( - [ - { - "role": "user", - "content": input_data.get("message", '') - }, - { - "role": "assistant", - "content": full_content - } - ] - ) + if input_data.get("files"): + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "user", + "content": input_data.get("files") + }, + { + "role": "assistant", + "content": full_content + } + ] + ) + else: + result["messages"].extend( + [ + { + "role": "user", + "content": input_data.get("message", '') + }, + { + "role": "assistant", + "content": full_content + } + ] + ) logger.info( f"Workflow execution completed (streaming), " f"elapsed: {elapsed_time:.2f}ms, execution_id: {self.execution_context.execution_id}" diff --git a/api/app/core/workflow/nodes/base_config.py b/api/app/core/workflow/nodes/base_config.py index 973e120d..4ae89376 100644 --- a/api/app/core/workflow/nodes/base_config.py +++ b/api/app/core/workflow/nodes/base_config.py @@ -85,20 +85,20 @@ class BaseNodeConfig(BaseModel): - tags: 节点标签(用于分类和搜索) """ - name: str | None = Field( - default=None, - description="节点名称(显示名称),如果不设置则使用节点 ID" - ) - - description: str | None = Field( - default=None, - description="节点描述,说明节点的作用" - ) - - tags: list[str] = Field( - default_factory=list, - description="节点标签,用于分类和搜索" - ) + # name: str | None = Field( + # default=None, + # description="节点名称(显示名称),如果不设置则使用节点 ID" + # ) + # + # description: str | None = Field( + # default=None, + # description="节点描述,说明节点的作用" + # ) + # + # tags: list[str] = Field( + # default_factory=list, + # description="节点标签,用于分类和搜索" + # ) class Config: """Pydantic 配置""" diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 3f30718c..dacbef85 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -617,10 +617,19 @@ class BaseNode(ABC): return variable_pool.has(selector) @staticmethod - async def process_message(provider: str, content: str | FileObject, enable_file=False) -> dict | str | None: + async def process_message(provider: str, content: str | dict | FileObject, enable_file=False) -> list | str | None: + if isinstance(content, dict): + content = FileObject( + type=content.get("type"), + url=content.get("url"), + transfer_method=content.get("transfer_method"), + origin_file_type=content.get("origin_file_type"), + file_id=content.get("file_id"), + is_file=True + ) if isinstance(content, str): if enable_file: - return {"text": content} + return [{"text": content}] return content elif isinstance(content, FileObject): @@ -639,8 +648,8 @@ class BaseNode(ABC): ) if message: - content.content_cache[provider] = message[0] - return message[0] + content.content_cache[provider] = message + return message return None raise TypeError(f'Unexpect input value type - {type(content)}') diff --git a/api/app/core/workflow/nodes/llm/node.py b/api/app/core/workflow/nodes/llm/node.py index c109d59b..4b63bc4e 100644 --- a/api/app/core/workflow/nodes/llm/node.py +++ b/api/app/core/workflow/nodes/llm/node.py @@ -151,23 +151,23 @@ class LLMNode(BaseNode): if role == "system": messages.append({ "role": "system", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) elif role in ["user", "human"]: messages.append({ "role": "user", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) elif role in ["ai", "assistant"]: messages.append({ "role": "assistant", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) else: logger.warning(f"未知的消息角色: {role},默认使用 user") messages.append({ "role": "user", - "content": content + "content": await self.process_message(provider, content, self.typed_config.vision) }) if self.typed_config.vision_input and self.typed_config.vision: @@ -176,14 +176,28 @@ class LLMNode(BaseNode): for file in files.value: content = await self.process_message(provider, file.value, self.typed_config.vision) if content: - file_content.append(content) + file_content.extend(content) if messages and messages[-1]["role"] == 'user': - messages[-1]['content'] = [messages[-1]["content"]] + file_content + messages[-1]['content'] = messages[-1]["content"] + file_content else: messages.append({"role": "user", "content": file_content}) if self.typed_config.memory.enable: - messages = messages[:-1] + state["messages"][-self.typed_config.memory.window_size:] + messages[-1:] + history_message = [] + for message in state["messages"][-self.typed_config.memory.window_size:]: + if isinstance(message["content"], list): + file_content = [] + for file in message["content"]: + content = await self.process_message(provider, file, self.typed_config.vision) + if content: + file_content.extend(content) + history_message.append( + {"role": message["role"], "content": file_content} + ) + else: + message["content"] = await self.process_message(provider, message["content"], self.typed_config.vision) + history_message.append(message) + messages = messages[:-1] + history_message + messages[-1:] self.messages = messages else: # 使用简单的 prompt 格式(向后兼容) diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 5430d2f9..f3cdde2a 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -144,7 +144,7 @@ class AppChatService: ) # 保存消息 - self.conversation_service.save_conversation_messages( + message_id = self.conversation_service.save_conversation_messages( conversation_id=conversation_id, user_message=message, assistant_message=result["content"], @@ -163,6 +163,7 @@ class AppChatService: return { "conversation_id": conversation_id, + "message_id": str(message_id), "message": result["content"], "usage": result.get("usage", { "prompt_tokens": 0, @@ -191,7 +192,11 @@ class AppChatService: try: start_time = time.time() config_id = None - yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" + message_id = uuid.uuid4() + yield f"event: start\ndata: {json.dumps({ + 'conversation_id': str(conversation_id), + "message_id": str(message_id) + }, ensure_ascii=False)}\n\n" variables = self.agent_service.prepare_variables(variables, config.variables) # 获取模型配置ID @@ -296,6 +301,7 @@ class AppChatService: ) self.conversation_service.add_message( + message_id=message_id, conversation_id=conversation_id, role="assistant", content=full_content, @@ -373,7 +379,7 @@ class AppChatService: content=message ) - self.conversation_service.add_message( + ai_message = self.conversation_service.add_message( conversation_id=conversation_id, role="assistant", content=result.get("message", ""), @@ -391,6 +397,7 @@ class AppChatService: return { "conversation_id": conversation_id, "message": result.get("message", ""), + "message_id": str(ai_message.id), "usage": { "prompt_tokens": 0, "completion_tokens": 0, @@ -419,9 +426,9 @@ class AppChatService: variables = {} try: - + message_id = uuid.uuid4() # 发送开始事件 - yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" + yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id), "message_id": str(message_id)}, ensure_ascii=False)}\n\n" full_content = "" total_tokens = 0 @@ -429,6 +436,7 @@ class AppChatService: # 2. 创建编排器 orchestrator = MultiAgentOrchestrator(self.db, config) + # 3. 流式执行任务 async for event in orchestrator.execute_stream( message=message, @@ -472,6 +480,7 @@ class AppChatService: ) self.conversation_service.add_message( + message_id=message_id, conversation_id=conversation_id, role="assistant", content=full_content, diff --git a/api/app/services/conversation_service.py b/api/app/services/conversation_service.py index 553aefc4..aff5f533 100644 --- a/api/app/services/conversation_service.py +++ b/api/app/services/conversation_service.py @@ -178,7 +178,8 @@ class ConversationService: conversation_id: uuid.UUID, role: str, content: str, - meta_data: Optional[dict] = None + meta_data: Optional[dict] = None, + message_id: Optional[uuid.UUID] = None, ) -> Message: """ Add a message to a conversation using UnitOfWork. @@ -188,6 +189,7 @@ class ConversationService: role (str): Role of the message sender ('user' or 'assistant'). content (str): Message content. meta_data (Optional[dict]): Optional metadata. + message_id (Optional[uuid.UUID]): Optional custom message UUID. Returns: Message: Newly created Message instance. @@ -198,6 +200,7 @@ class ConversationService: ) message = Message( + id=message_id if message_id else uuid.uuid4(), conversation_id=conversation_id, role=role, content=content, @@ -317,7 +320,7 @@ class ConversationService: content=user_message ) - self.add_message( + ai_message = self.add_message( conversation_id=conversation_id, role="assistant", content=assistant_message, @@ -332,6 +335,7 @@ class ConversationService: "assistant_message_length": len(assistant_message) } ) + return ai_message.id def delete_conversation( self, diff --git a/api/app/services/workflow_import_service.py b/api/app/services/workflow_import_service.py index 2e17f404..2b36c5ea 100644 --- a/api/app/services/workflow_import_service.py +++ b/api/app/services/workflow_import_service.py @@ -56,7 +56,7 @@ class WorkflowImportService: success=False, temp_id=None, workflow_id=None, - errors=[InvalidConfiguration()] + errors=[InvalidConfiguration()] + adapter.errors ) workflow_config = adapter.parse_workflow() diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index d13e3454..eaf78b90 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -25,7 +25,7 @@ from app.repositories.workflow_repository import ( WorkflowExecutionRepository, WorkflowNodeExecutionRepository ) -from app.schemas import DraftRunRequest, FileInput +from app.schemas import DraftRunRequest, FileInput, FileType from app.services.conversation_service import ConversationService from app.services.multi_agent_service import convert_uuids_to_str from app.services.multimodal_service import MultimodalService @@ -496,6 +496,7 @@ class WorkflowService: "event": "start", "data": { "conversation_id": payload.get("conversation_id"), + "message_id": payload.get("message_id") } } case "workflow_end": @@ -600,6 +601,7 @@ class WorkflowService: try: files = await self._handle_file_input(payload.files) input_data["files"] = files + message_id = uuid.uuid4() # 更新状态为运行中 self.update_execution_status(execution.execution_id, "running") @@ -624,24 +626,45 @@ class WorkflowService: workspace_id=str(workspace_id), user_id=payload.user_id ) - # 更新执行结果 if result.get("status") == "completed": token_usage = result.get("token_usage", {}) or {} + + final_messages = result.get("messages", [])[init_message_length:] + human_message = "" + assistant_message = "" + for message in final_messages: + if message["role"] == "user": + if isinstance(message["content"], str): + human_message += message["content"] + elif isinstance(message["content"], list): + for file in message["content"]: + if file.get("type") == FileType.IMAGE: + human_message += f"![image]({file.get('url', '')})" + else: + human_message += f"[{file.get('type')}]({file.get('url', '')})" + if message["role"] == "assistant": + assistant_message = message["content"] + self.conversation_service.add_message( + conversation_id=conversation_id_uuid, + role="user", + content=human_message, + meta_data=None + ) + self.conversation_service.add_message( + message_id=message_id, + conversation_id=conversation_id_uuid, + role="assistant", + content=assistant_message, + meta_data={"usage": token_usage} + ) self.update_execution_status( execution.execution_id, "completed", output_data=result, token_usage=token_usage.get("total_tokens", None) ) - final_messages = result.get("messages", [])[init_message_length:] - for message in final_messages: - self.conversation_service.add_message( - conversation_id=conversation_id_uuid, - role=message["role"], - content=message["content"], - meta_data=None if message["role"] == "user" else {"usage": token_usage} - ) + logger.info(f"Workflow Run Success, " f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") else: @@ -650,6 +673,8 @@ class WorkflowService: "failed", error_message=result.get("error") ) + logger.error(f"Workflow Run Failed, execution_id: {execution.execution_id}," + f" error: {result.get('error')}") # 返回增强的响应结构 return { @@ -659,6 +684,7 @@ class WorkflowService: # "messages": result.get("messages"), "output": result.get("output"), # 最终输出(字符串) "message": result.get("output"), # 最终输出(字符串) + "message_id": str(message_id), # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID "error_message": result.get("error"), @@ -756,7 +782,7 @@ class WorkflowService: input_data["conv_messages"] = last_state.get("messages") or [] break init_message_length = len(input_data.get("conv_messages", [])) - + message_id = uuid.uuid4() async for event in execute_workflow_stream( workflow_config=workflow_config_dict, input_data=input_data, @@ -765,24 +791,43 @@ class WorkflowService: user_id=payload.user_id, ): if event.get("event") == "workflow_end": - status = event.get("data", {}).get("status") token_usage = event.get("data", {}).get("token_usage", {}) or {} if status == "completed": + final_messages = event.get("data", {}).get("messages", [])[init_message_length:] + human_message = "" + assistant_message = "" + for message in final_messages: + if message["role"] == "user": + if isinstance(message["content"], str): + human_message += message["content"] + elif isinstance(message["content"], list): + for file in message["content"]: + if file.get("type") == FileType.IMAGE: + human_message += f"![image]({file.get('url', '')})" + else: + human_message += f"[{file.get('type')}]({file.get('url', '')})" + if message["role"] == "assistant": + assistant_message = message["content"] + self.conversation_service.add_message( + conversation_id=conversation_id_uuid, + role="user", + content=human_message, + meta_data=None + ) + self.conversation_service.add_message( + message_id=message_id, + conversation_id=conversation_id_uuid, + role="assistant", + content=assistant_message, + meta_data={"usage": token_usage} + ) self.update_execution_status( execution.execution_id, "completed", output_data=event.get("data"), token_usage=token_usage.get("total_tokens", None) ) - final_messages = event.get("data", {}).get("messages", [])[init_message_length:] - for message in final_messages: - self.conversation_service.add_message( - conversation_id=conversation_id_uuid, - role=message["role"], - content=message["content"], - meta_data=None if message["role"] == "user" else {"usage": token_usage} - ) logger.info(f"Workflow Run Success, " f"execution_id: {execution.execution_id}, message count: {len(final_messages)}") elif status == "failed": @@ -793,6 +838,8 @@ class WorkflowService: ) else: logger.error(f"unexpect workflow run status, status: {status}") + elif event.get("event") == "workflow_start": + event["data"]["message_id"] = str(message_id) event = self._emit(public, event) if event: yield event