From da6b17de2b9415b36682f2d0ef78703da392ab2b Mon Sep 17 00:00:00 2001 From: Mark Date: Mon, 22 Dec 2025 18:24:36 +0800 Subject: [PATCH] [modify] fix workflow execute logic --- api/app/core/workflow/executor.py | 12 ++++++++++- api/app/core/workflow/nodes/base_node.py | 23 +++++++++++++++++++--- api/app/core/workflow/template_renderer.py | 19 ++++++++++++------ 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index 3555d179..d73e25eb 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -69,7 +69,17 @@ class WorkflowExecutor: 初始化的工作流状态 """ user_message = input_data.get("message") or "" - conversation_vars = input_data.get("conversation_vars") or {} + + # 会话变量处理:从配置文件获取变量定义列表,转换为字典(name -> default value) + config_variables_list = self.workflow_config.get("variables") or [] + conversation_vars = {} + for var_def in config_variables_list: + if isinstance(var_def, dict): + var_name = var_def.get("name") + var_default = var_def.get("default") + if var_name: + conversation_vars[var_name] = var_default + input_variables = input_data.get("variables") or {} # Start 节点的自定义变量 # 构建分层的变量结构 diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 25fdd29e..44c92755 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -26,7 +26,12 @@ class WorkflowState(TypedDict): messages: Annotated[list[AnyMessage], add] # 输入变量(从配置的 variables 传入) - variables: dict[str, Any] + # 使用深度合并函数,支持嵌套字典的更新(如 conv.xxx) + variables: Annotated[dict[str, Any], lambda x, y: { + **x, + **{k: {**x.get(k, {}), **v} if isinstance(v, dict) and isinstance(x.get(k), dict) else v + for k, v in y.items()} + }] # 节点输出(存储每个节点的执行结果,用于变量引用) # 使用自定义合并函数,将新的节点输出合并到现有字典中 @@ -544,9 +549,15 @@ class BaseNode(ABC): # 使用变量池获取变量 pool = VariablePool(state) + # 构建完整的 variables 结构 + variables = { + "sys": pool.get_all_system_vars(), + "conv": pool.get_all_conversation_vars() + } + return render_template( template=template, - variables=pool.get_all_conversation_vars(), + variables=variables, node_outputs=pool.get_all_node_outputs(), system_vars=pool.get_all_system_vars() ) @@ -575,9 +586,15 @@ class BaseNode(ABC): # 使用变量池获取变量 pool = VariablePool(state) + # 构建完整的 variables 结构(包含 sys 和 conv) + variables = { + "sys": pool.get_all_system_vars(), + "conv": pool.get_all_conversation_vars() + } + return evaluate_condition( expression=expression, - variables=pool.get_all_conversation_vars(), + variables=variables, node_outputs=pool.get_all_node_outputs(), system_vars=pool.get_all_system_vars() ) diff --git a/api/app/core/workflow/template_renderer.py b/api/app/core/workflow/template_renderer.py index e9efec0b..b927bd98 100644 --- a/api/app/core/workflow/template_renderer.py +++ b/api/app/core/workflow/template_renderer.py @@ -66,19 +66,26 @@ class TemplateRenderer: '分析结果: 正面情绪' """ # 构建命名空间上下文 + # variables 的结构:{"sys": {...}, "conv": {...}} + sys_vars = variables.get("sys", {}) if isinstance(variables, dict) else {} + conv_vars = variables.get("conv", {}) if isinstance(variables, dict) else {} + context = { - "var": variables, # 用户变量:{{var.user_input}} + "conv": conv_vars, # 会话变量:{{conv.user_name}} "node": node_outputs, # 节点输出:{{node.node_1.output}} - "sys": system_vars or {}, # 系统变量:{{sys.execution_id}} + "sys": {**(system_vars or {}), **sys_vars}, # 系统变量:{{sys.execution_id}}(合并两个来源) } # 支持直接通过节点ID访问节点输出:{{llm_qa.output}} # 将所有节点输出添加到顶层上下文 - context.update(node_outputs) + if node_outputs: + context.update(node_outputs) - # 为了向后兼容,也支持直接访问用户变量 - context.update(variables) - context["nodes"] = node_outputs # 旧语法兼容 + # 支持直接访问会话变量(不需要 conv. 前缀):{{user_name}} + if conv_vars: + context.update(conv_vars) + + context["nodes"] = node_outputs or {} # 旧语法兼容 try: tmpl = self.env.from_string(template)