From 62c721bdf666a110dd4bf8b4079df1941c4108af Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Fri, 17 Apr 2026 17:27:51 +0800 Subject: [PATCH 1/3] feat(workflow): support array[file] field-level conditions in if-else nodes Added support for evaluating conditions on individual fields of file objects within array[file] variables. Extended variable pool to extract fields from array elements, introduced new condition models (SubVariableConditionItem, SubVariableCondition, ConditionGroup), and added ArrayFileContainsOperator to handle contains/not_contains logic with nested sub-conditions. Includes backward compatibility migration for legacy flat expressions. --- api/app/core/workflow/engine/variable_pool.py | 7 +- api/app/core/workflow/nodes/if_else/config.py | 63 ++++++++++++++-- api/app/core/workflow/nodes/if_else/node.py | 73 +++++++++++-------- api/app/core/workflow/nodes/operators.py | 60 +++++++++++++++ 4 files changed, 165 insertions(+), 38 deletions(-) diff --git a/api/app/core/workflow/engine/variable_pool.py b/api/app/core/workflow/engine/variable_pool.py index 08d10e22..b34efe15 100644 --- a/api/app/core/workflow/engine/variable_pool.py +++ b/api/app/core/workflow/engine/variable_pool.py @@ -201,12 +201,15 @@ class VariablePool: @staticmethod def _extract_field(struct: "VariableStruct", field: str | None) -> Any: - """If field is given, drill into a dict/object variable's value.""" + """If field is given, drill into a dict/object/array[file] variable's value.""" if field is None: return struct.instance.get_value() value = struct.instance.get_value() + # array[file]: extract the field from every element, return a list + if isinstance(value, list): + return [item.get(field) if isinstance(item, dict) else getattr(item, field, None) for item in value] if not isinstance(value, dict): - raise KeyError(f"Variable is not an object, cannot access field '{field}'") + raise KeyError(f"Variable is not an object or array, cannot access field '{field}'") return value.get(field) def get_instance( diff --git a/api/app/core/workflow/nodes/if_else/config.py b/api/app/core/workflow/nodes/if_else/config.py index 638e4b2d..4886aa2c 100644 --- a/api/app/core/workflow/nodes/if_else/config.py +++ b/api/app/core/workflow/nodes/if_else/config.py @@ -1,11 +1,25 @@ """Condition Configuration""" from typing import Any -from pydantic import Field, BaseModel, field_validator +from pydantic import Field, BaseModel, field_validator, model_validator from app.core.workflow.nodes.base_config import BaseNodeConfig from app.core.workflow.nodes.enums import ComparisonOperator, LogicOperator, ValueInputType +class SubVariableConditionItem(BaseModel): + """A single condition on a file object's field, used inside sub_variable_condition.""" + key: str = Field(..., description="Field name of the file object, e.g. type, size, name") + operator: ComparisonOperator = Field(..., description="Comparison operator") + value: Any = Field(default=None, description="Value to compare with") + var_type: str = Field(default="string", description="Field value type: string or number") + + +class SubVariableCondition(BaseModel): + """Sub-conditions applied to each file element in an array[file] variable.""" + logical_operator: LogicOperator = Field(default=LogicOperator.AND) + conditions: list[SubVariableConditionItem] = Field(default_factory=list) + + class ConditionDetail(BaseModel): operator: ComparisonOperator = Field( ..., @@ -14,12 +28,12 @@ class ConditionDetail(BaseModel): left: str = Field( ..., - description="Value to compare against" + description="Variable selector, e.g. {{sys.files}}" ) right: Any = Field( default=None, - description="Value to compare with" + description="Value to compare with (unused when sub_variable_condition is set)" ) input_type: ValueInputType = Field( @@ -27,6 +41,11 @@ class ConditionDetail(BaseModel): description="Value input type for comparison" ) + sub_variable_condition: SubVariableCondition | None = Field( + default=None, + description="Sub-conditions for array[file] fields. When set, operator must be contains/not_contains." + ) + @field_validator("input_type", mode="before") @classmethod def lower_input_type(cls, v): @@ -38,19 +57,49 @@ class ConditionDetail(BaseModel): return v +class ConditionGroup(BaseModel): + """A group of conditions combined by group_operator (AND/OR)""" + + group_operator: LogicOperator = Field( + default=LogicOperator.AND, + description="Logical operator used to combine conditions within this group" + ) + + conditions: list[ConditionDetail] = Field( + ..., + description="List of conditions within this group" + ) + + class ConditionBranchConfig(BaseModel): - """Configuration for a conditional branch""" + """Configuration for a conditional branch. + + logical_operator controls how groups are combined. + Each group's group_operator controls how conditions within it are combined. + """ logical_operator: LogicOperator = Field( default=LogicOperator.AND, - description="Logical operator used to combine multiple condition expressions" + description="Logical operator used to combine condition groups" ) - expressions: list[ConditionDetail] = Field( + expressions: list[ConditionGroup] = Field( ..., - description="List of condition expressions within this branch" + description="List of condition groups within this branch" ) + @model_validator(mode="before") + @classmethod + def migrate_flat_expressions(cls, data): + """Migrate legacy flat expressions (list[ConditionDetail]) to list[ConditionGroup].""" + exprs = data.get("expressions", []) + if exprs and isinstance(exprs[0], dict) and "left" in exprs[0]: + data["expressions"] = [{ + "group_operator": data.get("logical_operator", "and"), + "conditions": exprs + }] + return data + class IfElseNodeConfig(BaseNodeConfig): cases: list[ConditionBranchConfig] = Field( diff --git a/api/app/core/workflow/nodes/if_else/node.py b/api/app/core/workflow/nodes/if_else/node.py index ec46b20b..8dc1221f 100644 --- a/api/app/core/workflow/nodes/if_else/node.py +++ b/api/app/core/workflow/nodes/if_else/node.py @@ -7,7 +7,7 @@ from app.core.workflow.engine.variable_pool import VariablePool from app.core.workflow.nodes.base_node import BaseNode from app.core.workflow.nodes.enums import ComparisonOperator, LogicOperator, ValueInputType from app.core.workflow.nodes.if_else import IfElseNodeConfig -from app.core.workflow.nodes.operators import ConditionExpressionResolver, CompareOperatorInstance +from app.core.workflow.nodes.operators import ConditionExpressionResolver, CompareOperatorInstance, ArrayFileContainsOperator from app.core.workflow.variable.base_variable import VariableType logger = logging.getLogger(__name__) @@ -26,17 +26,23 @@ class IfElseNode(BaseNode): def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: result = [] for case in self.typed_config.cases: - expressions = [] - for expression in case.expressions: - expressions.append({ - "left": self.get_variable(expression.left, variable_pool, strict=False), - "right": expression.right - if expression.input_type == ValueInputType.CONSTANT or expression.right is None - else self.get_variable(expression.right, variable_pool, strict=False), - "operator": str(expression.operator), + groups = [] + for group in case.expressions: + conditions = [] + for condition in group.conditions: + conditions.append({ + "left": self.get_variable(condition.left, variable_pool, strict=False), + "right": condition.right + if condition.input_type == ValueInputType.CONSTANT or condition.right is None + else self.get_variable(condition.right, variable_pool, strict=False), + "operator": str(condition.operator), + }) + groups.append({ + "group_operator": str(group.group_operator), + "conditions": conditions, }) result.append({ - "expressions": expressions, + "expressions": groups, "logical_operator": str(case.logical_operator), }) return { @@ -90,30 +96,39 @@ class IfElseNode(BaseNode): list[str]: A list of Python boolean expression strings, ordered by branch priority. """ - branch_index = 0 conditions = [] for case_branch in self.typed_config.cases: - branch_index += 1 - branch_result = [] - for expression in case_branch.expressions: - pattern = r"\{\{\s*(.*?)\s*\}\}" - left_string = re.sub(pattern, r"\1", expression.left).strip() - try: - left_value = self.get_variable(left_string, variable_pool) - except KeyError: - left_value = None - evaluator = ConditionExpressionResolver.resolve_by_value(left_value)( - variable_pool, - expression.left, - expression.right, - expression.input_type - ) - branch_result.append(self._evaluate(expression.operator, evaluator)) + group_results = [] + for group in case_branch.expressions: + condition_results = [] + for condition in group.conditions: + pattern = r"\{\{\s*(.*?)\s*\}\}" + left_string = re.sub(pattern, r"\1", condition.left).strip() + try: + left_value = self.get_variable(left_string, variable_pool) + except KeyError: + left_value = None + + # array[file] + sub_variable_condition: use ArrayFileContainsOperator directly + if condition.sub_variable_condition is not None and isinstance(left_value, list): + evaluator = ArrayFileContainsOperator(left_value, condition.sub_variable_condition) + else: + evaluator = ConditionExpressionResolver.resolve_by_value(left_value)( + variable_pool, + condition.left, + condition.right, + condition.input_type + ) + condition_results.append(self._evaluate(condition.operator, evaluator)) + if group.group_operator == LogicOperator.AND: + group_results.append(all(condition_results)) + else: + group_results.append(any(condition_results)) if case_branch.logical_operator == LogicOperator.AND: - conditions.append(all(branch_result)) + conditions.append(all(group_results)) else: - condition_res = any(branch_result) + condition_res = any(group_results) conditions.append(condition_res) if condition_res: return conditions diff --git a/api/app/core/workflow/nodes/operators.py b/api/app/core/workflow/nodes/operators.py index 14fc9d9f..30634bbe 100644 --- a/api/app/core/workflow/nodes/operators.py +++ b/api/app/core/workflow/nodes/operators.py @@ -395,11 +395,71 @@ class NoneObjectComparisonOperator: return lambda *args, **kwargs: False +class ArrayFileContainsOperator: + """Handles contains/not_contains on array[file] with sub_variable_condition. + + Evaluates whether any (contains) or no (not_contains) file element + in the array satisfies all sub-conditions. + """ + + def __init__(self, left_value: list[dict], sub_variable_condition: Any): + self.left_value = left_value + self.sub_variable_condition = sub_variable_condition + + def _match_item(self, file_item: dict) -> bool: + """Check if a single file dict satisfies all sub-conditions.""" + results = [] + for cond in self.sub_variable_condition.conditions: + field_val = file_item.get(cond.key) + result = self._eval_sub(field_val, cond) + results.append(result) + if self.sub_variable_condition.logical_operator.value == "and": + return all(results) + return any(results) + + @staticmethod + def _eval_sub(field_val: Any, cond: Any) -> bool: + op = cond.operator.value + expected = cond.value + if field_val is None: + return op in ("empty", "not_empty") and op == "empty" + match op: + case "eq": return str(field_val) == str(expected) + case "ne": return str(field_val) != str(expected) + case "contains": return isinstance(field_val, str) and str(expected) in field_val + case "not_contains": return isinstance(field_val, str) and str(expected) not in field_val + case "in": return field_val in (expected if isinstance(expected, list) else [expected]) + case "not_in": return field_val not in (expected if isinstance(expected, list) else [expected]) + case "gt": return isinstance(field_val, (int, float)) and field_val > float(expected) + case "ge": return isinstance(field_val, (int, float)) and field_val >= float(expected) + case "lt": return isinstance(field_val, (int, float)) and field_val < float(expected) + case "le": return isinstance(field_val, (int, float)) and field_val <= float(expected) + case "empty": return field_val in (None, "", 0) + case "not_empty": return field_val not in (None, "", 0) + case _: return False + + def contains(self) -> bool: + return any(self._match_item(f) for f in self.left_value if isinstance(f, dict)) + + def not_contains(self) -> bool: + return not self.contains() + + def empty(self) -> bool: + return not self.left_value + + def not_empty(self) -> bool: + return bool(self.left_value) + + def __getattr__(self, name): + return lambda *args, **kwargs: False + + CompareOperatorInstance = Union[ StringComparisonOperator, NumberComparisonOperator, BooleanComparisonOperator, ArrayComparisonOperator, + ArrayFileContainsOperator, ObjectComparisonOperator ] CompareOperatorType = Type[CompareOperatorInstance] From fb93c509f48fa7fe19aae35896b93015632b626d Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Fri, 17 Apr 2026 17:46:49 +0800 Subject: [PATCH 2/3] refactor(workflow): simplify if-else node condition structure by removing nested condition groups The changes remove the `ConditionGroup` abstraction and flatten condition expressions directly under `ConditionBranchConfig.expressions`. This simplifies the data model and evaluation logic, eliminating redundant grouping layers while preserving all functionality. The migration logic and group-level operators are removed as they are no longer needed. BREAKING CHANGE: `ConditionBranchConfig.expressions` now expects a flat list of `ConditionDetail` instead of `ConditionGroup`; existing configurations must be updated to use direct condition lists. --- api/app/core/workflow/nodes/if_else/config.py | 39 ++-------- api/app/core/workflow/nodes/if_else/node.py | 74 ++++++++----------- 2 files changed, 36 insertions(+), 77 deletions(-) diff --git a/api/app/core/workflow/nodes/if_else/config.py b/api/app/core/workflow/nodes/if_else/config.py index 4886aa2c..302d469f 100644 --- a/api/app/core/workflow/nodes/if_else/config.py +++ b/api/app/core/workflow/nodes/if_else/config.py @@ -1,6 +1,6 @@ """Condition Configuration""" from typing import Any -from pydantic import Field, BaseModel, field_validator, model_validator +from pydantic import Field, BaseModel, field_validator from app.core.workflow.nodes.base_config import BaseNodeConfig from app.core.workflow.nodes.enums import ComparisonOperator, LogicOperator, ValueInputType @@ -57,49 +57,22 @@ class ConditionDetail(BaseModel): return v -class ConditionGroup(BaseModel): - """A group of conditions combined by group_operator (AND/OR)""" - - group_operator: LogicOperator = Field( - default=LogicOperator.AND, - description="Logical operator used to combine conditions within this group" - ) - - conditions: list[ConditionDetail] = Field( - ..., - description="List of conditions within this group" - ) - - class ConditionBranchConfig(BaseModel): """Configuration for a conditional branch. - logical_operator controls how groups are combined. - Each group's group_operator controls how conditions within it are combined. + logical_operator controls how all expressions are combined (AND/OR). """ logical_operator: LogicOperator = Field( default=LogicOperator.AND, - description="Logical operator used to combine condition groups" + description="Logical operator used to combine all conditions" ) - expressions: list[ConditionGroup] = Field( - ..., - description="List of condition groups within this branch" + expressions: list[ConditionDetail] = Field( + default_factory=list, + description="List of conditions within this branch" ) - @model_validator(mode="before") - @classmethod - def migrate_flat_expressions(cls, data): - """Migrate legacy flat expressions (list[ConditionDetail]) to list[ConditionGroup].""" - exprs = data.get("expressions", []) - if exprs and isinstance(exprs[0], dict) and "left" in exprs[0]: - data["expressions"] = [{ - "group_operator": data.get("logical_operator", "and"), - "conditions": exprs - }] - return data - class IfElseNodeConfig(BaseNodeConfig): cases: list[ConditionBranchConfig] = Field( diff --git a/api/app/core/workflow/nodes/if_else/node.py b/api/app/core/workflow/nodes/if_else/node.py index 8dc1221f..28ef9ce6 100644 --- a/api/app/core/workflow/nodes/if_else/node.py +++ b/api/app/core/workflow/nodes/if_else/node.py @@ -26,28 +26,20 @@ class IfElseNode(BaseNode): def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: result = [] for case in self.typed_config.cases: - groups = [] - for group in case.expressions: - conditions = [] - for condition in group.conditions: - conditions.append({ - "left": self.get_variable(condition.left, variable_pool, strict=False), - "right": condition.right - if condition.input_type == ValueInputType.CONSTANT or condition.right is None - else self.get_variable(condition.right, variable_pool, strict=False), - "operator": str(condition.operator), - }) - groups.append({ - "group_operator": str(group.group_operator), - "conditions": conditions, + conditions = [] + for condition in case.expressions: + conditions.append({ + "left": self.get_variable(condition.left, variable_pool, strict=False), + "right": condition.right + if condition.input_type == ValueInputType.CONSTANT or condition.right is None + else self.get_variable(condition.right, variable_pool, strict=False), + "operator": str(condition.operator), }) result.append({ - "expressions": groups, + "expressions": conditions, "logical_operator": str(case.logical_operator), }) - return { - "cases": result - } + return {"cases": result} @staticmethod def _evaluate(operator, instance: CompareOperatorInstance) -> Any: @@ -99,36 +91,30 @@ class IfElseNode(BaseNode): conditions = [] for case_branch in self.typed_config.cases: - group_results = [] - for group in case_branch.expressions: - condition_results = [] - for condition in group.conditions: - pattern = r"\{\{\s*(.*?)\s*\}\}" - left_string = re.sub(pattern, r"\1", condition.left).strip() - try: - left_value = self.get_variable(left_string, variable_pool) - except KeyError: - left_value = None + condition_results = [] + for condition in case_branch.expressions: + pattern = r"\{\{\s*(.*?)\s*\}\}" + left_string = re.sub(pattern, r"\1", condition.left).strip() + try: + left_value = self.get_variable(left_string, variable_pool) + except KeyError: + left_value = None - # array[file] + sub_variable_condition: use ArrayFileContainsOperator directly - if condition.sub_variable_condition is not None and isinstance(left_value, list): - evaluator = ArrayFileContainsOperator(left_value, condition.sub_variable_condition) - else: - evaluator = ConditionExpressionResolver.resolve_by_value(left_value)( - variable_pool, - condition.left, - condition.right, - condition.input_type - ) - condition_results.append(self._evaluate(condition.operator, evaluator)) - if group.group_operator == LogicOperator.AND: - group_results.append(all(condition_results)) + if condition.sub_variable_condition is not None and isinstance(left_value, list): + evaluator = ArrayFileContainsOperator(left_value, condition.sub_variable_condition) else: - group_results.append(any(condition_results)) + evaluator = ConditionExpressionResolver.resolve_by_value(left_value)( + variable_pool, + condition.left, + condition.right, + condition.input_type + ) + condition_results.append(self._evaluate(condition.operator, evaluator)) + if case_branch.logical_operator == LogicOperator.AND: - conditions.append(all(group_results)) + conditions.append(all(condition_results)) else: - condition_res = any(group_results) + condition_res = any(condition_results) conditions.append(condition_res) if condition_res: return conditions From d4a1904b19120fe2b1e4b0e100072f3499228848 Mon Sep 17 00:00:00 2001 From: Timebomb2018 <18868801967@163.com> Date: Fri, 17 Apr 2026 18:02:48 +0800 Subject: [PATCH 3/3] refactor(workflow): rename condition variables to expression in if-else node logic --- api/app/core/workflow/nodes/if_else/node.py | 44 +++++++++++---------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/api/app/core/workflow/nodes/if_else/node.py b/api/app/core/workflow/nodes/if_else/node.py index 28ef9ce6..faecd87c 100644 --- a/api/app/core/workflow/nodes/if_else/node.py +++ b/api/app/core/workflow/nodes/if_else/node.py @@ -26,20 +26,22 @@ class IfElseNode(BaseNode): def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]: result = [] for case in self.typed_config.cases: - conditions = [] - for condition in case.expressions: - conditions.append({ - "left": self.get_variable(condition.left, variable_pool, strict=False), - "right": condition.right - if condition.input_type == ValueInputType.CONSTANT or condition.right is None - else self.get_variable(condition.right, variable_pool, strict=False), - "operator": str(condition.operator), + expressions = [] + for expression in case.expressions: + expressions.append({ + "left": self.get_variable(expression.left, variable_pool, strict=False), + "right": expression.right + if expression.input_type == ValueInputType.CONSTANT or expression.right is None + else self.get_variable(expression.right, variable_pool, strict=False), + "operator": str(expression.operator), }) result.append({ - "expressions": conditions, + "expressions": expressions, "logical_operator": str(case.logical_operator), }) - return {"cases": result} + return { + "cases": result + } @staticmethod def _evaluate(operator, instance: CompareOperatorInstance) -> Any: @@ -91,30 +93,30 @@ class IfElseNode(BaseNode): conditions = [] for case_branch in self.typed_config.cases: - condition_results = [] - for condition in case_branch.expressions: + branch_result = [] + for expression in case_branch.expressions: pattern = r"\{\{\s*(.*?)\s*\}\}" - left_string = re.sub(pattern, r"\1", condition.left).strip() + left_string = re.sub(pattern, r"\1", expression.left).strip() try: left_value = self.get_variable(left_string, variable_pool) except KeyError: left_value = None - if condition.sub_variable_condition is not None and isinstance(left_value, list): - evaluator = ArrayFileContainsOperator(left_value, condition.sub_variable_condition) + if expression.sub_variable_condition is not None and isinstance(left_value, list): + evaluator = ArrayFileContainsOperator(left_value, expression.sub_variable_condition) else: evaluator = ConditionExpressionResolver.resolve_by_value(left_value)( variable_pool, - condition.left, - condition.right, - condition.input_type + expression.left, + expression.right, + expression.input_type ) - condition_results.append(self._evaluate(condition.operator, evaluator)) + branch_result.append(self._evaluate(expression.operator, evaluator)) if case_branch.logical_operator == LogicOperator.AND: - conditions.append(all(condition_results)) + conditions.append(all(branch_result)) else: - condition_res = any(condition_results) + condition_res = any(branch_result) conditions.append(condition_res) if condition_res: return conditions