From cef33fce0de718c238bc87b469e56209dc719363 Mon Sep 17 00:00:00 2001 From: wxy Date: Thu, 7 May 2026 16:26:47 +0800 Subject: [PATCH] fix(workflow): sanitize condition expression building and cache assigner node inputs - Sanitize condition expression construction in graph_builder.py using json.dumps to prevent potential injection vulnerabilities. - Cache input data prior to assigner node execution to ensure variable values are correctly captured before processing. --- api/app/core/workflow/engine/graph_builder.py | 5 ++++- api/app/core/workflow/nodes/assigner/node.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/api/app/core/workflow/engine/graph_builder.py b/api/app/core/workflow/engine/graph_builder.py index fca0e2fe..4804fd0a 100644 --- a/api/app/core/workflow/engine/graph_builder.py +++ b/api/app/core/workflow/engine/graph_builder.py @@ -2,6 +2,7 @@ # Author: Eternity # @Email: 1533512157@qq.com # @Time : 2026/2/10 13:33 +import json import logging import re import uuid @@ -318,7 +319,9 @@ class GraphBuilder: # For LLM nodes, use branch_signal field for routing (output is dynamic text) # For other branch nodes (e.g. HTTP), use output field route_field = "branch_signal" if node_type == NodeType.LLM else "output" - related_edge[idx]['condition'] = f"node['{node_id}']['{route_field}'] == '{related_edge[idx]['label']}'" + related_edge[idx]['condition'] = ( + f"node[{json.dumps(node_id)}][{json.dumps(route_field)}] == {json.dumps(related_edge[idx]['label'])}" + ) if node_instance: # Wrap node's run method to avoid closure issues diff --git a/api/app/core/workflow/nodes/assigner/node.py b/api/app/core/workflow/nodes/assigner/node.py index f5bdf000..211cc6e8 100644 --- a/api/app/core/workflow/nodes/assigner/node.py +++ b/api/app/core/workflow/nodes/assigner/node.py @@ -18,10 +18,17 @@ class AssignerNode(BaseNode): super().__init__(node_config, workflow_config, down_stream_nodes) self.variable_updater = True self.typed_config: AssignerNodeConfig | None = None + self._input_data: dict[str, Any] | None = None def _output_types(self) -> dict[str, VariableType]: return {} + def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: + """提取节点输入,如果有缓存的执行前数据则使用缓存""" + if self._input_data is not None: + return self._input_data + return {"config": self._resolve_config(self.config, variable_pool)} + async def execute(self, state: WorkflowState, variable_pool: VariablePool) -> Any: """ Execute the assignment operation defined by this node. @@ -34,6 +41,9 @@ class AssignerNode(BaseNode): Returns: None or the result of the assignment operation. """ + # 在执行前提取并缓存输入数据(捕获执行前的变量值) + self._input_data = {"config": self._resolve_config(self.config, variable_pool)} + # Initialize a variable pool for accessing conversation, node, and system variables self.typed_config = AssignerNodeConfig(**self.config) logger.info(f"节点 {self.node_id} 开始执行")