From fe4a53563e2d1745d2a7cd8cf569f020ad717233 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 15:04:44 +0800 Subject: [PATCH 01/13] fix(workflow): use loose rendering for end-node variables --- api/app/core/workflow/nodes/base_node.py | 165 +++++++++--------- api/app/core/workflow/nodes/end/node.py | 16 +- api/app/core/workflow/template_renderer.py | 39 +++-- .../workflows/simple_qa/template.yml | 2 +- 4 files changed, 126 insertions(+), 96 deletions(-) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index e7007884..727f7391 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -35,7 +35,7 @@ class WorkflowState(TypedDict): # Uses a deep merge function, supporting nested dict updates (e.g., conv.xxx) variables: Annotated[dict[str, Any], lambda x, y: { **x, - **{k: {**x.get(k, {}), **v} if isinstance(v, dict) and isinstance(x.get(k), dict) else v + **{k: {**x.get(k, {}), **v} if isinstance(v, dict) and isinstance(x.get(k), dict) else v for k, v in y.items()} }] @@ -46,12 +46,12 @@ class WorkflowState(TypedDict): # Runtime node variables (simplified version, stores business data for fast access between nodes) # Format: {node_id: business_result} runtime_vars: Annotated[dict[str, Any], lambda x, y: {**x, **y}] - + # Execution context execution_id: str workspace_id: str user_id: str - + # Error information (for error edges) error: str | None error_node: str | None @@ -66,7 +66,7 @@ class BaseNode(ABC): 所有节点类型都应该继承此基类,实现 execute 方法。 """ - + def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): """初始化节点 @@ -83,7 +83,7 @@ class BaseNode(ABC): # 使用 or 运算符处理 None 值 self.config = node_config.get("config") or {} self.error_handling = node_config.get("error_handling") or {} - + @abstractmethod async def execute(self, state: WorkflowState) -> Any: """执行节点业务逻辑(非流式) @@ -108,7 +108,7 @@ class BaseNode(ABC): >>> return {"message": "开始", "conversation_id": "xxx"} """ pass - + async def execute_stream(self, state: WorkflowState): """执行节点业务逻辑(流式) @@ -138,7 +138,7 @@ class BaseNode(ABC): result = await self.execute(state) # 默认实现:直接 yield 完成标记 yield {"__final__": True, "result": result} - + def supports_streaming(self) -> bool: """节点是否支持流式输出 @@ -147,7 +147,7 @@ class BaseNode(ABC): """ # 检查子类是否重写了 execute_stream 方法 return self.execute_stream.__func__ != BaseNode.execute_stream.__func__ - + def get_timeout(self) -> int: """获取超时时间(秒) @@ -156,7 +156,7 @@ class BaseNode(ABC): """ return 60 # return self.error_handling.get("timeout", 60) - + async def run(self, state: WorkflowState) -> dict[str, Any]: """执行节点(带错误处理和输出包装,非流式) @@ -173,33 +173,33 @@ class BaseNode(ABC): 标准化的状态更新字典 """ import time - + start_time = time.time() timeout = self.get_timeout() - + try: # 调用节点的业务逻辑 business_result = await asyncio.wait_for( self.execute(state), timeout=timeout ) - + elapsed_time = time.time() - start_time - + # 提取处理后的输出(调用子类的 _extract_output) extracted_output = self._extract_output(business_result) - + # 包装成标准输出格式 wrapped_output = self._wrap_output(business_result, elapsed_time, state) - + # 将提取后的输出存储到运行时变量中(供后续节点快速访问) # 如果提取后的输出是字典,拆包存储;否则存储为 output 字段 if isinstance(extracted_output, dict): runtime_var = extracted_output else: runtime_var = {"output": extracted_output} - + # 返回包装后的输出和运行时变量 return { **wrapped_output, @@ -208,7 +208,7 @@ class BaseNode(ABC): }, "looping": state["looping"] } - + except TimeoutError: elapsed_time = time.time() - start_time logger.error(f"节点 {self.node_id} 执行超时({timeout}秒)") @@ -217,7 +217,7 @@ class BaseNode(ABC): elapsed_time = time.time() - start_time logger.error(f"节点 {self.node_id} 执行失败: {e}", exc_info=True) return self._wrap_error(str(e), elapsed_time, state) - + async def run_stream(self, state: WorkflowState): """Execute node with error handling and output wrapping (streaming) @@ -240,40 +240,41 @@ class BaseNode(ABC): State updates with streaming buffer and final result """ import time - + start_time = time.time() timeout = self.get_timeout() - + try: # Get LangGraph's stream writer for sending custom data writer = get_stream_writer() - + # Check if this is an End node # End nodes CAN send chunks (for suffix), but only after LLM content is_end_node = self.node_type == "end" - + # Check if this node is adjacent to End node (for message type) is_adjacent_to_end = getattr(self, '_is_adjacent_to_end', False) - + # Determine chunk type: "message" for End and adjacent nodes, "node_chunk" for others chunk_type = "message" if (is_end_node or is_adjacent_to_end) else "node_chunk" - - logger.debug(f"节点 {self.node_id} chunk 类型: {chunk_type} (is_end={is_end_node}, adjacent={is_adjacent_to_end})") - + + logger.debug( + f"节点 {self.node_id} chunk 类型: {chunk_type} (is_end={is_end_node}, adjacent={is_adjacent_to_end})") + # Accumulate complete result (for final wrapping) chunks = [] final_result = None chunk_count = 0 - + # Stream chunks in real-time loop_start = asyncio.get_event_loop().time() - + async for item in self.execute_stream(state): # Check timeout if asyncio.get_event_loop().time() - loop_start > timeout: raise TimeoutError() - + # Check if it's a completion marker if isinstance(item, dict) and item.get("__final__"): final_result = item["result"] @@ -282,10 +283,10 @@ class BaseNode(ABC): chunk_count += 1 chunks.append(item) full_content = "".join(chunks) - + # Send chunks for all nodes (including End nodes for suffix) logger.debug(f"节点 {self.node_id} 发送 chunk #{chunk_count}: {item[:50]}...") - + # 1. Send via stream writer (for real-time client updates) writer({ "type": chunk_type, # "message" or "node_chunk" @@ -294,7 +295,7 @@ class BaseNode(ABC): "full_content": full_content, "chunk_index": chunk_count }) - + # 2. Update streaming buffer in state (for downstream nodes) # Only non-End nodes need streaming buffer if not is_end_node: @@ -313,7 +314,7 @@ class BaseNode(ABC): chunk_str = str(item) chunks.append(chunk_str) full_content = "".join(chunks) - + # Send chunks for all nodes writer({ "type": chunk_type, # "message" or "node_chunk" @@ -322,7 +323,7 @@ class BaseNode(ABC): "full_content": full_content, "chunk_index": chunk_count }) - + # Only non-End nodes need streaming buffer if not is_end_node: yield { @@ -334,23 +335,23 @@ class BaseNode(ABC): } } } - + elapsed_time = time.time() - start_time - + logger.info(f"节点 {self.node_id} 流式执行完成,耗时: {elapsed_time:.2f}s, chunks: {chunk_count}") - + # Extract processed output (call subclass's _extract_output) extracted_output = self._extract_output(final_result) - + # Wrap final result final_output = self._wrap_output(final_result, elapsed_time, state) - + # Store extracted output in runtime variables (for quick access by subsequent nodes) if isinstance(extracted_output, dict): runtime_var = extracted_output else: runtime_var = {"output": extracted_output} - + # Build complete state update (including node_outputs, runtime_vars, and final streaming buffer) state_update = { **final_output, @@ -359,7 +360,7 @@ class BaseNode(ABC): }, "looping": state["looping"] } - + # Add streaming buffer for non-End nodes if not is_end_node: state_update["streaming_buffer"] = { @@ -369,11 +370,11 @@ class BaseNode(ABC): "is_complete": True # Mark as complete } } - + # Finally yield state update # LangGraph will merge this into state yield state_update - + except TimeoutError: elapsed_time = time.time() - start_time logger.error(f"节点 {self.node_id} 执行超时 ({timeout}s)") @@ -384,12 +385,12 @@ class BaseNode(ABC): logger.error(f"节点 {self.node_id} 执行失败: {e}", exc_info=True) error_output = self._wrap_error(str(e), elapsed_time, state) yield error_output - + def _wrap_output( - self, - business_result: Any, - elapsed_time: float, - state: WorkflowState + self, + business_result: Any, + elapsed_time: float, + state: WorkflowState ) -> dict[str, Any]: """将业务结果包装成标准输出格式 @@ -403,13 +404,13 @@ class BaseNode(ABC): """ # 提取输入数据(用于记录) input_data = self._extract_input(state) - + # 提取 token 使用情况(如果有) token_usage = self._extract_token_usage(business_result) - + # 提取实际输出(去除元数据) output = self._extract_output(business_result) - + # 构建标准节点输出 node_output = { "node_id": self.node_id, @@ -422,18 +423,18 @@ class BaseNode(ABC): "token_usage": token_usage, "error": None } - + return { "node_outputs": { self.node_id: node_output } } - + def _wrap_error( - self, - error_message: str, - elapsed_time: float, - state: WorkflowState + self, + error_message: str, + elapsed_time: float, + state: WorkflowState ) -> dict[str, Any]: """将错误包装成标准输出格式 @@ -447,10 +448,10 @@ class BaseNode(ABC): """ # 查找错误边 error_edge = self._find_error_edge() - + # 提取输入数据 input_data = self._extract_input(state) - + # 构建错误输出 node_output = { "node_id": self.node_id, @@ -463,7 +464,7 @@ class BaseNode(ABC): "token_usage": None, "error": error_message } - + if error_edge: # 有错误边:记录错误并继续 logger.warning( @@ -480,7 +481,7 @@ class BaseNode(ABC): # 无错误边:抛出异常停止工作流 logger.error(f"节点 {self.node_id} 执行失败,停止工作流: {error_message}") raise Exception(f"节点 {self.node_id} 执行失败: {error_message}") - + def _extract_input(self, state: WorkflowState) -> dict[str, Any]: """提取节点输入数据(用于记录) @@ -494,7 +495,7 @@ class BaseNode(ABC): """ # 默认返回配置 return {"config": self.config} - + def _extract_output(self, business_result: Any) -> Any: """从业务结果中提取实际输出 @@ -508,7 +509,7 @@ class BaseNode(ABC): """ # 默认直接返回业务结果 return business_result - + def _extract_token_usage(self, business_result: Any) -> dict[str, int] | None: """从业务结果中提取 token 使用情况 @@ -522,7 +523,7 @@ class BaseNode(ABC): """ # 默认返回 None return None - + def _find_error_edge(self) -> dict[str, Any] | None: """查找错误边 @@ -533,8 +534,8 @@ class BaseNode(ABC): if edge.get("source") == self.node_id and edge.get("type") == "error": return edge return None - - def _render_template(self, template: str, state: WorkflowState | None, struct: bool = True) -> str: + + def _render_template(self, template: str, state: WorkflowState | None, strict: bool = True) -> str: """渲染模板 支持的变量命名空间: @@ -550,28 +551,28 @@ class BaseNode(ABC): 渲染后的字符串 """ from app.core.workflow.template_renderer import render_template - + # 处理 state 为 None 的情况 if state is None: state = {} - + # 使用变量池获取变量 pool = VariablePool(state) - + # 构建完整的 variables 结构 variables = { "sys": pool.get_all_system_vars(), "conv": pool.get_all_conversation_vars() } - + return render_template( template=template, variables=variables, node_outputs=pool.get_all_node_outputs(), system_vars=pool.get_all_system_vars(), - struct=struct + strict=strict ) - + def _evaluate_condition(self, expression: str, state: WorkflowState | None) -> bool: """评估条件表达式 @@ -588,20 +589,20 @@ class BaseNode(ABC): 布尔值结果 """ from app.core.workflow.expression_evaluator import evaluate_condition - + # 处理 state 为 None 的情况 if state is None: state = {} - + # 使用变量池获取变量 pool = VariablePool(state) - + # 构建完整的 variables 结构(包含 sys 和 conv) variables = { "sys": pool.get_all_system_vars(), "conv": pool.get_all_conversation_vars() } - + return evaluate_condition( expression=expression, variables=variables, @@ -626,12 +627,12 @@ class BaseNode(ABC): >>> llm_output = pool.get("llm_qa.output") """ return VariablePool(state) - + def get_variable( - self, - selector: list[str] | str, - state: WorkflowState, - default: Any = None + self, + selector: list[str] | str, + state: WorkflowState, + default: Any = None ) -> Any: """获取变量值(便捷方法) @@ -650,7 +651,7 @@ class BaseNode(ABC): """ pool = VariablePool(state) return pool.get(selector, default=default) - + def has_variable(self, selector: list[str] | str, state: WorkflowState) -> bool: """检查变量是否存在(便捷方法) diff --git a/api/app/core/workflow/nodes/end/node.py b/api/app/core/workflow/nodes/end/node.py index 6230345c..6195afbd 100644 --- a/api/app/core/workflow/nodes/end/node.py +++ b/api/app/core/workflow/nodes/end/node.py @@ -37,7 +37,7 @@ class EndNode(BaseNode): # 如果配置了输出模板,使用模板渲染;否则使用默认输出 if output_template: - output = self._render_template(output_template, state, struct=False) + output = self._render_template(output_template, state, strict=False) else: output = "工作流已完成" @@ -156,6 +156,16 @@ class EndNode(BaseNode): if not output_template: output = "工作流已完成" + from langgraph.config import get_stream_writer + writer = get_stream_writer() + writer({ + "type": "message", # End node output uses message type + "node_id": self.node_id, + "chunk": "", + "full_content": output, + "chunk_index": 1, + "is_suffix": False + }) yield {"__final__": True, "result": output} return @@ -190,7 +200,7 @@ class EndNode(BaseNode): if upstream_llm_ref_index is None: # No reference to direct upstream LLM node, output complete template content - output = self._render_template(output_template, state) + output = self._render_template(output_template, state, strict=False) logger.info(f"节点 {self.node_id} 没有引用直接上游 LLM 节点,输出完整内容: '{output[:50]}...'") # Send complete content via writer (as a single message chunk) @@ -246,7 +256,7 @@ class EndNode(BaseNode): suffix = "".join(suffix_parts) # 构建完整输出(用于返回,包含前缀 + 动态内容 + 后缀) - full_output = self._render_template(output_template, state) + full_output = self._render_template(output_template, state, strict=False) logger.info(f"[后缀调试] 节点 {self.node_id} 后缀部分数量: {len(suffix_parts)}") logger.info(f"[后缀调试] 后缀内容: '{suffix}'") diff --git a/api/app/core/workflow/template_renderer.py b/api/app/core/workflow/template_renderer.py index 198a3322..b6305b8c 100644 --- a/api/app/core/workflow/template_renderer.py +++ b/api/app/core/workflow/template_renderer.py @@ -5,6 +5,7 @@ """ import logging +from collections import defaultdict from typing import Any from jinja2 import TemplateSyntaxError, UndefinedError, Environment, StrictUndefined, Undefined @@ -12,6 +13,18 @@ from jinja2 import TemplateSyntaxError, UndefinedError, Environment, StrictUndef logger = logging.getLogger(__name__) +class SafeUndefined(Undefined): + """访问未定义属性不会报错,返回空字符串""" + __slots__ = () + + def _fail_with_undefined_error(self, *args, **kwargs): + return "" + + __add__ = __radd__ = __mul__ = __rmul__ = __div__ = __rdiv__ = __truediv__ = __rtruediv__ = _fail_with_undefined_error + __getitem__ = __getattr__ = _fail_with_undefined_error + __str__ = __repr__ = lambda self: "" + + class TemplateRenderer: """模板渲染器""" @@ -21,8 +34,9 @@ class TemplateRenderer: Args: strict: 是否使用严格模式(未定义变量会抛出异常) """ + self.strict = strict self.env = Environment( - undefined=StrictUndefined if strict else Undefined, + undefined=StrictUndefined if strict else SafeUndefined, autoescape=False # 不自动转义,因为我们处理的是文本而非 HTML ) @@ -69,12 +83,17 @@ class TemplateRenderer: # variables 的结构:{"sys": {...}, "conv": {...}} sys_vars = variables.get("sys", {}) if isinstance(variables, dict) else {} conv_vars = variables.get("conv", {}) if isinstance(variables, dict) else {} - - context = { - "conv": conv_vars, # 会话变量:{{conv.user_name}} - "node": node_outputs, # 节点输出:{{node.node_1.output}} - "sys": {**(system_vars or {}), **sys_vars}, # 系统变量:{{sys.execution_id}}(合并两个来源) - } + if self.strict: + context = defaultdict(dict) + context["conv"] = conv_vars + context["nodes"] = node_outputs + context["sys"] = {**(system_vars or {}), **sys_vars} + else: + context = { + "conv": conv_vars, # 会话变量:{{conv.user_name}} + "node": node_outputs, # 节点输出:{{node.node_1.output}} + "sys": {**(system_vars or {}), **sys_vars}, # 系统变量:{{sys.execution_id}}(合并两个来源) + } # 支持直接通过节点ID访问节点输出:{{llm_qa.output}} # 将所有节点输出添加到顶层上下文 @@ -141,12 +160,12 @@ def render_template( variables: dict[str, Any], node_outputs: dict[str, Any], system_vars: dict[str, Any] | None = None, - struct: bool = True + strict: bool = True ) -> str: """渲染模板(便捷函数) Args: - struct: 渲染模式 + strict: 严格模式 template: 模板字符串 variables: 用户变量 node_outputs: 节点输出 @@ -164,7 +183,7 @@ def render_template( ... ) '请分析: 这是一段文本' """ - renderer = TemplateRenderer(strict=struct) + renderer = TemplateRenderer(strict=strict) return renderer.render(template, variables, node_outputs, system_vars) diff --git a/api/app/templates/workflows/simple_qa/template.yml b/api/app/templates/workflows/simple_qa/template.yml index 2cf0f9b1..14de4a73 100644 --- a/api/app/templates/workflows/simple_qa/template.yml +++ b/api/app/templates/workflows/simple_qa/template.yml @@ -53,7 +53,7 @@ nodes: type: end name: 结束 config: - output: "{{llm_qa.output}}" + output: "{{ llm_qa.output }}" position: x: 900 y: 100 From dd7abc0d27a7a14ef1a2c9f349b1946ae75c6732 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 15:06:12 +0800 Subject: [PATCH 02/13] fix(workflow): use int type for memory node config id --- api/app/core/workflow/nodes/memory/config.py | 4 ++-- api/app/core/workflow/nodes/memory/node.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/api/app/core/workflow/nodes/memory/config.py b/api/app/core/workflow/nodes/memory/config.py index 317dc507..987230c1 100644 --- a/api/app/core/workflow/nodes/memory/config.py +++ b/api/app/core/workflow/nodes/memory/config.py @@ -11,7 +11,7 @@ class MemoryReadNodeConfig(BaseNodeConfig): ... ) - config_id: str = Field( + config_id: int = Field( ... ) @@ -26,6 +26,6 @@ class MemoryWriteNodeConfig(BaseNodeConfig): ... ) - config_id: str = Field( + config_id: int = Field( ... ) diff --git a/api/app/core/workflow/nodes/memory/node.py b/api/app/core/workflow/nodes/memory/node.py index bb2366f6..0d1b1fb4 100644 --- a/api/app/core/workflow/nodes/memory/node.py +++ b/api/app/core/workflow/nodes/memory/node.py @@ -25,7 +25,7 @@ class MemoryReadNode(BaseNode): return await MemoryAgentService().read_memory( group_id=end_user_id, message=self._render_template(self.typed_config.message, state), - config_id=self.typed_config.config_id, + config_id=str(self.typed_config.config_id), search_switch=self.typed_config.search_switch, history=[], db=db, @@ -52,7 +52,7 @@ class MemoryWriteNode(BaseNode): return await MemoryAgentService().write_memory( group_id=end_user_id, message=self._render_template(self.typed_config.message, state), - config_id=self.typed_config.config_id, + config_id=str(self.typed_config.config_id), db=db, storage_type="neo4j", user_rag_memory_id="" From 592c2ac2173e64cd7948f448949534fd26c03154 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 15:09:06 +0800 Subject: [PATCH 03/13] fix(workflow): handle missing environment variable defaults --- api/app/core/workflow/executor.py | 23 +++++++++++++++----- api/app/core/workflow/graph_builder.py | 1 - api/app/core/workflow/nodes/assigner/node.py | 2 +- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index d42fcf75..e3d634d8 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -14,6 +14,7 @@ from langgraph.graph.state import CompiledStateGraph from app.core.workflow.graph_builder import GraphBuilder from app.core.workflow.nodes import WorkflowState +from app.core.workflow.nodes.base_config import VariableType from app.core.workflow.nodes.enums import NodeType # from app.core.tools.registry import ToolRegistry @@ -78,9 +79,21 @@ class WorkflowExecutor: var_name = var_def.get("name") var_default = var_def.get("default") if var_name: - # TODO: 入参类型校验 - conversation_vars[var_name] = var_default - + if var_default: + conversation_vars[var_name] = var_default + else: + var_type = var_def.get("type") + match var_type: + case VariableType.STRING: + conversation_vars[var_name] = "" + case VariableType.NUMBER: + conversation_vars[var_name] = 0 + case VariableType.OBJECT: + conversation_vars[var_name] = {} + case VariableType.BOOLEAN: + conversation_vars[var_name] = False + case VariableType.ARRAY_NUMBER | VariableType.ARRAY_OBJECT | VariableType.ARRAY_BOOLEAN | VariableType.ARRAY_STRING: + conversation_vars[var_name] = [] input_variables = input_data.get("variables") or {} # Start 节点的自定义变量 # 构建分层的变量结构 @@ -362,7 +375,7 @@ class WorkflowExecutor: inputv = payload.get("input", {}) variables = inputv.get("variables", {}) variables_sys = variables.get("sys", {}) - conversation_id = variables_sys.get("conversation_id") + conversation_id = input_data.get("conversation_id") execution_id = variables_sys.get("execution_id") logger.info(f"[DEBUG] Node starts execution: {node_name}") @@ -381,7 +394,7 @@ class WorkflowExecutor: inputv = result.get("input", {}) variables = inputv.get("variables", {}) variables_sys = variables.get("sys", {}) - conversation_id = variables_sys.get("conversation_id") + conversation_id = input_data.get("conversation_id") execution_id = variables_sys.get("execution_id") logger.info(f"[DEBUG] Node execution completed: {node_name}") diff --git a/api/app/core/workflow/graph_builder.py b/api/app/core/workflow/graph_builder.py index b24d5202..69ed3b6a 100644 --- a/api/app/core/workflow/graph_builder.py +++ b/api/app/core/workflow/graph_builder.py @@ -12,7 +12,6 @@ from app.core.workflow.nodes.enums import NodeType logger = logging.getLogger(__name__) -# TODO: 子图拆解支持 class GraphBuilder: def __init__( self, diff --git a/api/app/core/workflow/nodes/assigner/node.py b/api/app/core/workflow/nodes/assigner/node.py index 008002ed..7b9d645b 100644 --- a/api/app/core/workflow/nodes/assigner/node.py +++ b/api/app/core/workflow/nodes/assigner/node.py @@ -45,6 +45,7 @@ class AssignerNode(BaseNode): # Get the value or expression to assign value = assignment.value + logger.debug(f"left:{variable_selector}, right: {value}") pattern = r"\{\{\s*(.*?)\s*\}\}" if isinstance(value, str): expression = re.match(pattern, value) @@ -85,4 +86,3 @@ class AssignerNode(BaseNode): case _: raise ValueError(f"Invalid Operator: {assignment.operation}") logger.info(f"Node {self.node_id}: execution completed") - From 9427584825d87f72073332bed9bd9b7a300ac859 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 15:10:01 +0800 Subject: [PATCH 04/13] fix(workflow): render jinja variables with actual values in non-strict mode --- api/app/core/workflow/nodes/jinja_render/node.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/api/app/core/workflow/nodes/jinja_render/node.py b/api/app/core/workflow/nodes/jinja_render/node.py index e18a2001..70993573 100644 --- a/api/app/core/workflow/nodes/jinja_render/node.py +++ b/api/app/core/workflow/nodes/jinja_render/node.py @@ -38,7 +38,11 @@ class JinjaRenderNode(BaseNode): context = {} for variable in self.typed_config.mapping: - context[variable.name] = self._render_template(variable.value, state) + try: + context[variable.name] = self.get_variable(variable.value, state) + except Exception: + logger.info(f"variable not found, var: {variable.value}") + continue try: res = render.env.from_string(self.typed_config.template).render(**context) From 8f114b0dfa6f76f8671a14188270a632bd267f57 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 15:23:30 +0800 Subject: [PATCH 05/13] fix(workflow): support reordering without a rerank model in knowledge base --- .../core/workflow/nodes/knowledge/config.py | 2 +- api/app/core/workflow/nodes/knowledge/node.py | 25 +++++++++++++++---- api/app/core/workflow/nodes/operators.py | 5 +++- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/api/app/core/workflow/nodes/knowledge/config.py b/api/app/core/workflow/nodes/knowledge/config.py index cdb83131..9d307216 100644 --- a/api/app/core/workflow/nodes/knowledge/config.py +++ b/api/app/core/workflow/nodes/knowledge/config.py @@ -45,7 +45,7 @@ class KnowledgeRetrievalNodeConfig(BaseNodeConfig): ) reranker_id: UUID = Field( - ..., + default="", description="Reranker top k" ) diff --git a/api/app/core/workflow/nodes/knowledge/node.py b/api/app/core/workflow/nodes/knowledge/node.py index 5a6b2a7f..061328e1 100644 --- a/api/app/core/workflow/nodes/knowledge/node.py +++ b/api/app/core/workflow/nodes/knowledge/node.py @@ -203,19 +203,34 @@ class KnowledgeRetrievalNode(BaseNode): rs2 = vector_service.search_by_full_text(query=query, top_k=kb_config.top_k, indices=indices, score_threshold=kb_config.similarity_threshold) + # Deduplicate hy brid retrieval results unique_rs = self._deduplicate_docs(rs1, rs2) if not unique_rs: continue - vector_service.reranker = self.get_reranker_model() - rs.extend(vector_service.rerank(query=query, docs=unique_rs, top_k=kb_config.top_k)) + if self.typed_config.reranker_id: + vector_service.reranker = self.get_reranker_model() + rs.extend(vector_service.rerank(query=query, docs=unique_rs, top_k=kb_config.top_k)) + else: + rs.extend(sorted( + unique_rs, + key=lambda d: d.metadata.get("score", 0), + reverse=True + )[:kb_config.top_k]) case _: raise RuntimeError("Unknown retrieval type") if not rs: return [] - vector_service.reranker = self.get_reranker_model() - # TODO:其他重排序方式支持 - final_rs = vector_service.rerank(query=query, docs=rs, top_k=self.typed_config.reranker_top_k) + if self.typed_config.reranker_id: + vector_service.reranker = self.get_reranker_model() + final_rs = vector_service.rerank(query=query, docs=rs, top_k=self.typed_config.reranker_top_k) + else: + final_rs = sorted( + rs, + key=lambda d: d.metadata.get("score", 0), + reverse=True + )[:self.typed_config.reranker_top_k] + logger.info( f"Node {self.node_id}: knowledge base retrieval completed, results count: {len(final_rs)}" ) diff --git a/api/app/core/workflow/nodes/operators.py b/api/app/core/workflow/nodes/operators.py index 25caec07..ad38284a 100644 --- a/api/app/core/workflow/nodes/operators.py +++ b/api/app/core/workflow/nodes/operators.py @@ -386,7 +386,10 @@ class ArrayComparisonOperator(ConditionBase): return self.right_value not in self.left_value -class NoneObjectComparisonOperator(ConditionBase): +class NoneObjectComparisonOperator: + def __init__(self, *arg, **kwargs): + pass + def __getattr__(self, name): return lambda *args, **kwargs: False From ada63d9f5cab5cec1399d7b9e0b89a1d398db0b3 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 15:40:22 +0800 Subject: [PATCH 06/13] fix(workflow): fix typo in key value --- api/app/core/workflow/template_renderer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/core/workflow/template_renderer.py b/api/app/core/workflow/template_renderer.py index b6305b8c..c2d7f255 100644 --- a/api/app/core/workflow/template_renderer.py +++ b/api/app/core/workflow/template_renderer.py @@ -86,7 +86,7 @@ class TemplateRenderer: if self.strict: context = defaultdict(dict) context["conv"] = conv_vars - context["nodes"] = node_outputs + context["node"] = node_outputs context["sys"] = {**(system_vars or {}), **sys_vars} else: context = { From e60bc37fbf4a9866bab9c1bf22352f121fbfad0f Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Tue, 13 Jan 2026 17:28:41 +0800 Subject: [PATCH 07/13] fix(workflow): set default empty value for custom variables in start node --- api/app/core/workflow/nodes/start/node.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/api/app/core/workflow/nodes/start/node.py b/api/app/core/workflow/nodes/start/node.py index 7c3a2fca..f9927f0c 100644 --- a/api/app/core/workflow/nodes/start/node.py +++ b/api/app/core/workflow/nodes/start/node.py @@ -7,6 +7,7 @@ Start 节点实现 import logging from typing import Any +from app.core.workflow.nodes.base_config import VariableType from app.core.workflow.nodes.base_node import BaseNode, WorkflowState from app.core.workflow.nodes.start.config import StartNodeConfig @@ -113,6 +114,18 @@ class StartNode(BaseNode): logger.debug( f"变量 '{var_name}' 使用默认值: {var_def.default}" ) + else: + match var_def.type: + case VariableType.STRING: + processed[var_name] = "" + case VariableType.NUMBER: + processed[var_name] = 0 + case VariableType.OBJECT: + processed[var_name] = {} + case VariableType.BOOLEAN: + processed[var_name] = False + case VariableType.ARRAY_NUMBER | VariableType.ARRAY_OBJECT | VariableType.ARRAY_BOOLEAN | VariableType.ARRAY_STRING: + processed[var_name] = [] return processed From 4448296e7b2b090773070b6a0ce7e744515f7d42 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Wed, 14 Jan 2026 10:46:23 +0800 Subject: [PATCH 08/13] feat(workflow): officially support workflow session variables --- api/app/controllers/app_controller.py | 4 +- api/app/core/workflow/executor.py | 78 +++++++++++------------- api/app/core/workflow/graph_builder.py | 4 +- api/app/core/workflow/nodes/base_node.py | 4 +- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index 2300f148..f55ea5b5 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -60,14 +60,14 @@ def list_apps( """ workspace_id = current_user.current_workspace_id service = app_service.AppService(db) - + # 当 ids 存在且不为 None 时,根据 ids 获取应用 if ids is not None: app_ids = [id.strip() for id in ids.split(',') if id.strip()] items_orm = app_service.get_apps_by_ids(db, app_ids, workspace_id) items = [service._convert_to_schema(app, workspace_id) for app in items_orm] return success(data=items) - + # 正常分页查询 items_orm, total = app_service.list_apps( db, diff --git a/api/app/core/workflow/executor.py b/api/app/core/workflow/executor.py index e3d634d8..67689935 100644 --- a/api/app/core/workflow/executor.py +++ b/api/app/core/workflow/executor.py @@ -3,13 +3,11 @@ 基于 LangGraph 的工作流执行引擎。 """ - -# import uuid import datetime import logging +import uuid from typing import Any -from langchain_core.messages import HumanMessage from langgraph.graph.state import CompiledStateGraph from app.core.workflow.graph_builder import GraphBuilder @@ -55,6 +53,12 @@ class WorkflowExecutor: self.edges = workflow_config.get("edges", []) self.execution_config = workflow_config.get("execution_config", {}) + self.checkpoint_config = { + "configurable": { + "thread_id": uuid.uuid4(), + } + } + def _prepare_initial_state(self, input_data: dict[str, Any]) -> WorkflowState: """准备初始状态(注入系统变量和会话变量) @@ -95,7 +99,7 @@ class WorkflowExecutor: case VariableType.ARRAY_NUMBER | VariableType.ARRAY_OBJECT | VariableType.ARRAY_BOOLEAN | VariableType.ARRAY_STRING: conversation_vars[var_name] = [] input_variables = input_data.get("variables") or {} # Start 节点的自定义变量 - + conversation_vars = conversation_vars | input_data.get("conv", {}) # 构建分层的变量结构 variables = { "sys": { @@ -110,7 +114,7 @@ class WorkflowExecutor: } return { - "messages": [HumanMessage(content=user_message)], + "messages": [('user', user_message)], "variables": variables, "node_outputs": {}, "runtime_vars": {}, # 运行时节点变量(简化版,供快速访问) @@ -196,6 +200,28 @@ class WorkflowExecutor: logger.info(f"[前缀分析] 与 End 相邻且被引用的节点: {adjacent_and_referenced}") return prefixes, adjacent_and_referenced + def _build_final_output(self, result, elapsed_time): + node_outputs = result.get("node_outputs", {}) + final_output = self._extract_final_output(node_outputs) + token_usage = self._aggregate_token_usage(node_outputs) + conversation_id = None + for node_id, node_output in node_outputs.items(): + if node_output.get("node_type") == "start": + conversation_id = node_output.get("output", {}).get("conversation_id") + break + + return { + "status": "completed", + "output": final_output, + "node_outputs": node_outputs, + "messages": result.get("messages", []), + "conversation_id": conversation_id, + "elapsed_time": elapsed_time, + "token_usage": token_usage, + "error": result.get("error"), + "variables": result.get("variables", {}), + } + def build_graph(self, stream=False) -> CompiledStateGraph: """构建 LangGraph @@ -236,40 +262,16 @@ class WorkflowExecutor: # 3. 执行工作流 try: - result = await graph.ainvoke(initial_state) + + result = await graph.ainvoke(initial_state, config=self.checkpoint_config) # 计算耗时 end_time = datetime.datetime.now() elapsed_time = (end_time - start_time).total_seconds() - # 提取节点输出(现在包含 start 和 end 节点) - node_outputs = result.get("node_outputs", {}) - - # 提取最终输出(从最后一个非 start/end 节点) - final_output = self._extract_final_output(node_outputs) - - # 聚合 token 使用情况 - token_usage = self._aggregate_token_usage(node_outputs) - - # 提取 conversation_id(从 start 节点输出) - conversation_id = None - for node_id, node_output in node_outputs.items(): - if node_output.get("node_type") == "start": - conversation_id = node_output.get("output", {}).get("conversation_id") - break - logger.info(f"工作流执行完成: execution_id={self.execution_id}, elapsed_time={elapsed_time:.2f}s") - return { - "status": "completed", - "output": final_output, - "node_outputs": node_outputs, - "messages": result.get("messages", []), - "conversation_id": conversation_id, - "elapsed_time": elapsed_time, - "token_usage": token_usage, - "error": result.get("error") - } + return self._build_final_output(result, elapsed_time) except Exception as e: # 计算耗时(即使失败也记录) @@ -331,11 +333,11 @@ class WorkflowExecutor: # 3. Execute workflow try: chunk_count = 0 - final_state = None async for event in graph.astream( initial_state, stream_mode=["updates", "debug", "custom"], # Use updates + debug + custom mode + config=self.checkpoint_config ): # event should be a tuple: (mode, data) # But let's handle both cases @@ -411,12 +413,11 @@ class WorkflowExecutor: elif mode == "updates": # Handle state updates - store final state logger.debug(f"[UPDATES] 收到 state 更新 from {list(data.keys())}") - final_state = data # 计算耗时 end_time = datetime.datetime.now() elapsed_time = (end_time - start_time).total_seconds() - + result = graph.get_state(self.checkpoint_config).values logger.info( f"Workflow execution completed (streaming), " f"total chunks: {chunk_count}, elapsed: {elapsed_time:.2f}s" @@ -425,12 +426,7 @@ class WorkflowExecutor: # 发送 workflow_end 事件 yield { "event": "workflow_end", - "data": { - "execution_id": self.execution_id, - "status": "completed", - "elapsed_time": elapsed_time, - "timestamp": end_time.isoformat() - } + "data": self._build_final_output(result, elapsed_time) } except Exception as e: diff --git a/api/app/core/workflow/graph_builder.py b/api/app/core/workflow/graph_builder.py index 69ed3b6a..b75b867e 100644 --- a/api/app/core/workflow/graph_builder.py +++ b/api/app/core/workflow/graph_builder.py @@ -4,6 +4,7 @@ from typing import Any from langgraph.graph.state import CompiledStateGraph, StateGraph from langgraph.graph import START, END +from langgraph.checkpoint.memory import InMemorySaver from app.core.workflow.expression_evaluator import evaluate_condition from app.core.workflow.nodes import WorkflowState, NodeFactory @@ -249,4 +250,5 @@ class GraphBuilder: self.graph = StateGraph(WorkflowState) self.add_nodes() self.add_edges() # 添加边必须在添加节点之后 - return self.graph.compile() + checkpointer = InMemorySaver() + return self.graph.compile(checkpointer=checkpointer) diff --git a/api/app/core/workflow/nodes/base_node.py b/api/app/core/workflow/nodes/base_node.py index 727f7391..e3bf36c9 100644 --- a/api/app/core/workflow/nodes/base_node.py +++ b/api/app/core/workflow/nodes/base_node.py @@ -25,7 +25,7 @@ class WorkflowState(TypedDict): The state object passed between nodes in a workflow, containing messages, variables, node outputs, etc. """ # List of messages (append mode) - messages: Annotated[list[AnyMessage], add] + messages: Annotated[list[tuple[str, str]], add] # Set of loop node IDs, used for assigning values in loop nodes cycle_nodes: list @@ -203,6 +203,7 @@ class BaseNode(ABC): # 返回包装后的输出和运行时变量 return { **wrapped_output, + "variables": state["variables"], "runtime_vars": { self.node_id: runtime_var }, @@ -355,6 +356,7 @@ class BaseNode(ABC): # Build complete state update (including node_outputs, runtime_vars, and final streaming buffer) state_update = { **final_output, + "variables": state["variables"], "runtime_vars": { self.node_id: runtime_var }, From 7438fedd6b89c179cac40c041ca59a8f7ffd1e73 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Wed, 14 Jan 2026 10:46:33 +0800 Subject: [PATCH 09/13] fix(workflow): fix workflow state not updating correctly after streaming runs --- api/app/services/workflow_service.py | 41 +++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index 68d6279b..7d3c784f 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -491,6 +491,17 @@ class WorkflowService: ) end_user_id = str(new_end_user.id) + executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) + + for exec_res in executions: + if exec_res.status == "completed": + last_state = exec_res.output_data + if isinstance(last_state, dict): + variables = last_state.get("variables", {}) + conv_vars = variables.get("conv", {}) + input_data["conv"] = conv_vars + break + result = await execute_workflow( workflow_config=workflow_config_dict, input_data=input_data, @@ -504,7 +515,7 @@ class WorkflowService: self.update_execution_status( execution.execution_id, "completed", - output_data=result.get("node_outputs", {}) + output_data=result ) else: self.update_execution_status( @@ -517,6 +528,7 @@ class WorkflowService: return { "execution_id": execution.execution_id, "status": result.get("status"), + "variables": result.get("variables"), "output": result.get("output"), # 最终输出(字符串) "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID @@ -617,6 +629,16 @@ class WorkflowService: original_user_id=payload.user_id # Save original user_id to other_id ) end_user_id = str(new_end_user.id) + executions = self.execution_repo.get_by_conversation_id(conversation_id=conversation_id_uuid) + + for exec_res in executions: + if exec_res.status == "completed": + last_state = exec_res.output_data + if isinstance(last_state, dict): + variables = last_state.get("variables", {}) + conv_vars = variables.get("conv", {}) + input_data["conv"] = conv_vars + break # 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件) async for event in self._run_workflow_stream( @@ -827,6 +849,23 @@ class WorkflowService: user_id=user_id ): # 直接转发事件(executor 已经返回正确格式) + if event.get("event") == "workflow_end": + + status = event.get("data", {}).get("status") + if status == "completed": + self.update_execution_status( + execution_id, + "completed", + output_data=event.get("data") + ) + elif status == "failed": + self.update_execution_status( + execution_id, + "failed", + output_data=event.get("data") + ) + else: + logger.error(f"unexpect workflow run status, status: {status}") yield event except Exception as e: From 84e24ede046e13af4cc127b1cf0584f1d27f3582 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Wed, 14 Jan 2026 10:47:38 +0800 Subject: [PATCH 10/13] fix(workflow): move node config validation to runtime for proper error handling --- api/app/core/workflow/nodes/assigner/node.py | 3 ++- api/app/core/workflow/nodes/cycle_graph/node.py | 1 - api/app/core/workflow/nodes/http_request/node.py | 3 ++- api/app/core/workflow/nodes/if_else/node.py | 3 ++- api/app/core/workflow/nodes/jinja_render/node.py | 3 ++- api/app/core/workflow/nodes/knowledge/node.py | 3 ++- api/app/core/workflow/nodes/llm/node.py | 3 ++- api/app/core/workflow/nodes/memory/node.py | 3 ++- api/app/core/workflow/nodes/parameter_extractor/node.py | 3 ++- api/app/core/workflow/nodes/question_classifier/node.py | 6 ++++-- api/app/core/workflow/nodes/start/node.py | 3 ++- api/app/core/workflow/nodes/tool/node.py | 3 ++- api/app/core/workflow/nodes/variable_aggregator/node.py | 3 ++- 13 files changed, 26 insertions(+), 14 deletions(-) diff --git a/api/app/core/workflow/nodes/assigner/node.py b/api/app/core/workflow/nodes/assigner/node.py index 7b9d645b..96f68ce8 100644 --- a/api/app/core/workflow/nodes/assigner/node.py +++ b/api/app/core/workflow/nodes/assigner/node.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class AssignerNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = AssignerNodeConfig(**self.config) + self.typed_config: AssignerNodeConfig | None = None async def execute(self, state: WorkflowState) -> Any: """ @@ -28,6 +28,7 @@ class AssignerNode(BaseNode): None or the result of the assignment operation. """ # Initialize a variable pool for accessing conversation, node, and system variables + self.typed_config = AssignerNodeConfig(**self.config) logger.info(f"节点 {self.node_id} 开始执行") pool = VariablePool(state) for assignment in self.typed_config.assignments: diff --git a/api/app/core/workflow/nodes/cycle_graph/node.py b/api/app/core/workflow/nodes/cycle_graph/node.py index fb062f39..1659395e 100644 --- a/api/app/core/workflow/nodes/cycle_graph/node.py +++ b/api/app/core/workflow/nodes/cycle_graph/node.py @@ -30,7 +30,6 @@ class CycleGraphNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config: LoopNodeConfig | IterationNodeConfig | None = None self.cycle_nodes = list() # Nodes belonging to this cycle self.cycle_edges = list() # Edges connecting nodes within the cycle diff --git a/api/app/core/workflow/nodes/http_request/node.py b/api/app/core/workflow/nodes/http_request/node.py index 2e5de796..141cba79 100644 --- a/api/app/core/workflow/nodes/http_request/node.py +++ b/api/app/core/workflow/nodes/http_request/node.py @@ -32,7 +32,7 @@ class HttpRequestNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = HttpRequestNodeConfig(**self.config) + self.typed_config: HttpRequestNodeConfig | None = None def _build_timeout(self) -> Timeout: """ @@ -181,6 +181,7 @@ class HttpRequestNode(BaseNode): - dict: Serialized HttpRequestNodeOutput on success - str: Branch identifier (e.g. "ERROR") when branching is enabled """ + self.typed_config = HttpRequestNodeConfig(**self.config) async with httpx.AsyncClient( verify=self.typed_config.verify_ssl, timeout=self._build_timeout(), diff --git a/api/app/core/workflow/nodes/if_else/node.py b/api/app/core/workflow/nodes/if_else/node.py index 8c6d222f..41f1138b 100644 --- a/api/app/core/workflow/nodes/if_else/node.py +++ b/api/app/core/workflow/nodes/if_else/node.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) class IfElseNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = IfElseNodeConfig(**self.config) + self.typed_config: IfElseNodeConfig | None= None @staticmethod def _evaluate(operator, instance: CompareOperatorInstance) -> Any: @@ -109,6 +109,7 @@ class IfElseNode(BaseNode): Returns: str: The matched branch identifier, e.g., 'CASE1', 'CASE2', ..., used for node transitions. """ + self.typed_config = IfElseNodeConfig(**self.config) expressions = self.evaluate_conditional_edge_expressions(state) # TODO: 变量类型及文本类型解析 for i in range(len(expressions)): diff --git a/api/app/core/workflow/nodes/jinja_render/node.py b/api/app/core/workflow/nodes/jinja_render/node.py index 70993573..822f1918 100644 --- a/api/app/core/workflow/nodes/jinja_render/node.py +++ b/api/app/core/workflow/nodes/jinja_render/node.py @@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) class JinjaRenderNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = JinjaRenderNodeConfig(**self.config) + self.typed_config: JinjaRenderNodeConfig | None = None async def execute(self, state: WorkflowState) -> Any: """ @@ -34,6 +34,7 @@ class JinjaRenderNode(BaseNode): RuntimeError: If Jinja2 template rendering fails due to invalid template syntax or missing variables. """ + self.typed_config = JinjaRenderNodeConfig(**self.config) render = TemplateRenderer(strict=False) context = {} diff --git a/api/app/core/workflow/nodes/knowledge/node.py b/api/app/core/workflow/nodes/knowledge/node.py index 061328e1..221ca079 100644 --- a/api/app/core/workflow/nodes/knowledge/node.py +++ b/api/app/core/workflow/nodes/knowledge/node.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) class KnowledgeRetrievalNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = KnowledgeRetrievalNodeConfig(**self.config) + self.typed_config: KnowledgeRetrievalNodeConfig | None = None @staticmethod def _build_kb_filter(kb_ids: list[uuid.UUID], permission: knowledge_model.PermissionType): @@ -171,6 +171,7 @@ class KnowledgeRetrievalNode(BaseNode): Raises: RuntimeError: If no valid knowledge base is found or access is denied. """ + self.typed_config = KnowledgeRetrievalNodeConfig(**self.config) query = self._render_template(self.typed_config.query, state) with get_db_read() as db: knowledge_bases = self.typed_config.knowledge_bases diff --git a/api/app/core/workflow/nodes/llm/node.py b/api/app/core/workflow/nodes/llm/node.py index 5fb86ae2..6395d3b8 100644 --- a/api/app/core/workflow/nodes/llm/node.py +++ b/api/app/core/workflow/nodes/llm/node.py @@ -68,7 +68,7 @@ class LLMNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = LLMNodeConfig(**self.config) + self.typed_config: LLMNodeConfig | None = None def _render_context(self, message, state): context = f"{self._render_template(self.typed_config.context, state)}" @@ -164,6 +164,7 @@ class LLMNode(BaseNode): Returns: LLM 响应消息 """ + self.typed_config = LLMNodeConfig(**self.config) llm, prompt_or_messages = self._prepare_llm(state, True) logger.info(f"节点 {self.node_id} 开始执行 LLM 调用(非流式)") diff --git a/api/app/core/workflow/nodes/memory/node.py b/api/app/core/workflow/nodes/memory/node.py index 0d1b1fb4..f1c99ddb 100644 --- a/api/app/core/workflow/nodes/memory/node.py +++ b/api/app/core/workflow/nodes/memory/node.py @@ -10,9 +10,10 @@ from app.services.memory_agent_service import MemoryAgentService class MemoryReadNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = MemoryReadNodeConfig(**self.config) + self.typed_config: MemoryReadNodeConfig | None = None async def execute(self, state: WorkflowState) -> Any: + self.typed_config = MemoryReadNodeConfig(**self.config) with get_db_read() as db: workspace_id = self.get_variable('sys.workspace_id', state) end_user_id = self.get_variable("sys.user_id", state) diff --git a/api/app/core/workflow/nodes/parameter_extractor/node.py b/api/app/core/workflow/nodes/parameter_extractor/node.py index 84d61aa9..ec58d96c 100644 --- a/api/app/core/workflow/nodes/parameter_extractor/node.py +++ b/api/app/core/workflow/nodes/parameter_extractor/node.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) class ParameterExtractorNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = ParameterExtractorNodeConfig(**self.config) + self.typed_config: ParameterExtractorNodeConfig | None = None @staticmethod def _get_prompt(): @@ -145,6 +145,7 @@ class ParameterExtractorNode(BaseNode): Raises: BusinessException: If LLM output cannot be parsed as valid JSON. """ + self.typed_config = ParameterExtractorNodeConfig(**self.config) llm = self._get_llm_instance() system_prompt, user_prompt = self._get_prompt() diff --git a/api/app/core/workflow/nodes/question_classifier/node.py b/api/app/core/workflow/nodes/question_classifier/node.py index b0f2c28d..aee72eda 100644 --- a/api/app/core/workflow/nodes/question_classifier/node.py +++ b/api/app/core/workflow/nodes/question_classifier/node.py @@ -21,8 +21,8 @@ class QuestionClassifierNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = QuestionClassifierNodeConfig(**self.config) - self.category_to_case_map = self._build_category_case_map() + self.typed_config: QuestionClassifierNodeConfig | None = None + self.category_to_case_map = {} def _get_llm_instance(self) -> RedBearLLM: """获取LLM实例""" @@ -67,6 +67,8 @@ class QuestionClassifierNode(BaseNode): async def execute(self, state: WorkflowState) -> dict: """执行问题分类""" + self.typed_config = QuestionClassifierNodeConfig(**self.config) + self.category_to_case_map = self._build_category_case_map() question = self.typed_config.input_variable supplement_prompt = self.typed_config.user_supplement_prompt or "" categories = self.typed_config.categories or [] diff --git a/api/app/core/workflow/nodes/start/node.py b/api/app/core/workflow/nodes/start/node.py index f9927f0c..69560422 100644 --- a/api/app/core/workflow/nodes/start/node.py +++ b/api/app/core/workflow/nodes/start/node.py @@ -35,7 +35,7 @@ class StartNode(BaseNode): super().__init__(node_config, workflow_config) # 解析并验证配置 - self.typed_config = StartNodeConfig(**self.config) + self.typed_config: StartNodeConfig | None = None async def execute(self, state: WorkflowState) -> dict[str, Any]: """执行 start 节点业务逻辑 @@ -48,6 +48,7 @@ class StartNode(BaseNode): Returns: 包含系统参数、会话变量和自定义变量的字典 """ + self.typed_config = StartNodeConfig(**self.config) logger.info(f"节点 {self.node_id} (Start) 开始执行") # 创建变量池实例(在方法内复用) diff --git a/api/app/core/workflow/nodes/tool/node.py b/api/app/core/workflow/nodes/tool/node.py index e1b5f380..a83aea9f 100644 --- a/api/app/core/workflow/nodes/tool/node.py +++ b/api/app/core/workflow/nodes/tool/node.py @@ -17,10 +17,11 @@ class ToolNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = ToolNodeConfig(**self.config) + self.typed_config: ToolNodeConfig | None = None async def execute(self, state: WorkflowState) -> dict[str, Any]: """执行工具""" + self.typed_config = ToolNodeConfig(**self.config) # 获取租户ID和用户ID tenant_id = self.get_variable("sys.tenant_id", state) user_id = self.get_variable("sys.user_id", state) diff --git a/api/app/core/workflow/nodes/variable_aggregator/node.py b/api/app/core/workflow/nodes/variable_aggregator/node.py index e6cbf75b..5bff8e33 100644 --- a/api/app/core/workflow/nodes/variable_aggregator/node.py +++ b/api/app/core/workflow/nodes/variable_aggregator/node.py @@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) class VariableAggregatorNode(BaseNode): def __init__(self, node_config: dict[str, Any], workflow_config: dict[str, Any]): super().__init__(node_config, workflow_config) - self.typed_config = VariableAggregatorNodeConfig(**self.config) + self.typed_config: VariableAggregatorNodeConfig | None = None @staticmethod def _get_express(variable_string: str) -> Any: @@ -37,6 +37,7 @@ class VariableAggregatorNode(BaseNode): - str: In non-group mode, returns the first non-None variable value. - dict: In group mode, returns a mapping of group_name -> first non-None variable value. """ + self.typed_config = VariableAggregatorNodeConfig(**self.config) if not self.typed_config.group: # -------------------------- # Non-group mode From 95b61e99729fa1952b4b9c7d4ddd7ac53da3ffd6 Mon Sep 17 00:00:00 2001 From: mengyonghao <1533512157@qq.com> Date: Wed, 14 Jan 2026 10:55:05 +0800 Subject: [PATCH 11/13] perf(workflow): optimize default value of rerank_id configuration --- api/app/core/workflow/nodes/knowledge/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/core/workflow/nodes/knowledge/config.py b/api/app/core/workflow/nodes/knowledge/config.py index 9d307216..5475636e 100644 --- a/api/app/core/workflow/nodes/knowledge/config.py +++ b/api/app/core/workflow/nodes/knowledge/config.py @@ -44,8 +44,8 @@ class KnowledgeRetrievalNodeConfig(BaseNodeConfig): description="Knowledge base config" ) - reranker_id: UUID = Field( - default="", + reranker_id: UUID | None = Field( + default=None, description="Reranker top k" ) From 9eb3e1329fc185abc622b96a28278df8bdd24ff4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=90=E5=8A=9B=E9=BD=90?= <162269739+lanceyq@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:39:47 +0800 Subject: [PATCH 12/13] Fix/content attribute (#105) * [fix]Fix the return of the "content" attribute * [changes]Improve the code based on AI review * Apply suggestion from @sourcery-ai[bot] Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * [fix]Fix the return of the "content" attribute * [changes]Improve the code based on AI review * Apply suggestion from @sourcery-ai[bot] Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> * [changes]Improve the code based on AI review --------- Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- .../access_history_manager.py | 46 +++++++++---- api/app/repositories/neo4j/graph_search.py | 65 ++++++++++++++----- 2 files changed, 83 insertions(+), 28 deletions(-) diff --git a/api/app/core/memory/storage_services/forgetting_engine/access_history_manager.py b/api/app/core/memory/storage_services/forgetting_engine/access_history_manager.py index 913874f1..1a2e3cbc 100644 --- a/api/app/core/memory/storage_services/forgetting_engine/access_history_manager.py +++ b/api/app/core/memory/storage_services/forgetting_engine/access_history_manager.py @@ -620,34 +620,52 @@ class AccessHistoryManager: new_version = current_version + 1 # 步骤2:使用乐观锁更新节点 - # 只有当版本号匹配时才更新 - update_query = f""" - MATCH (n:{node_label} {{id: $node_id}}) - """ + # 根据节点类型构建完整的查询语句 + content_field_map = { + 'Statement': 'n.statement as statement', + 'MemorySummary': 'n.content as content', + 'ExtractedEntity': 'null as content_placeholder' # 占位符,后续会被过滤 + } + + # 显式检查节点类型,不支持的类型抛出错误 + if node_label not in content_field_map: + raise ValueError( + f"Unsupported node_label: {node_label}. " + f"Supported labels are: {list(content_field_map.keys())}" + ) + + content_field = content_field_map[node_label] + + # 构建 WHERE 子句 + where_conditions = [] if group_id: - update_query += " WHERE n.group_id = $group_id" + where_conditions.append("n.group_id = $group_id") # 添加版本检查 if current_version > 0: - update_query += " AND n.version = $current_version" + where_conditions.append("n.version = $current_version") else: - # 如果节点没有版本号,检查是否为首次更新 - update_query += " AND (n.version IS NULL OR n.version = 0)" + where_conditions.append("(n.version IS NULL OR n.version = 0)") - update_query += """ + where_clause = " AND ".join(where_conditions) if where_conditions else "true" + + # 构建完整的更新查询 + update_query = f""" + MATCH (n:{node_label} {{id: $node_id}}) + WHERE {where_clause} SET n.activation_value = $activation_value, n.access_history = $access_history, n.last_access_time = $last_access_time, n.access_count = $access_count, n.version = $new_version RETURN n.id as id, - n.statement as statement, n.activation_value as activation_value, n.access_history as access_history, n.last_access_time as last_access_time, n.access_count as access_count, n.importance_score as importance_score, - n.version as version + n.version as version, + {content_field} """ update_params = { @@ -671,7 +689,11 @@ class AccessHistoryManager: f"Expected version {current_version}, but node was modified by another transaction." ) - return dict(updated_node) + # 转换为字典并移除占位符字段 + result_dict = dict(updated_node) + result_dict.pop('content_placeholder', None) + + return result_dict # 执行事务 try: diff --git a/api/app/repositories/neo4j/graph_search.py b/api/app/repositories/neo4j/graph_search.py index 1549ef86..80756793 100644 --- a/api/app/repositories/neo4j/graph_search.py +++ b/api/app/repositories/neo4j/graph_search.py @@ -66,24 +66,38 @@ async def _update_activation_values_batch( max_retries=max_retries ) - # 提取节点ID列表 - node_ids = [node.get('id') for node in nodes if node.get('id')] + # 提取节点ID列表并去重(保持原始顺序) + seen_ids = set() + unique_node_ids = [] + for node in nodes: + node_id = node.get('id') + if node_id and node_id not in seen_ids: + seen_ids.add(node_id) + unique_node_ids.append(node_id) - if not node_ids: + if not unique_node_ids: logger.warning(f"批量更新激活值:没有有效的节点ID") return nodes + + # 记录去重信息(仅针对具有有效 ID 的节点) + id_nodes_count = sum(1 for n in nodes if n.get("id")) + if len(unique_node_ids) < id_nodes_count: + logger.info( + f"批量更新激活值:检测到重复节点,具有有效ID的节点数量={id_nodes_count}, " + f"去重后唯一ID数量={len(unique_node_ids)}" + ) # 批量记录访问 try: updated_nodes = await access_manager.record_batch_access( - node_ids=node_ids, + node_ids=unique_node_ids, node_label=node_label, group_id=group_id ) logger.info( f"批量更新激活值成功: {node_label}, " - f"更新数量={len(updated_nodes)}/{len(node_ids)}" + f"更新数量={len(updated_nodes)}/{len(unique_node_ids)}" ) return updated_nodes @@ -153,19 +167,38 @@ async def _update_search_results_activation( original_nodes = results[key] updated_nodes = update_result - # 创建 ID 到原始节点的映射(用于快速查找 score) - original_map = {node.get('id'): node for node in original_nodes if node.get('id')} + # 创建 ID 到更新节点的映射(用于快速查找激活值数据) + updated_map = {node.get('id'): node for node in updated_nodes if node.get('id')} - # 合并数据:激活值来自更新结果,score 来自原始结果 + # 合并数据:保留所有原始节点(包括重复的),用更新后的激活值数据填充 merged_nodes = [] - for updated_node in updated_nodes: - node_id = updated_node.get('id') - if node_id and node_id in original_map: - # 保留原始的 score 字段 - original_score = original_map[node_id].get('score') - if original_score is not None: - updated_node['score'] = original_score - merged_nodes.append(updated_node) + for original_node in original_nodes: + node_id = original_node.get('id') + if node_id and node_id in updated_map: + # 从原始节点开始,用更新后的激活值数据覆盖 + merged_node = original_node.copy() + + # 更新激活值相关字段 + activation_fields = { + 'activation_value', + 'access_history', + 'last_access_time', + 'access_count', + 'importance_score', + 'version', + 'statement', # Statement 节点的内容字段 + 'content' # MemorySummary 节点的内容字段 + } + + # 只更新激活值相关字段,保留原始节点的其他字段 + for field in activation_fields: + if field in updated_map[node_id]: + merged_node[field] = updated_map[node_id][field] + + merged_nodes.append(merged_node) + else: + # 如果没有更新数据,保留原始节点 + merged_nodes.append(original_node) updated_results[key] = merged_nodes else: From 78bb9315b7949e8843f1df34da89d1b7d545fd63 Mon Sep 17 00:00:00 2001 From: lixinyue11 <94037597+lixinyue11@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:40:12 +0800 Subject: [PATCH 13/13] Fix/develop bug jiqun (#102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 修复RAG集群BUG * Agent应用层的记忆从深度检索改为快速检索 * 应用层快速检索添加(深度检索放在后台) * 应用层快速检索添加(深度检索放在后台) --- api/app/services/draft_run_service.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/api/app/services/draft_run_service.py b/api/app/services/draft_run_service.py index acea60b7..569684d5 100644 --- a/api/app/services/draft_run_service.py +++ b/api/app/services/draft_run_service.py @@ -15,6 +15,7 @@ from pydantic import BaseModel, Field from sqlalchemy import select from sqlalchemy.orm import Session +from app.celery_app import celery_app from app.core.error_codes import BizCode from app.core.exceptions import BusinessException from app.core.logging_config import get_business_logger @@ -22,6 +23,7 @@ from app.core.rag.nlp.search import knowledge_retrieval from app.models import AgentConfig, ModelApiKey, ModelConfig from app.repositories.tool_repository import ToolRepository from app.schemas.prompt_schema import PromptMessageRole, render_prompt_message +from app.services import task_service from app.services.langchain_tool_server import Search from app.services.memory_agent_service import MemoryAgentService from app.services.model_parameter_merger import ModelParameterMerger @@ -101,6 +103,14 @@ def create_long_term_memory_tool(memory_config: Dict[str, Any], end_user_id: str user_rag_memory_id=user_rag_memory_id ) ) + task = celery_app.send_task( + "app.core.memory.agent.read_message", + args=[end_user_id, question, [], "1", config_id, storage_type, user_rag_memory_id] + ) + result = task_service.get_task_memory_read_result(task.id) + status = result.get("status") + logger.info(f"读取任务状态:{status}") + finally: db.close() logger.info(f'用户ID:Agent:{end_user_id}')