From 5cf2b087771af886debc1f681c5ee89f5e21028b Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Mon, 2 Mar 2026 14:52:51 +0800 Subject: [PATCH 1/2] fix(workflow): handle non-stream output field changes, add file type support to HTTP node, fix iteration node flattening bug --- .../core/workflow/adapters/dify/converter.py | 2 +- api/app/core/workflow/engine/variable_pool.py | 16 +++++++- .../core/workflow/nodes/cycle_graph/node.py | 2 +- .../core/workflow/nodes/http_request/node.py | 35 ++++++++++++---- .../workflow/variable/variable_objects.py | 41 ++++++++++++++----- api/app/services/workflow_service.py | 1 + 6 files changed, 76 insertions(+), 21 deletions(-) diff --git a/api/app/core/workflow/adapters/dify/converter.py b/api/app/core/workflow/adapters/dify/converter.py index 18beef15..798b78b7 100644 --- a/api/app/core/workflow/adapters/dify/converter.py +++ b/api/app/core/workflow/adapters/dify/converter.py @@ -671,4 +671,4 @@ class DifyConverter(BaseConverter): type=ExceptionType.CONFIG, detail=f"Please reconfigure the tool node.", )) - return {} \ No newline at end of file + return {} diff --git a/api/app/core/workflow/engine/variable_pool.py b/api/app/core/workflow/engine/variable_pool.py index 22be08c8..fd28eba5 100644 --- a/api/app/core/workflow/engine/variable_pool.py +++ b/api/app/core/workflow/engine/variable_pool.py @@ -73,7 +73,7 @@ class VariableStruct(BaseModel, Generic[T]): instance: The concrete variable object. The actual Python type is represented by the generic parameter ``T`` (e.g. StringVariable, - NumberVariable, ArrayObject[StringVariable]). + NumberVariable, ArrayVariable[StringVariable]). mut: Whether the variable is mutable. """ @@ -152,6 +152,20 @@ class VariablePool: return None return var_instance + def get_instance( + self, + selector: str, + default: Any = None, + strict: bool = True + ): + variable_struct = self._get_variable_struct(selector) + if variable_struct is None: + if strict: + raise KeyError(f"{selector} not exist") + return default + + return variable_struct.instance + def get_value( self, selector: str, diff --git a/api/app/core/workflow/nodes/cycle_graph/node.py b/api/app/core/workflow/nodes/cycle_graph/node.py index f2912e2c..71e0dbdb 100644 --- a/api/app/core/workflow/nodes/cycle_graph/node.py +++ b/api/app/core/workflow/nodes/cycle_graph/node.py @@ -66,7 +66,7 @@ class CycleGraphNode(BaseNode): if config.flatten: outputs['output'] = config.output_type else: - outputs['output'] = VariableType.ARRAY_STRING + outputs['output'] = VariableType.NESTED_ARRAY else: outputs['output'] = VariableType(f"array[{config.output_type}]") return outputs diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index df899940..e6c00eff 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import uuid from typing import Any, Callable, Coroutine import httpx @@ -13,6 +14,7 @@ from app.core.workflow.nodes.base_node import BaseNode from app.core.workflow.nodes.enums import HttpRequestMethod, HttpErrorHandle, HttpAuthType, HttpContentType from app.core.workflow.nodes.http_request.config import HttpRequestNodeConfig, HttpRequestNodeOutput from app.core.workflow.variable.base_variable import VariableType +from app.core.workflow.variable.variable_objects import FileVariable, ArrayVariable logger = logging.getLogger(__file__) @@ -115,7 +117,7 @@ class HttpRequestNode(BaseNode): params[self._render_template(key, variable_pool)] = self._render_template(value, variable_pool) return params - def _build_content(self, variable_pool: VariablePool) -> dict[str, Any]: + async def _build_content(self, variable_pool: VariablePool) -> dict[str, Any]: """ Build HTTP request body arguments for httpx request methods. @@ -135,16 +137,35 @@ class HttpRequestNode(BaseNode): )) case HttpContentType.FROM_DATA: data = {} + content["files"] = {} for item in self.typed_config.body.data: if item.type == "text": - data[self._render_template(item.key, variable_pool)] = self._render_template(item.value, variable_pool) + data[self._render_template(item.key, variable_pool)] = self._render_template(item.value, + variable_pool) elif item.type == "file": - # TODO: File support (Feature) - pass + content["files"][self._render_template(item.key, variable_pool)] = ( + uuid.uuid4().hex, + await variable_pool.get_instance(item.value).get_content() + ) content["data"] = data case HttpContentType.BINARY: - # TODO: File support (Feature) - pass + content["files"] = [] + file_instence = variable_pool.get_instance(self.typed_config.body.data) + if isinstance(file_instence, ArrayVariable): + for v in file_instence.value: + if isinstance(v, FileVariable): + content["files"].append( + ( + "files", (uuid.uuid4().hex, await v.get_content()) + ) + ) + elif isinstance(file_instence, FileVariable): + content["files"].append( + ( + "file", (uuid.uuid4().hex, await file_instence.get_content()) + ) + ) + case HttpContentType.WWW_FORM: content["data"] = json.loads(self._render_template( json.dumps(self.typed_config.body.data), variable_pool @@ -207,7 +228,7 @@ class HttpRequestNode(BaseNode): request_func = self._get_client_method(client) resp = await request_func( url=self._render_template(self.typed_config.url, variable_pool), - **self._build_content(variable_pool) + **(await self._build_content(variable_pool)) ) resp.raise_for_status() logger.info(f"Node {self.node_id}: HTTP request succeeded") diff --git a/api/app/core/workflow/variable/variable_objects.py b/api/app/core/workflow/variable/variable_objects.py index 7a39835c..49541afc 100644 --- a/api/app/core/workflow/variable/variable_objects.py +++ b/api/app/core/workflow/variable/variable_objects.py @@ -1,8 +1,10 @@ from typing import Any, TypeVar, Type, Generic +import httpx from deprecated import deprecated from app.core.workflow.variable.base_variable import BaseVariable, VariableType, FileObject, FileType +from app.core.config import settings T = TypeVar("T", bound=BaseVariable) @@ -80,8 +82,23 @@ class FileVariable(BaseVariable): def get_value(self) -> Any: return self.value.model_dump() + async def get_content(self): + total_bytes = 0 + chunks = [] -class ArrayObject(BaseVariable, Generic[T]): + async with httpx.AsyncClient() as client: + async with client.stream("GET", self.value.url) as resp: + resp.raise_for_status() + async for chunk in resp.aiter_bytes(8192): + total_bytes += len(chunk) + if total_bytes > settings.MAX_FILE_SIZE: + raise ValueError(f"File too large: {total_bytes} bytes") + chunks.append(chunk) + + return b"".join(chunks) + + +class ArrayVariable(BaseVariable, Generic[T]): type = 'array' def __init__(self, child_type: Type[T], value: list[Any]): @@ -108,7 +125,7 @@ class ArrayObject(BaseVariable, Generic[T]): return [v.get_value() for v in self.value] -class NestedArrayObject(BaseVariable): +class NestedArrayVariable(BaseVariable): type = 'array_nest' def valid_value(self, value: list[T]) -> list[T]: @@ -116,23 +133,23 @@ class NestedArrayObject(BaseVariable): raise TypeError(f"Value must be a list - {type(value)}:{value}") final_value = [] for v in value: - if not isinstance(v, ArrayObject): + if not isinstance(v, list): raise TypeError("All elements must be of type list") - final_value.append(v) + final_value.append(make_array(AnyVariable, v)) return final_value def to_literal(self) -> str: - return "\n".join(["\n".join([item.to_literal() for item in row]) for row in self.value]) + return "\n".join(["\n".join([str(item) for item in row.get_value()]) for row in self.value]) def get_value(self) -> Any: - return [[item.get_value() for item in row] for row in self.value] + return [[item for item in row.get_value()] for row in self.value] @deprecated( reason="Using arbitrary-type values may cause unexpected errors; please switch to strongly-typed values.", category=RuntimeWarning ) -class AnyObject(BaseVariable): +class AnyVariable(BaseVariable): type = 'any' def valid_value(self, value: Any) -> Any: @@ -142,10 +159,10 @@ class AnyObject(BaseVariable): return str(self.value) -def make_array(child_type: Type[T], value: list[Any]) -> ArrayObject[T]: - """简化 ArrayObject 创建,不需要重复写类型""" +def make_array(child_type: Type[T], value: list[Any]) -> ArrayVariable[T]: + """简化 ArrayVariable 创建,不需要重复写类型""" - return ArrayObject(child_type, value) + return ArrayVariable(child_type, value) def create_variable_instance(var_type: VariableType, value: Any) -> T: @@ -168,7 +185,9 @@ def create_variable_instance(var_type: VariableType, value: Any) -> T: return make_array(DictVariable, value) case VariableType.ARRAY_FILE: return make_array(FileVariable, value) + case VariableType.NESTED_ARRAY: + return NestedArrayVariable(value) case VariableType.ANY: - return AnyObject(value) + return AnyVariable(value) case _: raise TypeError(f"Invalid type - {var_type}") diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 188ef6cd..ffcf8b0c 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -580,6 +580,7 @@ class WorkflowService: # "variables": result.get("variables"), # "messages": result.get("messages"), "output": result.get("output"), # 最终输出(字符串) + "message": result.get("output"), # 最终输出(字符串) # "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID "error_message": result.get("error"), From 574ab4506b755bb346d3c238ef5da6872d450921 Mon Sep 17 00:00:00 2001 From: Eternity <1533512157@qq.com> Date: Mon, 2 Mar 2026 16:05:25 +0800 Subject: [PATCH 2/2] feat(workflow): add placeholder node for unknown types --- .../core/workflow/adapters/dify/converter.py | 152 ++++++++++++------ .../workflow/adapters/dify/dify_adapter.py | 11 +- api/app/core/workflow/engine/variable_pool.py | 16 ++ api/app/core/workflow/executor.py | 64 ++++---- api/app/core/workflow/nodes/enums.py | 2 + api/app/core/workflow/nodes/node_factory.py | 4 +- 6 files changed, 163 insertions(+), 86 deletions(-) diff --git a/api/app/core/workflow/adapters/dify/converter.py b/api/app/core/workflow/adapters/dify/converter.py index 798b78b7..2014b4c3 100644 --- a/api/app/core/workflow/adapters/dify/converter.py +++ b/api/app/core/workflow/adapters/dify/converter.py @@ -12,7 +12,7 @@ from app.core.workflow.adapters.errors import UnsupportVariableType, UnknowModel ExceptionType from app.core.workflow.nodes.assigner import AssignerNodeConfig from app.core.workflow.nodes.assigner.config import AssignmentItem -from app.core.workflow.nodes.base_config import VariableDefinition +from app.core.workflow.nodes.base_config import VariableDefinition, BaseNodeConfig from app.core.workflow.nodes.code import CodeNodeConfig from app.core.workflow.nodes.code.config import InputVariable, OutputVariable from app.core.workflow.nodes.configs import StartNodeConfig, LLMNodeConfig @@ -69,9 +69,27 @@ class DifyConverter(BaseConverter): } def get_node_convert(self, node_type): - func = self.CONFIG_CONVERT_MAP.get(node_type, None) + func = self.CONFIG_CONVERT_MAP.get(node_type, lambda x: {}) return func + def config_validate( + self, + node_id: str, + node_name: str, + config: type[BaseNodeConfig], + value: dict + ): + try: + return config.model_validate(value) + except Exception as e: + self.errors.append(ExceptionDefineition( + type=ExceptionType.CONFIG, + node_id=node_id, + node_name=node_name, + detail=str(e) + )) + return None + @staticmethod def is_variable(expression) -> bool: return bool(re.match(r"\{\{#(.*?)#}}", expression)) @@ -80,14 +98,16 @@ class DifyConverter(BaseConverter): if not var_selector: return "" selector = var_selector.split('.') - if len(selector) != 2: + if len(selector) not in [2, 3]: raise Exception(f"invalid variable selector: {var_selector}") + if len(selector) == 3: + selector = selector[1:] if selector[0] == "conversation": selector[0] = "conv" var_selector = ".".join(selector) mapping = { - "sys.query": "sys.message" - } | self.node_output_map + "sys.query": "sys.message" + } | self.node_output_map var_selector = mapping.get(var_selector, var_selector) return var_selector @@ -237,7 +257,7 @@ class DifyConverter(BaseConverter): node_id=node["id"], node_name=node_data["title"], name=var["variable"], - detail=f"Unsupport Variable type for start node: {var_type}" + detail=f"Unsupported Variable type for start node: {var_type}" ) ) continue @@ -253,9 +273,11 @@ class DifyConverter(BaseConverter): max_length=var.get("max_length"), ) start_vars.append(var_def) - return StartNodeConfig( + result = StartNodeConfig.model_construct( variables=start_vars ).model_dump() + self.config_validate(node["id"], node["data"]["title"], StartNodeConfig, result) + return result def convert_question_classifier_node_config(self, node: dict) -> dict: node_data = node["data"] @@ -270,16 +292,18 @@ class DifyConverter(BaseConverter): for category in node_data["classes"]: self.branch_node_cache[node["id"]].append(category["id"]) categories.append( - ClassifierConfig( + ClassifierConfig.model_construct( class_name=category["name"], ) ) - return QuestionClassifierNodeConfig.model_construct( - input_variable=self._process_list_variable_litearl(node_data["query_variable_selector"]), - user_supplement_prompt=self.trans_variable_format(node_data["instructions"]), + result = QuestionClassifierNodeConfig.model_construct( + input_variable=self._process_list_variable_litearl(node_data.get("query_variable_selector")), + user_supplement_prompt=self.trans_variable_format(node_data.get("instructions", "")), categories=categories, ).model_dump() + self.config_validate(node["id"], node["data"]["title"], QuestionClassifierNodeConfig, result) + return result def convert_llm_node_config(self, node: dict) -> dict: node_data = node["data"] @@ -315,7 +339,7 @@ class DifyConverter(BaseConverter): vision_input = self._process_list_variable_litearl( node_data["vision"]["configs"]["variable_selector"] ) if vision else None - return LLMNodeConfig.model_construct( + result = LLMNodeConfig.model_construct( model_id=None, context=context, memory=memory, @@ -323,12 +347,16 @@ class DifyConverter(BaseConverter): vision_input=vision_input, messages=messages ).model_dump() + self.config_validate(node["id"], node["data"]["title"], LLMNodeConfig, result) + return result def convert_end_node_config(self, node: dict) -> dict: node_data = node["data"] - return EndNodeConfig( - output=self.trans_variable_format(node_data["answer"]), + result = EndNodeConfig.model_construct( + output=self.trans_variable_format(node_data.get("answer", "")), ).model_dump() + self.config_validate(node["id"], node["data"]["title"], EndNodeConfig, result) + return result def convert_if_else_node_config(self, node: dict) -> dict: node_data = node["data"] @@ -359,9 +387,11 @@ class DifyConverter(BaseConverter): ) ) self.branch_node_cache[node["id"]].append(case_id) - return IfElseNodeConfig( + result = IfElseNodeConfig.model_construct( cases=cases ).model_dump() + self.config_validate(node["id"], node["data"]["title"], IfElseNodeConfig, result) + return result def convert_loop_node_config(self, node: dict) -> dict: node_data = node["data"] @@ -370,7 +400,7 @@ class DifyConverter(BaseConverter): for condition in node_data["break_conditions"]: right_value = condition["value"] conditions.append( - LoopConditionDetail( + LoopConditionDetail.model_construct( operator=self.convert_compare_operator(condition["comparison_operator"]), left=self._process_list_variable_litearl(condition["variable_selector"]), right=self.trans_variable_format( @@ -383,7 +413,7 @@ class DifyConverter(BaseConverter): if isinstance(right_value, str) and self.is_variable(right_value) else ValueInputType.CONSTANT, ) ) - condition_config = ConditionsConfig( + condition_config = ConditionsConfig.model_construct( logical_operator=logical_operator, expressions=conditions ) @@ -392,9 +422,9 @@ class DifyConverter(BaseConverter): right_input_type = variable["value_type"] right_value_type = self.variable_type_map(variable["var_type"]) if right_input_type == ValueInputType.VARIABLE: - right_value = self._process_list_variable_litearl(variable["value"]) + right_value = self._process_list_variable_litearl(variable.get("value", "")) else: - right_value = self.convert_variable_type(right_value_type, variable["value"]) + right_value = self.convert_variable_type(right_value_type, variable.get("value", "")) loop_variables.append( CycleVariable( name=variable["label"], @@ -403,23 +433,28 @@ class DifyConverter(BaseConverter): input_type=right_input_type ) ) - return LoopNodeConfig( + result = LoopNodeConfig.model_construct( condition=condition_config, cycle_vars=loop_variables, - max_loop=node_data["loop_count"] + max_loop=node_data.get("loop_count", 10) ).model_dump() + self.config_validate(node["id"], node["data"]["title"], LoopNodeConfig, result) + return result def convert_iteration_node_config(self, node: dict) -> dict: node_data = node["data"] - return IterationNodeConfig( + result = IterationNodeConfig.model_construct( input=self._process_list_variable_litearl(node_data["iterator_selector"]), parallel=node_data["is_parallel"], parallel_count=node_data["parallel_nums"], output=self._process_list_variable_litearl(node_data["output_selector"]), - output_type=self.variable_type_map(node_data["output_type"]), + output_type=self.variable_type_map(node_data.get("output_type")), flatten=node_data["flatten_output"], ).model_dump() + self.config_validate(node["id"], node["data"]["title"], IterationNodeConfig, result) + return result + def convert_assigner_node_config(self, node: dict) -> dict: node_data = node["data"] assignments = [] @@ -435,16 +470,18 @@ class DifyConverter(BaseConverter): operation=self.convert_assignment_operator(assignment["operation"]) ) ) - return AssignerNodeConfig( + result = AssignerNodeConfig.model_construct( assignments=assignments ).model_dump() + self.config_validate(node["id"], node["data"]["title"], AssignerNodeConfig, result) + return result def convert_code_node_config(self, node: dict) -> dict: node_data = node["data"] input_variables = [] for input_variable in node_data["variables"]: input_variables.append( - InputVariable( + InputVariable.model_construct( name=input_variable["variable"], variable=self._process_list_variable_litearl(input_variable["value_selector"]), ) @@ -453,7 +490,7 @@ class DifyConverter(BaseConverter): output_variables = [] for output_variable in node_data["outputs"]: output_variables.append( - OutputVariable( + OutputVariable.model_construct( name=output_variable, type=node_data["outputs"][output_variable]["type"], ) @@ -461,18 +498,20 @@ class DifyConverter(BaseConverter): code = base64.b64encode(quote(node_data["code"]).encode("utf-8")).decode("utf-8") - return CodeNodeConfig( + result = CodeNodeConfig.model_construct( input_variables=input_variables, language=node_data["code_language"], output_variables=output_variables, code=code ).model_dump() + self.config_validate(node["id"], node["data"]["title"], CodeNodeConfig, result) + return result def convert_http_node_config(self, node: dict) -> dict: node_data = node["data"] - if node_data["authorization"] != 'no-auth': + if node_data["authorization"]["type"] != 'no-auth': auth_type = self.convert_http_auth_type(node_data["authorization"]["config"]["type"]) - auth_config = HttpAuthConfig( + auth_config = HttpAuthConfig.model_construct( auth_type=auth_type, header=node_data["authorization"]["config"].get("header"), api_key=node_data["authorization"]["config"].get("api_key"), @@ -504,7 +543,7 @@ class DifyConverter(BaseConverter): body_content = "" headers = {} - for header in node_data["headers"].split("\n"): + for header in node_data.get("headers", "").split("\n"): if not header: continue @@ -522,7 +561,7 @@ class DifyConverter(BaseConverter): )) params = {} - for param in node_data["params"].split("\n"): + for param in node_data.get("params", "").split("\n"): if not param: continue @@ -547,7 +586,7 @@ class DifyConverter(BaseConverter): default_body = "" default_header = {} default_status_code = 0 - for var in node_data["default_value"]: + for var in node_data.get("default_value") or []: if var["key"] == "body": default_body = var["value"] elif var["key"] == "header": @@ -561,45 +600,50 @@ class DifyConverter(BaseConverter): ) self.error_branch_node_cache.append(node['id']) - return HttpRequestNodeConfig( + result = HttpRequestNodeConfig.model_construct( method=node_data["method"].upper(), url=node_data["url"], auth=auth_config, - body=HttpContentTypeConfig( + body=HttpContentTypeConfig.model_construct( content_type=self.convert_http_content_type(node_data["body"]["type"]), data=body_content, ), headers=headers, params=params, verify_ssl=node_data["ssl_verify"], - timeouts=HttpTimeOutConfig( + timeouts=HttpTimeOutConfig.model_construct( connect_timeout=node_data["timeout"]["max_connect_timeout"] or 5, read_timeout=node_data["timeout"]["max_read_timeout"] or 5, write_timeout=node_data["timeout"]["max_write_timeout"] or 5, ), - retry=HttpRetryConfig( + retry=HttpRetryConfig.model_construct( enable=node_data["retry_config"]["retry_enabled"], max_attempts=node_data["retry_config"]["max_retries"], retry_interval=node_data["retry_config"]["retry_interval"], ), - error_handle=HttpErrorHandleConfig( + error_handle=HttpErrorHandleConfig.model_construct( method=error_handle_type, default=default_value, ) ).model_dump() + self.config_validate(node["id"], node["data"]["title"], HttpRequestNodeConfig, result) + return result + def convert_jinja_render_node_config(self, node: dict) -> dict: node_data = node["data"] mapping = [] for variable in node_data["variables"]: - mapping.append(VariablesMappingConfig( + mapping.append(VariablesMappingConfig.model_construct( name=variable["variable"], value=self._process_list_variable_litearl(variable["value_selector"]) )) - return JinjaRenderNodeConfig( + result = JinjaRenderNodeConfig.model_construct( template=node_data["template"], mapping=mapping, ).model_dump() + self.config_validate(node["id"], node["data"]["title"], JinjaRenderNodeConfig, result) + return result def convert_knowledge_node_config(self, node: dict) -> dict: node_data = node["data"] @@ -609,10 +653,13 @@ class DifyConverter(BaseConverter): type=ExceptionType.CONFIG, detail=f"Please reconfigure the Knowledge Retrieval node.", )) - return KnowledgeRetrievalNodeConfig.model_construct( + result = KnowledgeRetrievalNodeConfig.model_construct( query=self._process_list_variable_litearl(node_data["query_variable_selector"]), ).model_dump() + self.config_validate(node["id"], node["data"]["title"], KnowledgeRetrievalNodeConfig, result) + return result + def convert_parameter_extractor_node_config(self, node: dict) -> dict: node_data = node["data"] self.warnings.append( @@ -623,46 +670,53 @@ class DifyConverter(BaseConverter): ) ) params = [] - for param in node_data["parameters"]: + for param in node_data.get("parameters", []): params.append( - ParamsConfig( + ParamsConfig.model_construct( name=param["name"], desc=param["description"], required=param["required"], type=param["type"], ) ) - return ParameterExtractorNodeConfig.model_construct( + result = ParameterExtractorNodeConfig.model_construct( text=self._process_list_variable_litearl(node_data["query"]), params=params, - prompt=node_data["instruction"] + prompt=node_data.get("instruction") ).model_dump() + self.config_validate(node["id"], node["data"]["title"], ParameterExtractorNodeConfig, result) + return result + def convert_variable_aggregator_node_config(self, node: dict) -> dict: node_data = node["data"] - group_enable = node_data["advanced_settings"]["group_enabled"] + advanced_settings = node_data.get("advanced_settings", {}) group_variables = {} group_type = {} - if not group_enable: + if not advanced_settings or not advanced_settings["group_enabled"]: group_variables["output"] = [ self._process_list_variable_litearl(variable) for variable in node_data["variables"] ] group_type["output"] = node_data["output_type"] else: - for group in node_data["advanced_settings"]["groups"]: + for group in advanced_settings["groups"]: group_variables[group["group_name"]] = [ self._process_list_variable_litearl(variable) for variable in group["variables"] ] group_type[group["group_name"]] = group["output_type"] - return VariableAggregatorNodeConfig( - group=group_enable, + result = VariableAggregatorNodeConfig.model_construct( + group=advanced_settings.get("group_enabled", False), group_variables=group_variables, group_type=group_type, ).model_dump() + self.config_validate(node["id"], node["data"]["title"], VariableAggregatorNodeConfig, result) + + return result + def convert_tool_node_config(self, node: dict) -> dict: node_data = node["data"] self.warnings.append(ExceptionDefineition( diff --git a/api/app/core/workflow/adapters/dify/dify_adapter.py b/api/app/core/workflow/adapters/dify/dify_adapter.py index 2ecde092..dcd14c7f 100644 --- a/api/app/core/workflow/adapters/dify/dify_adapter.py +++ b/api/app/core/workflow/adapters/dify/dify_adapter.py @@ -59,7 +59,7 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter): ) def map_node_type(self, platform_node_type) -> str: - return self.NODE_TYPE_MAPPING.get(platform_node_type) + return self.NODE_TYPE_MAPPING.get(platform_node_type, NodeType.UNKNOWN) @property def origin_nodes(self): @@ -179,8 +179,13 @@ class DifyAdapter(BasePlatformAdapter, DifyConverter): node_type = node_data["type"] try: converter = self.get_node_convert(node_type) - if converter is None: - raise Exception(f"node type not supported - {node_type}") + if node_type not in self.CONFIG_CONVERT_MAP: + self.errors.append(ExceptionDefineition( + type=ExceptionType.NODE, + node_id=node["id"], + node_name=node["data"]["title"], + detail=f"node type {node_type} is unsupported", + )) return converter(node) except Exception as e: self.errors.append(ExceptionDefineition( diff --git a/api/app/core/workflow/engine/variable_pool.py b/api/app/core/workflow/engine/variable_pool.py index fd28eba5..d08f47e5 100644 --- a/api/app/core/workflow/engine/variable_pool.py +++ b/api/app/core/workflow/engine/variable_pool.py @@ -158,6 +158,22 @@ class VariablePool: default: Any = None, strict: bool = True ): + """Retrieve a variable instance from the variable pool. + + Args: + selector: + Variable selector as a string variable literal (e.g. "{{ sys.message }}"). + default: + The value to return if the variable does not exist. + strict: + If True, raises KeyError when the variable does not exist. + + Returns: + The variable instance object if it exists; otherwise returns `default`. + + Raises: + KeyError: If strict is True and the variable does not exist. + """ variable_struct = self._get_variable_struct(selector) if variable_struct is None: if strict: diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 3c3137fe..e781b6c4 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -132,24 +132,24 @@ class WorkflowExecutor: start_time = datetime.datetime.now() - # Build the workflow graph - graph = self.build_graph() - - # Initialize the variable pool with input data - await self.variable_initializer.initialize( - variable_pool=self.variable_pool, - input_data=input_data, - execution_context=self.execution_context - ) - initial_state = self.state_manager.create_initial_state( - workflow_config=self.workflow_config, - input_data=input_data, - execution_context=self.execution_context, - start_node_id=self.start_node_id - ) - # Execute the workflow try: + # Build the workflow graph + graph = self.build_graph() + + # Initialize the variable pool with input data + await self.variable_initializer.initialize( + variable_pool=self.variable_pool, + input_data=input_data, + execution_context=self.execution_context + ) + initial_state = self.state_manager.create_initial_state( + workflow_config=self.workflow_config, + input_data=input_data, + execution_context=self.execution_context, + start_node_id=self.start_node_id + ) + result = await graph.ainvoke(initial_state, config=self.execution_context.checkpoint_config) # Aggregate output from all End nodes @@ -231,23 +231,23 @@ class WorkflowExecutor: } } - # Build the workflow graph in streaming mode - graph = self.build_graph(stream=True) - - # Initialize the variable pool and system variables - await self.variable_initializer.initialize( - variable_pool=self.variable_pool, - input_data=input_data, - execution_context=self.execution_context - ) - initial_state = self.state_manager.create_initial_state( - workflow_config=self.workflow_config, - input_data=input_data, - execution_context=self.execution_context, - start_node_id=self.start_node_id - ) - try: + # Build the workflow graph in streaming mode + graph = self.build_graph(stream=True) + + # Initialize the variable pool and system variables + await self.variable_initializer.initialize( + variable_pool=self.variable_pool, + input_data=input_data, + execution_context=self.execution_context + ) + initial_state = self.state_manager.create_initial_state( + workflow_config=self.workflow_config, + input_data=input_data, + execution_context=self.execution_context, + start_node_id=self.start_node_id + ) + full_content = '' self.stream_coordinator.update_scope_activation("sys") diff --git a/api/app/core/workflow/nodes/enums.py b/api/app/core/workflow/nodes/enums.py index 0579bdf5..ae9b81ff 100644 --- a/api/app/core/workflow/nodes/enums.py +++ b/api/app/core/workflow/nodes/enums.py @@ -24,6 +24,8 @@ class NodeType(StrEnum): MEMORY_READ = "memory-read" MEMORY_WRITE = "memory-write" + UNKNOWN = "unknown" + BRANCH_NODES = [NodeType.IF_ELSE, NodeType.HTTP_REQUEST, NodeType.QUESTION_CLASSIFIER] diff --git a/api/app/core/workflow/nodes/node_factory.py b/api/app/core/workflow/nodes/node_factory.py index 00120ca0..864e3251 100644 --- a/api/app/core/workflow/nodes/node_factory.py +++ b/api/app/core/workflow/nodes/node_factory.py @@ -123,10 +123,10 @@ class NodeFactory: # 获取节点类 node_class = cls._node_types.get(node_type) if not node_class: - raise ValueError(f"不支持的节点类型: {node_type}") + raise ValueError(f"Unsupported node type: {node_type}") # 创建节点实例 - logger.debug(f"创建节点: {node_config.get('id')} (type={node_type})") + logger.debug(f"create node instance: {node_config.get('id')} (type={node_type})") return node_class(node_config, workflow_config) @classmethod