feat(workflow): support array[file] field-level conditions in if-else nodes

Added support for evaluating conditions on individual fields of file objects within array[file] variables. Extended variable pool to extract fields from array elements, introduced new condition models (SubVariableConditionItem, SubVariableCondition, ConditionGroup), and added ArrayFileContainsOperator to handle contains/not_contains logic with nested sub-conditions. Includes backward compatibility migration for legacy flat expressions.
This commit is contained in:
Timebomb2018
2026-04-17 17:27:51 +08:00
parent 4cbb0cee2f
commit 62c721bdf6
4 changed files with 165 additions and 38 deletions

View File

@@ -201,12 +201,15 @@ class VariablePool:
@staticmethod
def _extract_field(struct: "VariableStruct", field: str | None) -> Any:
"""If field is given, drill into a dict/object variable's value."""
"""If field is given, drill into a dict/object/array[file] variable's value."""
if field is None:
return struct.instance.get_value()
value = struct.instance.get_value()
# array[file]: extract the field from every element, return a list
if isinstance(value, list):
return [item.get(field) if isinstance(item, dict) else getattr(item, field, None) for item in value]
if not isinstance(value, dict):
raise KeyError(f"Variable is not an object, cannot access field '{field}'")
raise KeyError(f"Variable is not an object or array, cannot access field '{field}'")
return value.get(field)
def get_instance(

View File

@@ -1,11 +1,25 @@
"""Condition Configuration"""
from typing import Any
from pydantic import Field, BaseModel, field_validator
from pydantic import Field, BaseModel, field_validator, model_validator
from app.core.workflow.nodes.base_config import BaseNodeConfig
from app.core.workflow.nodes.enums import ComparisonOperator, LogicOperator, ValueInputType
class SubVariableConditionItem(BaseModel):
"""A single condition on a file object's field, used inside sub_variable_condition."""
key: str = Field(..., description="Field name of the file object, e.g. type, size, name")
operator: ComparisonOperator = Field(..., description="Comparison operator")
value: Any = Field(default=None, description="Value to compare with")
var_type: str = Field(default="string", description="Field value type: string or number")
class SubVariableCondition(BaseModel):
"""Sub-conditions applied to each file element in an array[file] variable."""
logical_operator: LogicOperator = Field(default=LogicOperator.AND)
conditions: list[SubVariableConditionItem] = Field(default_factory=list)
class ConditionDetail(BaseModel):
operator: ComparisonOperator = Field(
...,
@@ -14,12 +28,12 @@ class ConditionDetail(BaseModel):
left: str = Field(
...,
description="Value to compare against"
description="Variable selector, e.g. {{sys.files}}"
)
right: Any = Field(
default=None,
description="Value to compare with"
description="Value to compare with (unused when sub_variable_condition is set)"
)
input_type: ValueInputType = Field(
@@ -27,6 +41,11 @@ class ConditionDetail(BaseModel):
description="Value input type for comparison"
)
sub_variable_condition: SubVariableCondition | None = Field(
default=None,
description="Sub-conditions for array[file] fields. When set, operator must be contains/not_contains."
)
@field_validator("input_type", mode="before")
@classmethod
def lower_input_type(cls, v):
@@ -38,19 +57,49 @@ class ConditionDetail(BaseModel):
return v
class ConditionGroup(BaseModel):
"""A group of conditions combined by group_operator (AND/OR)"""
group_operator: LogicOperator = Field(
default=LogicOperator.AND,
description="Logical operator used to combine conditions within this group"
)
conditions: list[ConditionDetail] = Field(
...,
description="List of conditions within this group"
)
class ConditionBranchConfig(BaseModel):
"""Configuration for a conditional branch"""
"""Configuration for a conditional branch.
logical_operator controls how groups are combined.
Each group's group_operator controls how conditions within it are combined.
"""
logical_operator: LogicOperator = Field(
default=LogicOperator.AND,
description="Logical operator used to combine multiple condition expressions"
description="Logical operator used to combine condition groups"
)
expressions: list[ConditionDetail] = Field(
expressions: list[ConditionGroup] = Field(
...,
description="List of condition expressions within this branch"
description="List of condition groups within this branch"
)
@model_validator(mode="before")
@classmethod
def migrate_flat_expressions(cls, data):
"""Migrate legacy flat expressions (list[ConditionDetail]) to list[ConditionGroup]."""
exprs = data.get("expressions", [])
if exprs and isinstance(exprs[0], dict) and "left" in exprs[0]:
data["expressions"] = [{
"group_operator": data.get("logical_operator", "and"),
"conditions": exprs
}]
return data
class IfElseNodeConfig(BaseNodeConfig):
cases: list[ConditionBranchConfig] = Field(

View File

@@ -7,7 +7,7 @@ from app.core.workflow.engine.variable_pool import VariablePool
from app.core.workflow.nodes.base_node import BaseNode
from app.core.workflow.nodes.enums import ComparisonOperator, LogicOperator, ValueInputType
from app.core.workflow.nodes.if_else import IfElseNodeConfig
from app.core.workflow.nodes.operators import ConditionExpressionResolver, CompareOperatorInstance
from app.core.workflow.nodes.operators import ConditionExpressionResolver, CompareOperatorInstance, ArrayFileContainsOperator
from app.core.workflow.variable.base_variable import VariableType
logger = logging.getLogger(__name__)
@@ -26,17 +26,23 @@ class IfElseNode(BaseNode):
def _extract_input(self, state: WorkflowState, variable_pool: VariablePool) -> dict[str, Any]:
result = []
for case in self.typed_config.cases:
expressions = []
for expression in case.expressions:
expressions.append({
"left": self.get_variable(expression.left, variable_pool, strict=False),
"right": expression.right
if expression.input_type == ValueInputType.CONSTANT or expression.right is None
else self.get_variable(expression.right, variable_pool, strict=False),
"operator": str(expression.operator),
groups = []
for group in case.expressions:
conditions = []
for condition in group.conditions:
conditions.append({
"left": self.get_variable(condition.left, variable_pool, strict=False),
"right": condition.right
if condition.input_type == ValueInputType.CONSTANT or condition.right is None
else self.get_variable(condition.right, variable_pool, strict=False),
"operator": str(condition.operator),
})
groups.append({
"group_operator": str(group.group_operator),
"conditions": conditions,
})
result.append({
"expressions": expressions,
"expressions": groups,
"logical_operator": str(case.logical_operator),
})
return {
@@ -90,30 +96,39 @@ class IfElseNode(BaseNode):
list[str]: A list of Python boolean expression strings,
ordered by branch priority.
"""
branch_index = 0
conditions = []
for case_branch in self.typed_config.cases:
branch_index += 1
branch_result = []
for expression in case_branch.expressions:
pattern = r"\{\{\s*(.*?)\s*\}\}"
left_string = re.sub(pattern, r"\1", expression.left).strip()
try:
left_value = self.get_variable(left_string, variable_pool)
except KeyError:
left_value = None
evaluator = ConditionExpressionResolver.resolve_by_value(left_value)(
variable_pool,
expression.left,
expression.right,
expression.input_type
)
branch_result.append(self._evaluate(expression.operator, evaluator))
group_results = []
for group in case_branch.expressions:
condition_results = []
for condition in group.conditions:
pattern = r"\{\{\s*(.*?)\s*\}\}"
left_string = re.sub(pattern, r"\1", condition.left).strip()
try:
left_value = self.get_variable(left_string, variable_pool)
except KeyError:
left_value = None
# array[file] + sub_variable_condition: use ArrayFileContainsOperator directly
if condition.sub_variable_condition is not None and isinstance(left_value, list):
evaluator = ArrayFileContainsOperator(left_value, condition.sub_variable_condition)
else:
evaluator = ConditionExpressionResolver.resolve_by_value(left_value)(
variable_pool,
condition.left,
condition.right,
condition.input_type
)
condition_results.append(self._evaluate(condition.operator, evaluator))
if group.group_operator == LogicOperator.AND:
group_results.append(all(condition_results))
else:
group_results.append(any(condition_results))
if case_branch.logical_operator == LogicOperator.AND:
conditions.append(all(branch_result))
conditions.append(all(group_results))
else:
condition_res = any(branch_result)
condition_res = any(group_results)
conditions.append(condition_res)
if condition_res:
return conditions

View File

@@ -395,11 +395,71 @@ class NoneObjectComparisonOperator:
return lambda *args, **kwargs: False
class ArrayFileContainsOperator:
"""Handles contains/not_contains on array[file] with sub_variable_condition.
Evaluates whether any (contains) or no (not_contains) file element
in the array satisfies all sub-conditions.
"""
def __init__(self, left_value: list[dict], sub_variable_condition: Any):
self.left_value = left_value
self.sub_variable_condition = sub_variable_condition
def _match_item(self, file_item: dict) -> bool:
"""Check if a single file dict satisfies all sub-conditions."""
results = []
for cond in self.sub_variable_condition.conditions:
field_val = file_item.get(cond.key)
result = self._eval_sub(field_val, cond)
results.append(result)
if self.sub_variable_condition.logical_operator.value == "and":
return all(results)
return any(results)
@staticmethod
def _eval_sub(field_val: Any, cond: Any) -> bool:
op = cond.operator.value
expected = cond.value
if field_val is None:
return op in ("empty", "not_empty") and op == "empty"
match op:
case "eq": return str(field_val) == str(expected)
case "ne": return str(field_val) != str(expected)
case "contains": return isinstance(field_val, str) and str(expected) in field_val
case "not_contains": return isinstance(field_val, str) and str(expected) not in field_val
case "in": return field_val in (expected if isinstance(expected, list) else [expected])
case "not_in": return field_val not in (expected if isinstance(expected, list) else [expected])
case "gt": return isinstance(field_val, (int, float)) and field_val > float(expected)
case "ge": return isinstance(field_val, (int, float)) and field_val >= float(expected)
case "lt": return isinstance(field_val, (int, float)) and field_val < float(expected)
case "le": return isinstance(field_val, (int, float)) and field_val <= float(expected)
case "empty": return field_val in (None, "", 0)
case "not_empty": return field_val not in (None, "", 0)
case _: return False
def contains(self) -> bool:
return any(self._match_item(f) for f in self.left_value if isinstance(f, dict))
def not_contains(self) -> bool:
return not self.contains()
def empty(self) -> bool:
return not self.left_value
def not_empty(self) -> bool:
return bool(self.left_value)
def __getattr__(self, name):
return lambda *args, **kwargs: False
CompareOperatorInstance = Union[
StringComparisonOperator,
NumberComparisonOperator,
BooleanComparisonOperator,
ArrayComparisonOperator,
ArrayFileContainsOperator,
ObjectComparisonOperator
]
CompareOperatorType = Type[CompareOperatorInstance]