fix(app): Workflow import verification

This commit is contained in:
Timebomb2018
2026-03-10 16:18:22 +08:00
parent e81faebf69
commit f941541304
4 changed files with 221 additions and 26 deletions

View File

@@ -4,65 +4,145 @@
# @Time : 2026/2/25 14:11
from typing import Any
from app.core.logging_config import get_logger
from app.core.workflow.adapters.base_adapter import (
PlatformMetadata,
PlatformType,
BasePlatformAdapter,
WorkflowParserResult
)
from app.schemas.workflow_schema import ExecutionConfig
from app.core.workflow.adapters.errors import ExceptionDefineition, ExceptionType, UnsupportNodeType
from app.core.workflow.adapters.memory_bear.memory_bear_converter import MemoryBearConverter
from app.core.workflow.nodes.enums import NodeType
from app.schemas.workflow_schema import ExecutionConfig, NodeDefinition, EdgeDefinition, VariableDefinition
logger = get_logger()
VALID_NODE_TYPES = frozenset(t.value for t in NodeType if t != NodeType.UNKNOWN)
class MemoryBearAdapter(BasePlatformAdapter):
NODE_TYPE_MAPPING = {}
class MemoryBearAdapter(BasePlatformAdapter, MemoryBearConverter):
NODE_TYPE_MAPPING = {t.value: t for t in NodeType}
def __init__(self, config: dict[str, Any]):
MemoryBearConverter.__init__(self)
BasePlatformAdapter.__init__(self, config)
@property
def origin_nodes(self):
return self.config.get("workflow").get("nodes")
return self.config.get("workflow").get("nodes") or []
@property
def origin_edges(self):
return self.config.get("workflow").get("edges")
return self.config.get("workflow").get("edges") or []
@property
def origin_variables(self):
return self.config.get("workflow").get("variables")
return self.config.get("workflow").get("variables") or []
def get_metadata(self) -> PlatformMetadata:
return PlatformMetadata(
platform_name=PlatformType.MEMORY_BEAR,
version="0.2.5",
support_node_types=list(self.NODE_TYPE_MAPPING.keys())
support_node_types=list(VALID_NODE_TYPES)
)
def map_node_type(self, platform_node_type) -> str:
return platform_node_type
def map_node_type(self, platform_node_type: str) -> NodeType:
return self.NODE_TYPE_MAPPING.get(platform_node_type, NodeType.UNKNOWN)
@staticmethod
def _valid_nodes(node: dict[str, Any]):
if "type" not in node["data"]:
return False
def _valid_node(node: dict[str, Any]) -> bool:
if "id" not in node or "type" not in node:
return False
if not isinstance(node.get("config"), dict):
return False
return True
def validate_config(self) -> bool:
require_fields = frozenset({'app', 'workflow'})
if not all(field in self.config for field in require_fields):
return False
for node in self.origin_nodes:
if not self._valid_nodes(node):
if not self._valid_node(node):
return False
return True
def _convert_node(self, node: dict[str, Any]) -> NodeDefinition | None:
node_id = node.get("id")
node_name = node.get("name")
try:
node_type = self.map_node_type(node["type"])
if node_type == NodeType.UNKNOWN:
self.errors.append(UnsupportNodeType(
node_id=node_id,
node_type=node["type"]
))
return None
config = node.get("config") or {}
converter = self.get_node_convert(node_type)
converter(node_id, node_name, config) # validates and appends errors if invalid
return NodeDefinition(**node)
except Exception as e:
self.errors.append(ExceptionDefineition(
type=ExceptionType.NODE,
node_id=node_id,
node_name=node_name,
detail=f"convert node error - {e}"
))
logger.debug(f"MemoryBear convert node error - {e}", exc_info=True)
return None
def _convert_edge(self, edge: dict[str, Any], valid_node_ids: set) -> EdgeDefinition | None:
try:
if edge.get("source") not in valid_node_ids or edge.get("target") not in valid_node_ids:
self.warnings.append(ExceptionDefineition(
type=ExceptionType.EDGE,
detail=f"edge {edge.get('id')} skipped: source or target node not found"
))
return None
return EdgeDefinition(**edge)
except Exception as e:
self.errors.append(ExceptionDefineition(
type=ExceptionType.EDGE,
detail=f"convert edge error - {e}"
))
logger.debug(f"MemoryBear convert edge error - {e}", exc_info=True)
return None
def _convert_variable(self, variable: dict[str, Any]) -> VariableDefinition | None:
try:
return VariableDefinition(**variable)
except Exception as e:
self.warnings.append(ExceptionDefineition(
type=ExceptionType.VARIABLE,
name=variable.get("name"),
detail=f"convert variable error - {e}"
))
logger.debug(f"MemoryBear convert variable error - {e}", exc_info=True)
return None
def parse_workflow(self) -> WorkflowParserResult:
self.nodes = self.origin_nodes
self.edges = self.origin_edges
self.conv_variables = self.origin_variables
for node in self.origin_nodes:
converted = self._convert_node(node)
if converted:
self.nodes.append(converted)
valid_node_ids = {n.id for n in self.nodes}
for edge in self.origin_edges:
converted = self._convert_edge(edge, valid_node_ids)
if converted:
self.edges.append(converted)
for variable in self.origin_variables:
converted = self._convert_variable(variable)
if converted:
self.conv_variables.append(converted)
return WorkflowParserResult(
success=True,
success=not self.errors and not self.warnings,
platform=self.get_metadata(),
execution_config=ExecutionConfig(),
origin_config=self.config,
@@ -72,5 +152,4 @@ class MemoryBearAdapter(BasePlatformAdapter):
variables=self.conv_variables,
warnings=self.warnings,
errors=self.errors,
)

View File

@@ -0,0 +1,85 @@
# -*- coding: UTF-8 -*-
from app.core.workflow.adapters.base_converter import BaseConverter
from app.core.workflow.adapters.errors import ExceptionDefineition, ExceptionType
from app.core.workflow.nodes.base_config import BaseNodeConfig
from app.core.workflow.nodes.configs import (
StartNodeConfig,
EndNodeConfig,
LLMNodeConfig,
AgentNodeConfig,
IfElseNodeConfig,
KnowledgeRetrievalNodeConfig,
AssignerNodeConfig,
CodeNodeConfig,
HttpRequestNodeConfig,
JinjaRenderNodeConfig,
VariableAggregatorNodeConfig,
ParameterExtractorNodeConfig,
LoopNodeConfig,
IterationNodeConfig,
QuestionClassifierNodeConfig,
ToolNodeConfig,
MemoryReadNodeConfig,
MemoryWriteNodeConfig,
NoteNodeConfig,
)
from app.core.workflow.nodes.enums import NodeType
class MemoryBearConverter(BaseConverter):
errors: list
warnings: list
CONFIG_CLASS_MAP: dict[NodeType, type[BaseNodeConfig]] = {
NodeType.START: StartNodeConfig,
NodeType.END: EndNodeConfig,
NodeType.ANSWER: EndNodeConfig,
NodeType.LLM: LLMNodeConfig,
NodeType.AGENT: AgentNodeConfig,
NodeType.IF_ELSE: IfElseNodeConfig,
NodeType.KNOWLEDGE_RETRIEVAL: KnowledgeRetrievalNodeConfig,
NodeType.ASSIGNER: AssignerNodeConfig,
NodeType.CODE: CodeNodeConfig,
NodeType.HTTP_REQUEST: HttpRequestNodeConfig,
NodeType.JINJARENDER: JinjaRenderNodeConfig,
NodeType.VAR_AGGREGATOR: VariableAggregatorNodeConfig,
NodeType.PARAMETER_EXTRACTOR: ParameterExtractorNodeConfig,
NodeType.LOOP: LoopNodeConfig,
NodeType.ITERATION: IterationNodeConfig,
NodeType.QUESTION_CLASSIFIER: QuestionClassifierNodeConfig,
NodeType.TOOL: ToolNodeConfig,
NodeType.MEMORY_READ: MemoryReadNodeConfig,
NodeType.MEMORY_WRITE: MemoryWriteNodeConfig,
NodeType.NOTES: NoteNodeConfig,
}
@staticmethod
def _convert_file(var):
return None
@staticmethod
def _convert_array_file(var):
return []
def config_validate(self, node_id: str, node_name: str, config_cls: type[BaseNodeConfig], value: dict):
try:
return config_cls.model_validate(value)
except Exception as e:
self.errors.append(ExceptionDefineition(
type=ExceptionType.CONFIG,
node_id=node_id,
node_name=node_name,
detail=str(e)
))
return None
def get_node_convert(self, node_type: NodeType):
config_cls = self.CONFIG_CLASS_MAP.get(node_type)
if not config_cls:
return lambda node_id, node_name, config: config
def validate(node_id: str, node_name: str, config: dict):
self.config_validate(node_id, node_name, config_cls, config)
return config
return validate