feat(workflow): 增强工作流节点解析功能
添加工作流节点解析方法,支持工具和知识库ID的匹配与验证 改进知识库和工具解析逻辑,优先匹配ID并处理共享资源
This commit is contained in:
@@ -14,12 +14,14 @@ from app.models.app_model import App, AppType
|
|||||||
from app.models.appshare_model import AppShare
|
from app.models.appshare_model import AppShare
|
||||||
from app.models.app_release_model import AppRelease
|
from app.models.app_release_model import AppRelease
|
||||||
from app.models.knowledge_model import Knowledge
|
from app.models.knowledge_model import Knowledge
|
||||||
|
from app.models.knowledgeshare_model import KnowledgeShare
|
||||||
from app.models.models_model import ModelConfig
|
from app.models.models_model import ModelConfig
|
||||||
from app.models.tool_model import ToolConfig as ToolConfigModel
|
from app.models.tool_model import ToolConfig as ToolConfigModel
|
||||||
from app.models.skill_model import Skill
|
from app.models.skill_model import Skill
|
||||||
from app.models.workflow_model import WorkflowConfig
|
from app.models.workflow_model import WorkflowConfig
|
||||||
from app.services.workflow_service import WorkflowService
|
from app.services.workflow_service import WorkflowService
|
||||||
from app.core.workflow.adapters.memory_bear.memory_bear_adapter import MemoryBearAdapter
|
from app.core.workflow.adapters.memory_bear.memory_bear_adapter import MemoryBearAdapter
|
||||||
|
from app.core.workflow.nodes.enums import NodeType
|
||||||
from app.models.memory_config_model import MemoryConfig as MemoryConfigModel
|
from app.models.memory_config_model import MemoryConfig as MemoryConfigModel
|
||||||
|
|
||||||
|
|
||||||
@@ -359,7 +361,11 @@ class AppDslService:
|
|||||||
self.db.add(MultiAgentConfig(id=uuid.uuid4(), app_id=app_id, is_active=True, created_at=now, **fields))
|
self.db.add(MultiAgentConfig(id=uuid.uuid4(), app_id=app_id, is_active=True, created_at=now, **fields))
|
||||||
|
|
||||||
elif app_type == AppType.WORKFLOW:
|
elif app_type == AppType.WORKFLOW:
|
||||||
adapter = MemoryBearAdapter(dsl)
|
raw_wf = dsl.get("workflow") or {}
|
||||||
|
raw_nodes = raw_wf.get("nodes") or []
|
||||||
|
resolved_nodes = self._resolve_workflow_nodes(raw_nodes, tenant_id, workspace_id, warnings)
|
||||||
|
resolved_dsl = {**dsl, "workflow": {**raw_wf, "nodes": resolved_nodes}}
|
||||||
|
adapter = MemoryBearAdapter(resolved_dsl)
|
||||||
if not adapter.validate_config():
|
if not adapter.validate_config():
|
||||||
raise BusinessException("工作流配置格式无效", BizCode.BAD_REQUEST)
|
raise BusinessException("工作流配置格式无效", BizCode.BAD_REQUEST)
|
||||||
result = adapter.parse_workflow()
|
result = adapter.parse_workflow()
|
||||||
@@ -367,7 +373,6 @@ class AppDslService:
|
|||||||
warnings.append(f"[节点错误] {e.node_name or e.node_id}: {e.detail}")
|
warnings.append(f"[节点错误] {e.node_name or e.node_id}: {e.detail}")
|
||||||
for w in result.warnings:
|
for w in result.warnings:
|
||||||
warnings.append(f"[节点警告] {w.node_name or w.node_id}: {w.detail}")
|
warnings.append(f"[节点警告] {w.node_name or w.node_id}: {w.detail}")
|
||||||
wf = dsl.get("workflow") or {}
|
|
||||||
wf_service = WorkflowService(self.db)
|
wf_service = WorkflowService(self.db)
|
||||||
if create:
|
if create:
|
||||||
wf_service.create_workflow_config(
|
wf_service.create_workflow_config(
|
||||||
@@ -375,9 +380,9 @@ class AppDslService:
|
|||||||
nodes=[n.model_dump() for n in result.nodes],
|
nodes=[n.model_dump() for n in result.nodes],
|
||||||
edges=[e.model_dump() for e in result.edges],
|
edges=[e.model_dump() for e in result.edges],
|
||||||
variables=[v.model_dump() for v in result.variables],
|
variables=[v.model_dump() for v in result.variables],
|
||||||
execution_config=wf.get("execution_config", {}),
|
execution_config=raw_wf.get("execution_config", {}),
|
||||||
features=wf.get("features", {}),
|
features=raw_wf.get("features", {}),
|
||||||
triggers=wf.get("triggers", []),
|
triggers=raw_wf.get("triggers", []),
|
||||||
validate=False,
|
validate=False,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
@@ -386,9 +391,9 @@ class AppDslService:
|
|||||||
existing.nodes = [n.model_dump() for n in result.nodes]
|
existing.nodes = [n.model_dump() for n in result.nodes]
|
||||||
existing.edges = [e.model_dump() for e in result.edges]
|
existing.edges = [e.model_dump() for e in result.edges]
|
||||||
existing.variables = [v.model_dump() for v in result.variables]
|
existing.variables = [v.model_dump() for v in result.variables]
|
||||||
existing.execution_config = wf.get("execution_config", {})
|
existing.execution_config = raw_wf.get("execution_config", {})
|
||||||
existing.features = wf.get("features", {})
|
existing.features = raw_wf.get("features", {})
|
||||||
existing.triggers = wf.get("triggers", [])
|
existing.triggers = raw_wf.get("triggers", [])
|
||||||
existing.updated_at = now
|
existing.updated_at = now
|
||||||
else:
|
else:
|
||||||
wf_service.create_workflow_config(
|
wf_service.create_workflow_config(
|
||||||
@@ -396,9 +401,9 @@ class AppDslService:
|
|||||||
nodes=[n.model_dump() for n in result.nodes],
|
nodes=[n.model_dump() for n in result.nodes],
|
||||||
edges=[e.model_dump() for e in result.edges],
|
edges=[e.model_dump() for e in result.edges],
|
||||||
variables=[v.model_dump() for v in result.variables],
|
variables=[v.model_dump() for v in result.variables],
|
||||||
execution_config=wf.get("execution_config", {}),
|
execution_config=raw_wf.get("execution_config", {}),
|
||||||
features=wf.get("features", {}),
|
features=raw_wf.get("features", {}),
|
||||||
triggers=wf.get("triggers", []),
|
triggers=raw_wf.get("triggers", []),
|
||||||
validate=False,
|
validate=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -446,27 +451,63 @@ class AppDslService:
|
|||||||
def _resolve_kb(self, ref: Optional[dict], workspace_id: uuid.UUID, warnings: list) -> Optional[str]:
|
def _resolve_kb(self, ref: Optional[dict], workspace_id: uuid.UUID, warnings: list) -> Optional[str]:
|
||||||
if not ref:
|
if not ref:
|
||||||
return None
|
return None
|
||||||
kb = self.db.query(Knowledge).filter(
|
kb_id = ref.get("id")
|
||||||
Knowledge.workspace_id == workspace_id,
|
if kb_id:
|
||||||
Knowledge.name == ref.get("name")
|
try:
|
||||||
).first()
|
kb_uuid = uuid.UUID(str(kb_id))
|
||||||
if not kb:
|
kb_share = self.db.query(KnowledgeShare).filter(
|
||||||
warnings.append(f"知识库 '{ref.get('name')}' 未匹配,已置空,请导入后手动配置")
|
KnowledgeShare.target_workspace_id == workspace_id,
|
||||||
return str(kb.id) if kb else None
|
KnowledgeShare.source_kb_id == kb_uuid
|
||||||
|
).first()
|
||||||
|
if kb_share:
|
||||||
|
kb = self.db.query(Knowledge).filter(
|
||||||
|
Knowledge.id == kb_share.target_kb_id
|
||||||
|
).first()
|
||||||
|
if kb and kb.status == 1:
|
||||||
|
return str(kb_share.target_kb_id)
|
||||||
|
kb = self.db.query(Knowledge).filter(
|
||||||
|
Knowledge.workspace_id == workspace_id,
|
||||||
|
Knowledge.id == kb_uuid,
|
||||||
|
Knowledge.status == 1
|
||||||
|
).first()
|
||||||
|
if kb:
|
||||||
|
return str(kb.id)
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
pass
|
||||||
|
warnings.append(f"知识库 '{kb_id}' 未匹配,已置空,请导入后手动配置")
|
||||||
|
return None
|
||||||
|
|
||||||
def _resolve_tool(self, ref: Optional[dict], tenant_id: uuid.UUID, warnings: list) -> Optional[str]:
|
def _resolve_tool(self, ref: Optional[dict], tenant_id: uuid.UUID, warnings: list) -> Optional[str]:
|
||||||
if not ref:
|
if not ref:
|
||||||
return None
|
return None
|
||||||
q = self.db.query(ToolConfigModel).filter(
|
tool_id = ref.get("id")
|
||||||
ToolConfigModel.tenant_id == tenant_id,
|
tool_name = ref.get("name")
|
||||||
ToolConfigModel.name == ref.get("name")
|
if tool_id:
|
||||||
)
|
try:
|
||||||
if ref.get("tool_type"):
|
tool_uuid = uuid.UUID(str(tool_id))
|
||||||
q = q.filter(ToolConfigModel.tool_type == ref["tool_type"])
|
t = self.db.query(ToolConfigModel).filter(
|
||||||
t = q.first()
|
ToolConfigModel.id == tool_uuid,
|
||||||
if not t:
|
ToolConfigModel.tenant_id == tenant_id,
|
||||||
warnings.append(f"工具 '{ref.get('name')}' 未匹配,已置空,请导入后手动配置")
|
ToolConfigModel.is_active.is_(True)
|
||||||
return str(t.id) if t else None
|
).first()
|
||||||
|
if t:
|
||||||
|
return str(t.id)
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
pass
|
||||||
|
if tool_name:
|
||||||
|
q = self.db.query(ToolConfigModel).filter(
|
||||||
|
ToolConfigModel.tenant_id == tenant_id,
|
||||||
|
ToolConfigModel.name == tool_name
|
||||||
|
)
|
||||||
|
if ref.get("tool_type"):
|
||||||
|
q = q.filter(ToolConfigModel.tool_type == ref["tool_type"])
|
||||||
|
t = q.first()
|
||||||
|
if t:
|
||||||
|
return str(t.id)
|
||||||
|
warnings.append(f"工具 '{tool_name}' 未匹配,已置空,请导入后手动配置")
|
||||||
|
else:
|
||||||
|
warnings.append(f"工具 '{tool_id}' 未匹配,已置空,请导入后手动配置")
|
||||||
|
return None
|
||||||
|
|
||||||
def _resolve_release(self, ref: Optional[dict], warnings: list) -> Optional[uuid.UUID]:
|
def _resolve_release(self, ref: Optional[dict], warnings: list) -> Optional[uuid.UUID]:
|
||||||
if not ref:
|
if not ref:
|
||||||
@@ -508,6 +549,61 @@ class AppDslService:
|
|||||||
result.append(entry)
|
result.append(entry)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def _resolve_workflow_nodes(self, nodes: list, tenant_id: uuid.UUID, workspace_id: uuid.UUID, warnings: list) -> list:
|
||||||
|
"""解析工作流节点中的工具ID和知识库ID,匹配不到则清空配置"""
|
||||||
|
resolved_nodes = []
|
||||||
|
for node in nodes:
|
||||||
|
node_type = node.get("type")
|
||||||
|
config = dict(node.get("config") or {})
|
||||||
|
node_label = node.get("name") or node.get("id")
|
||||||
|
if node_type == NodeType.TOOL.value:
|
||||||
|
tool_id = config.get("tool_id")
|
||||||
|
if not tool_id:
|
||||||
|
# tool_id 本身就是空,直接置空不重复 warning
|
||||||
|
config["tool_id"] = None
|
||||||
|
config["tool_parameters"] = {}
|
||||||
|
else:
|
||||||
|
tool_ref = {}
|
||||||
|
if isinstance(tool_id, str) and len(tool_id) >= 36:
|
||||||
|
try:
|
||||||
|
uuid.UUID(tool_id)
|
||||||
|
tool_ref["id"] = tool_id
|
||||||
|
except ValueError:
|
||||||
|
tool_ref["name"] = tool_id
|
||||||
|
else:
|
||||||
|
tool_ref["name"] = tool_id
|
||||||
|
resolved_tool_id = self._resolve_tool(tool_ref, tenant_id, [])
|
||||||
|
if resolved_tool_id:
|
||||||
|
config["tool_id"] = resolved_tool_id
|
||||||
|
else:
|
||||||
|
warnings.append(f"[{node_label}] 工具 '{tool_id}' 未匹配,已置空,请导入后手动配置")
|
||||||
|
config["tool_id"] = None
|
||||||
|
config["tool_parameters"] = {}
|
||||||
|
elif node_type == NodeType.KNOWLEDGE_RETRIEVAL.value:
|
||||||
|
knowledge_bases = config.get("knowledge_bases") or []
|
||||||
|
resolved_kbs = []
|
||||||
|
for kb in knowledge_bases:
|
||||||
|
kb_id = kb.get("kb_id")
|
||||||
|
if not kb_id:
|
||||||
|
continue
|
||||||
|
kb_ref = {}
|
||||||
|
if isinstance(kb_id, str) and len(kb_id) >= 36:
|
||||||
|
try:
|
||||||
|
uuid.UUID(kb_id)
|
||||||
|
kb_ref["id"] = kb_id
|
||||||
|
except ValueError:
|
||||||
|
kb_ref["name"] = kb_id
|
||||||
|
else:
|
||||||
|
kb_ref["name"] = kb_id
|
||||||
|
resolved_id = self._resolve_kb(kb_ref, workspace_id, [])
|
||||||
|
if resolved_id:
|
||||||
|
resolved_kbs.append({**kb, "kb_id": resolved_id})
|
||||||
|
else:
|
||||||
|
warnings.append(f"[{node_label}] 知识库 '{kb_id}' 未匹配,已移除,请导入后手动配置")
|
||||||
|
config["knowledge_bases"] = resolved_kbs
|
||||||
|
resolved_nodes.append({**node, "config": config})
|
||||||
|
return resolved_nodes
|
||||||
|
|
||||||
def _resolve_knowledge_retrieval(self, kr: Optional[dict], workspace_id: uuid.UUID, warnings: list) -> Optional[dict]:
|
def _resolve_knowledge_retrieval(self, kr: Optional[dict], workspace_id: uuid.UUID, warnings: list) -> Optional[dict]:
|
||||||
if not kr:
|
if not kr:
|
||||||
return kr
|
return kr
|
||||||
|
|||||||
Reference in New Issue
Block a user