Merge #57 into develop from feature/20251219_myh

feat(workflow): add Jinja2 template rendering node

* feature/20251219_myh: (4 commits)
  feat(workflow): support variables wrapped with {{}} in assignment nodes, add input config example
  feat(workflow): add default User-Agent and redirect following for HTTP request ...
  fix(workflow): fix missing re-ranking for hybrid retrieval results in knowledge...
  feat(workflow): add Jinja2 template rendering node

Signed-off-by: Eternity <1533512157@qq.com>
Reviewed-by: zhuwenhui5566@163.com <zhuwenhui5566@163.com>
Merged-by: zhuwenhui5566@163.com <zhuwenhui5566@163.com>

CR-link: https://codeup.aliyun.com/redbearai/python/redbear-mem-open/change/57
This commit is contained in:
朱文辉
2025-12-25 17:38:27 +08:00
17 changed files with 195 additions and 21 deletions

View File

@@ -8,10 +8,11 @@ from app.core.workflow.nodes.agent import AgentNode
from app.core.workflow.nodes.assigner import AssignerNode from app.core.workflow.nodes.assigner import AssignerNode
from app.core.workflow.nodes.base_node import BaseNode, WorkflowState from app.core.workflow.nodes.base_node import BaseNode, WorkflowState
from app.core.workflow.nodes.end import EndNode from app.core.workflow.nodes.end import EndNode
from app.core.workflow.nodes.http_request import HttpRequestNode
from app.core.workflow.nodes.if_else import IfElseNode from app.core.workflow.nodes.if_else import IfElseNode
from app.core.workflow.nodes.jinja_render import JinjaRenderNode
from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNode from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNode
from app.core.workflow.nodes.llm import LLMNode from app.core.workflow.nodes.llm import LLMNode
from app.core.workflow.nodes.http_request import HttpRequestNode
from app.core.workflow.nodes.node_factory import NodeFactory, WorkflowNode from app.core.workflow.nodes.node_factory import NodeFactory, WorkflowNode
from app.core.workflow.nodes.start import StartNode from app.core.workflow.nodes.start import StartNode
from app.core.workflow.nodes.transform import TransformNode from app.core.workflow.nodes.transform import TransformNode
@@ -29,5 +30,6 @@ __all__ = [
"WorkflowNode", "WorkflowNode",
"KnowledgeRetrievalNode", "KnowledgeRetrievalNode",
"AssignerNode", "AssignerNode",
"HttpRequestNode" "HttpRequestNode",
"JinjaRenderNode",
] ]

View File

@@ -30,3 +30,18 @@ class AssignerNodeConfig(BaseNodeConfig):
..., ...,
description="List of variable assignment definitions", description="List of variable assignment definitions",
) )
class Config:
json_schema_extra = {
"examples": [
{
"assignments": [
{
"variable_selector": "{{ conv.test1 }}",
"operation": "add",
"value": "3"
}
]
}
]
}

View File

@@ -1,4 +1,5 @@
import logging import logging
import re
from typing import Any from typing import Any
from app.core.workflow.expression_evaluator import ExpressionEvaluator from app.core.workflow.expression_evaluator import ExpressionEvaluator
@@ -34,7 +35,9 @@ class AssignerNode(BaseNode):
variable_selector = assignment.variable_selector variable_selector = assignment.variable_selector
if isinstance(variable_selector, str): if isinstance(variable_selector, str):
# Support dot-separated string paths, e.g., "conv.test" -> ["conv", "test"] # Support dot-separated string paths, e.g., "conv.test" -> ["conv", "test"]
variable_selector = variable_selector.split('.') pattern = r"\{\{\s*(.*?)\s*\}\}"
expression = re.sub(pattern, r"\1", variable_selector).strip()
variable_selector = expression.split('.')
# Only conversation variables ('conv') are allowed # Only conversation variables ('conv') are allowed
if variable_selector[0] != 'conv': # TODO: Loop node variable support (Feature) if variable_selector[0] != 'conv': # TODO: Loop node variable support (Feature)

View File

@@ -7,10 +7,12 @@
import asyncio import asyncio
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, TypedDict, Annotated
from operator import add from operator import add
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage from typing import Any
from langchain_core.messages import AnyMessage, AIMessage
from langgraph.config import get_stream_writer from langgraph.config import get_stream_writer
from typing_extensions import TypedDict, Annotated
from app.core.workflow.variable_pool import VariablePool from app.core.workflow.variable_pool import VariablePool

View File

@@ -3,20 +3,21 @@
所有节点的配置类都在这里导出,方便使用。 所有节点的配置类都在这里导出,方便使用。
""" """
from app.core.workflow.nodes.agent.config import AgentNodeConfig
from app.core.workflow.nodes.assigner.config import AssignerNodeConfig
from app.core.workflow.nodes.base_config import ( from app.core.workflow.nodes.base_config import (
BaseNodeConfig, BaseNodeConfig,
VariableDefinition, VariableDefinition,
VariableType, VariableType,
) )
from app.core.workflow.nodes.start.config import StartNodeConfig
from app.core.workflow.nodes.end.config import EndNodeConfig from app.core.workflow.nodes.end.config import EndNodeConfig
from app.core.workflow.nodes.llm.config import LLMNodeConfig, MessageConfig
from app.core.workflow.nodes.agent.config import AgentNodeConfig
from app.core.workflow.nodes.transform.config import TransformNodeConfig
from app.core.workflow.nodes.if_else.config import IfElseNodeConfig
from app.core.workflow.nodes.knowledge.config import KnowledgeRetrievalNodeConfig
from app.core.workflow.nodes.http_request.config import HttpRequestNodeConfig from app.core.workflow.nodes.http_request.config import HttpRequestNodeConfig
from app.core.workflow.nodes.assigner.config import AssignerNodeConfig from app.core.workflow.nodes.if_else.config import IfElseNodeConfig
from app.core.workflow.nodes.jinja_render.config import JinjaRenderNodeConfig
from app.core.workflow.nodes.knowledge.config import KnowledgeRetrievalNodeConfig
from app.core.workflow.nodes.llm.config import LLMNodeConfig, MessageConfig
from app.core.workflow.nodes.start.config import StartNodeConfig
from app.core.workflow.nodes.transform.config import TransformNodeConfig
__all__ = [ __all__ = [
# 基础类 # 基础类
@@ -33,5 +34,6 @@ __all__ = [
"IfElseNodeConfig", "IfElseNodeConfig",
"KnowledgeRetrievalNodeConfig", "KnowledgeRetrievalNodeConfig",
"AssignerNodeConfig", "AssignerNodeConfig",
"HttpRequestNodeConfig" "HttpRequestNodeConfig",
"JinjaRenderNodeConfig",
] ]

View File

@@ -24,6 +24,7 @@ class NodeType(StrEnum):
TOOL = "tool" TOOL = "tool"
AGENT = "agent" AGENT = "agent"
ASSIGNER = "assigner" ASSIGNER = "assigner"
JINJARENDER = "jinja-render"
class ComparisonOperator(StrEnum): class ComparisonOperator(StrEnum):

View File

@@ -188,6 +188,50 @@ class HttpRequestNodeConfig(BaseNodeConfig):
description="Configuration for handling HTTP request errors", description="Configuration for handling HTTP request errors",
) )
class Config:
json_schema_extra = {
"examples": [
{
"method": "GET",
"url": "{{sys.message}}",
"auth": {
"auth_type": "none",
"header": "",
"api_key": ""
},
"headers": {
# "Content-Type": "application/json",
# "User-Agent": "Workflow-HttpNode/1.0"
},
"params": {},
"body": {
"content_type": "none",
"data": ""
},
"verify_ssl": True,
"timeouts": {
"connect_timeout": 5,
"read_timeout": 30,
"write_timeout": 10
},
"retry": {
"max_attempts": 3,
"retry_interval": 500
},
"error_handle": {
"method": "default",
"default": {
"body": "Upstream service unavailable",
"status_code": 502,
"headers": {
"Content-Type": "text/plain"
}
}
}
}
]
}
class HttpRequestNodeOutput(BaseModel): class HttpRequestNodeOutput(BaseModel):
body: str = Field( body: str = Field(

View File

@@ -7,7 +7,7 @@ import httpx
# import filetypes # TODO: File support (Feature) # import filetypes # TODO: File support (Feature)
from httpx import AsyncClient, Response, Timeout from httpx import AsyncClient, Response, Timeout
from app.core.workflow.nodes import BaseNode, WorkflowState from app.core.workflow.nodes.base_node import BaseNode, WorkflowState
from app.core.workflow.nodes.enums import HttpRequestMethod, HttpErrorHandle, HttpAuthType, HttpContentType from app.core.workflow.nodes.enums import HttpRequestMethod, HttpErrorHandle, HttpAuthType, HttpContentType
from app.core.workflow.nodes.http_request.config import HttpRequestNodeConfig, HttpRequestNodeOutput from app.core.workflow.nodes.http_request.config import HttpRequestNodeConfig, HttpRequestNodeOutput
@@ -204,6 +204,7 @@ class HttpRequestNode(BaseNode):
timeout=self._build_timeout(), timeout=self._build_timeout(),
headers=self._build_header(state) | self._build_auth(state), headers=self._build_header(state) | self._build_auth(state),
params=self._build_params(state), params=self._build_params(state),
follow_redirects=True
) as client: ) as client:
retries = self.typed_config.retry.max_attempts retries = self.typed_config.retry.max_attempts
while retries > 0: while retries > 0:

View File

@@ -158,10 +158,21 @@ class IfElseNode(BaseNode):
async def execute(self, state: WorkflowState) -> Any: async def execute(self, state: WorkflowState) -> Any:
""" """
Execute the conditional branching logic of the node.
Evaluates the node's configured conditional expressions (expressions) in order.
Once a condition is satisfied, it returns the corresponding CASE identifier.
If none of the conditions match, it returns the default last CASE.
Args:
state (WorkflowState): The current workflow state, containing variables, messages, node outputs, etc.
Returns:
str: The matched branch identifier, e.g., 'CASE1', 'CASE2', ..., used for node transitions.
""" """
expressions = self.build_conditional_edge_expressions() expressions = self.build_conditional_edge_expressions()
for i in range(len(expressions)): for i in range(len(expressions)):
logger.info(expressions[i]) logger.info(expressions[i])
if self._evaluate_condition(expressions[i], state): if self._evaluate_condition(expressions[i], state):
return f'CASE{i+1}' return f'CASE{i + 1}'
return f'CASE{len(expressions)}' return f'CASE{len(expressions)}'

View File

@@ -0,0 +1,4 @@
from app.core.workflow.nodes.jinja_render.config import JinjaRenderNodeConfig
from app.core.workflow.nodes.jinja_render.node import JinjaRenderNode
__all__ = ["JinjaRenderNode", "JinjaRenderNodeConfig"]

View File

@@ -0,0 +1,24 @@
from pydantic import Field, BaseModel
from app.core.workflow.nodes.base_config import BaseNodeConfig
class VariablesMappingConfig(BaseModel):
name: str = Field(
...,
description="The variable name to be rendered"
)
value: str = Field(
...,
description="The corresponding value from the workflow"
)
class JinjaRenderNodeConfig(BaseNodeConfig):
template: str = Field(
...,
description="The Jinja2 template string to be rendered"
)
mapping: list[VariablesMappingConfig] = Field(
...,
description="Mapping configuration for variables used in the template"
)

View File

@@ -0,0 +1,45 @@
from typing import Any
from app.core.workflow.nodes import WorkflowState
from app.core.workflow.nodes.base_node import BaseNode
from app.core.workflow.nodes.jinja_render.config import JinjaRenderNodeConfig
from app.core.workflow.template_renderer import TemplateRenderer
class JinjaRenderNode(BaseNode):
def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]):
super().__init__(node_config, workflow_config)
self.typed_config = JinjaRenderNodeConfig(**self.config)
async def execute(self, state: WorkflowState) -> Any:
"""
Execute the node: render the Jinja2 template with mapped variables.
The rendered result is returned in a structure compatible with WorkflowState
merging, so that downstream nodes can access it via node_outputs.
Args:
state (WorkflowState): Current workflow state containing variables,
node outputs, and runtime variables.
Returns:
dict[str, Any]: Node output dictionary containing the rendered result
under `node_outputs[self.node_id]["output"]["rendered"]` and a
status flag "completed".
Raises:
RuntimeError: If Jinja2 template rendering fails due to invalid template
syntax or missing variables.
"""
render = TemplateRenderer(strict=False)
context = {}
for variable in self.typed_config.mapping:
context[variable.name] = self._render_template(variable.value, state)
try:
res = render.env.from_string(self.typed_config.template).render(**context)
except Exception as e:
raise RuntimeError(f"JinjaRender Node {self.node_name} render failed: {e}") from e
return res

View File

@@ -36,3 +36,19 @@ class KnowledgeRetrievalNodeConfig(BaseNodeConfig):
default=RetrieveType.PARTICIPLE, default=RetrieveType.PARTICIPLE,
description="Retrieve type" description="Retrieve type"
) )
class Config:
json_schema_extra = {
"examples": [
{
"query": "{{sys.message}}",
"kb_ids": [
"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
],
"similarity_threshold": 0.2,
"vector_similarity_weight": 0.3,
"top_k": 1,
"retrieve_type": "hybrid"
}
]
}

View File

@@ -163,5 +163,6 @@ class KnowledgeRetrievalNode(BaseNode):
indices=indices, indices=indices,
score_threshold=self.typed_config.similarity_threshold) score_threshold=self.typed_config.similarity_threshold)
# Deduplicate hybrid retrieval results # Deduplicate hybrid retrieval results
rs = self._deduplicate_docs(rs1, rs2) unique_rs = self._deduplicate_docs(rs1, rs2)
rs = vector_service.rerank(query=query, docs=unique_rs, top_k=self.typed_config.top_k)
return [chunk.model_dump() for chunk in rs] return [chunk.model_dump() for chunk in rs]

View File

@@ -7,17 +7,18 @@
import logging import logging
from typing import Any, Union from typing import Any, Union
from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNode
from app.core.workflow.nodes.http_request import HttpRequestNode
from app.core.workflow.nodes.agent import AgentNode from app.core.workflow.nodes.agent import AgentNode
from app.core.workflow.nodes.assigner import AssignerNode
from app.core.workflow.nodes.base_node import BaseNode from app.core.workflow.nodes.base_node import BaseNode
from app.core.workflow.nodes.end import EndNode from app.core.workflow.nodes.end import EndNode
from app.core.workflow.nodes.enums import NodeType from app.core.workflow.nodes.enums import NodeType
from app.core.workflow.nodes.http_request import HttpRequestNode
from app.core.workflow.nodes.if_else import IfElseNode from app.core.workflow.nodes.if_else import IfElseNode
from app.core.workflow.nodes.jinja_render import JinjaRenderNode
from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNode
from app.core.workflow.nodes.llm import LLMNode from app.core.workflow.nodes.llm import LLMNode
from app.core.workflow.nodes.start import StartNode from app.core.workflow.nodes.start import StartNode
from app.core.workflow.nodes.transform import TransformNode from app.core.workflow.nodes.transform import TransformNode
from app.core.workflow.nodes.assigner import AssignerNode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -32,6 +33,7 @@ WorkflowNode = Union[
AssignerNode, AssignerNode,
HttpRequestNode, HttpRequestNode,
KnowledgeRetrievalNode, KnowledgeRetrievalNode,
JinjaRenderNode,
] ]
@@ -52,6 +54,7 @@ class NodeFactory:
NodeType.KNOWLEDGE_RETRIEVAL: KnowledgeRetrievalNode, NodeType.KNOWLEDGE_RETRIEVAL: KnowledgeRetrievalNode,
NodeType.ASSIGNER: AssignerNode, NodeType.ASSIGNER: AssignerNode,
NodeType.HTTP_REQUEST: HttpRequestNode, NodeType.HTTP_REQUEST: HttpRequestNode,
NodeType.JINJARENDER: JinjaRenderNode,
} }
@classmethod @classmethod

View File

@@ -7,7 +7,7 @@
import logging import logging
from typing import Any from typing import Any
from jinja2 import Template, TemplateSyntaxError, UndefinedError, Environment, StrictUndefined from jinja2 import TemplateSyntaxError, UndefinedError, Environment, StrictUndefined, Undefined
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -22,7 +22,7 @@ class TemplateRenderer:
strict: 是否使用严格模式(未定义变量会抛出异常) strict: 是否使用严格模式(未定义变量会抛出异常)
""" """
self.env = Environment( self.env = Environment(
undefined=StrictUndefined if strict else None, undefined=StrictUndefined if strict else Undefined,
autoescape=False # 不自动转义,因为我们处理的是文本而非 HTML autoescape=False # 不自动转义,因为我们处理的是文本而非 HTML
) )