fix(workflow): Fix workflow release process and API call issues

This commit is contained in:
Eternity
2026-01-15 11:21:50 +08:00
parent 88abdc49fe
commit 034559aac7
8 changed files with 314 additions and 284 deletions

View File

@@ -8,9 +8,10 @@ from sqlalchemy.orm import Session
from app.core.logging_config import get_business_logger
from app.core.response_utils import success
from app.db import get_db
from app.db import get_db, get_db_read
from app.dependencies import get_share_user_id, ShareTokenData
from app.repositories import knowledge_repository
from app.repositories.workflow_repository import WorkflowConfigRepository
from app.schemas import release_share_schema, conversation_schema
from app.schemas.response_schema import PageData, PageMeta
from app.services import workspace_service
@@ -19,7 +20,8 @@ from app.services.conversation_service import ConversationService
from app.services.release_share_service import ReleaseShareService
from app.services.shared_chat_service import SharedChatService
from app.services.app_chat_service import AppChatService, get_app_chat_service
from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release
from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, \
agent_config_4_app_release, multi_agent_config_4_app_release
router = APIRouter(prefix="/public/share", tags=["Public Share"])
logger = get_business_logger()
@@ -183,7 +185,6 @@ def get_embed_code(
return success(data=embed_code)
# ---------- 会话管理接口 ----------
@router.get(
@@ -313,7 +314,7 @@ async def chat(
)
end_user_id = str(new_end_user.id)
appid=share.app_id
appid = share.app_id
"""获取存储类型和工作空间的ID"""
# 直接通过 SQLAlchemy 查询 app
@@ -427,7 +428,7 @@ async def chat(
async for event in app_chat_service.agnet_chat_stream(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id= str(new_end_user.id), # 转换为字符串
user_id=str(new_end_user.id), # 转换为字符串
variables=payload.variables,
web_search=payload.web_search,
config=agent_config,
@@ -561,13 +562,14 @@ async def chat(
# return success(data=conversation_schema.ChatResponse(**result))
elif app_type == AppType.WORKFLOW:
config = workflow_config_4_app_release(release)
if not config.id:
with get_db_read() as db:
source_config = WorkflowConfigRepository(db).get_by_app_id(release.app_id)
config.id = source_config.id
if payload.stream:
async def event_generator():
async for event in app_chat_service.workflow_chat_stream(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=end_user_id, # 转换为字符串
@@ -578,7 +580,8 @@ async def chat(
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
app_id=release.app_id,
workspace_id=workspace_id
workspace_id=workspace_id,
release_id=release.id
):
event_type = event.get("event", "message")
event_data = event.get("data", {})
@@ -610,7 +613,8 @@ async def chat(
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
app_id=release.app_id,
workspace_id=workspace_id
workspace_id=workspace_id,
release_id=release.id
)
logger.debug(
"工作流试运行返回结果",

View File

@@ -242,8 +242,9 @@ async def chat(
memory=payload.memory,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
app_id=app.app_id,
workspace_id=workspace_id
app_id=app.id,
workspace_id=workspace_id,
release_id=app.current_release.id,
):
event_type = event.get("event", "message")
event_data = event.get("data", {})
@@ -274,8 +275,9 @@ async def chat(
memory=payload.memory,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
app_id=app.app_id,
workspace_id=workspace_id
app_id=app.id,
workspace_id=workspace_id,
release_id=app.current_release.id
)
logger.debug(
"工作流试运行返回结果",

View File

@@ -10,9 +10,8 @@ from app.core.workflow.nodes.base_node import BaseNode, WorkflowState
from app.core.workflow.nodes.knowledge import KnowledgeRetrievalNodeConfig
from app.db import get_db_read
from app.models import knowledge_model, knowledgeshare_model, ModelType
from app.repositories import knowledge_repository
from app.repositories import knowledge_repository, knowledgeshare_repository
from app.schemas.chunk_schema import RetrieveType
from app.services import knowledge_service, knowledgeshare_service
from app.services.model_service import ModelConfigService
logger = logging.getLogger(__name__)
@@ -96,7 +95,7 @@ class KnowledgeRetrievalNode(BaseNode):
filters = self._build_kb_filter(kb_ids, knowledge_model.PermissionType.Share)
share_ids = knowledge_service.knowledge_repository.get_chunked_knowledgeids(
share_ids = knowledge_repository.get_chunked_knowledgeids(
db=db,
filters=filters
)
@@ -105,7 +104,7 @@ class KnowledgeRetrievalNode(BaseNode):
filters = [
knowledgeshare_model.KnowledgeShare.target_kb_id.in_(kb_ids)
]
items = knowledgeshare_service.knowledgeshare_repository.get_source_kb_ids_by_target_kb_id(
items = knowledgeshare_repository.get_source_kb_ids_by_target_kb_id(
db=db,
filters=filters
)

View File

@@ -75,6 +75,14 @@ class WorkflowExecution(Base):
nullable=False,
index=True
)
release_id = Column(
UUID(as_uuid=True),
ForeignKey("app_releases.id", ondelete="CASCADE"),
nullable=True,
index=True
)
app_id = Column(
UUID(as_uuid=True),
ForeignKey("apps.id", ondelete="CASCADE"),

View File

@@ -527,6 +527,7 @@ class AppChatService:
conversation_id: uuid.UUID,
config: WorkflowConfig,
app_id: uuid.UUID,
release_id: uuid.UUID,
workspace_id: uuid.UUID,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
@@ -549,6 +550,7 @@ class AppChatService:
payload=payload,
config=config,
workspace_id=workspace_id,
release_id=release_id,
)
async def workflow_chat_stream(
@@ -557,6 +559,7 @@ class AppChatService:
conversation_id: uuid.UUID,
config: WorkflowConfig,
app_id: uuid.UUID,
release_id: uuid.UUID,
workspace_id: uuid.UUID,
user_id: str = None,
variables: Optional[Dict[str, Any]] = None,
@@ -565,7 +568,7 @@ class AppChatService:
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
) -> AsyncGenerator[str, None]:
) -> AsyncGenerator[dict, None]:
"""聊天(流式)"""
workflow_service = WorkflowService(self.db)
payload = DraftRunRequest(
@@ -580,6 +583,7 @@ class AppChatService:
payload=payload,
config=config,
workspace_id=workspace_id,
release_id=release_id
):
yield event

View File

@@ -129,7 +129,7 @@ class AppService:
Raises:
ResourceNotFoundException: 当应用不存在时
"""
app = get_apps_by_id(self.db,app_id)
app = get_apps_by_id(self.db, app_id)
if not app:
logger.warning("应用不存在", extra={"app_id": str(app_id)})
raise ResourceNotFoundException("应用", str(app_id))
@@ -227,7 +227,6 @@ class AppService:
if not model_api_key:
raise ResourceNotFoundException("模型配置", str(multi_agent_config.default_model_config_id))
# 3. 检查子 Agent 配置
if not multi_agent_config.sub_agents or len(multi_agent_config.sub_agents) == 0:
raise BusinessException(
@@ -875,7 +874,8 @@ class AppService:
self._validate_workspace_access(app, workspace_id)
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active==True).order_by(AgentConfig.updated_at.desc())
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(
AgentConfig.updated_at.desc())
agent_cfg: Optional[AgentConfig] = self.db.scalars(stmt).first()
now = datetime.datetime.now()
@@ -948,7 +948,8 @@ class AppService:
# 只读操作,允许访问共享应用
self._validate_app_accessible(app, workspace_id)
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(AgentConfig.updated_at.desc())
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(
AgentConfig.updated_at.desc())
config = self.db.scalars(stmt).first()
if config:
@@ -1200,7 +1201,8 @@ class AppService:
default_model_config_id = None
if app.type == AppType.AGENT:
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(AgentConfig.updated_at.desc())
stmt = select(AgentConfig).where(AgentConfig.app_id == app_id, AgentConfig.is_active == True).order_by(
AgentConfig.updated_at.desc())
agent_cfg = self.db.scalars(stmt).first()
if not agent_cfg:
raise BusinessException("Agent 应用缺少配置,无法发布", BizCode.AGENT_CONFIG_MISSING)
@@ -1237,7 +1239,6 @@ class AppService:
# 4. 构建配置快照
config = {
"model_parameters": model_parameters_to_dict(multi_agent_cfg.model_parameters),
"master_agent_id": str(multi_agent_cfg.master_agent_id),
@@ -1264,6 +1265,7 @@ class AppService:
raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING)
config = {
"id": workflow_cfg.id,
"nodes": workflow_cfg.nodes,
"edges": workflow_cfg.edges,
"variables": workflow_cfg.variables,
@@ -1285,7 +1287,7 @@ class AppService:
id=uuid.uuid4(),
app_id=app_id,
version=version,
version_name = version_name,
version_name=version_name,
release_notes=release_notes,
name=app.name,
description=app.description,
@@ -1457,7 +1459,6 @@ class AppService:
BusinessException: 当应用不在指定工作空间或目标工作空间无效时
"""
logger.info(
"分享应用",
extra={
@@ -2009,7 +2010,8 @@ def create_app(db: Session, *, user_id: uuid.UUID, workspace_id: uuid.UUID, data
return service.create_app(user_id=user_id, workspace_id=workspace_id, data=data)
def update_app(db: Session, *, app_id: uuid.UUID, data: app_schema.AppUpdate, workspace_id: uuid.UUID | None = None) -> App:
def update_app(db: Session, *, app_id: uuid.UUID, data: app_schema.AppUpdate,
workspace_id: uuid.UUID | None = None) -> App:
"""更新应用(向后兼容接口)"""
service = AppService(db)
return service.update_app(app_id=app_id, data=data, workspace_id=workspace_id)
@@ -2021,12 +2023,15 @@ def delete_app(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None
return service.delete_app(app_id=app_id, workspace_id=workspace_id)
def update_agent_config(db: Session, *, app_id: uuid.UUID, data: app_schema.AgentConfigUpdate, workspace_id: uuid.UUID | None = None) -> AgentConfig:
def update_agent_config(db: Session, *, app_id: uuid.UUID, data: app_schema.AgentConfigUpdate,
workspace_id: uuid.UUID | None = None) -> AgentConfig:
"""更新 Agent 配置(向后兼容接口)"""
service = AppService(db)
return service.update_agent_config(app_id=app_id, data=data, workspace_id=workspace_id)
def update_workflow_config(db: Session, *, app_id: uuid.UUID, data: WorkflowConfigUpdate, workspace_id: uuid.UUID | None = None) -> WorkflowConfig:
def update_workflow_config(db: Session, *, app_id: uuid.UUID, data: WorkflowConfigUpdate,
workspace_id: uuid.UUID | None = None) -> WorkflowConfig:
"""更新 Agent 配置(向后兼容接口)"""
service = AppService(db)
return service.update_workflow_config(app_id=app_id, data=data, workspace_id=workspace_id)
@@ -2040,6 +2045,7 @@ def get_agent_config(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID
service = AppService(db)
return service.get_agent_config(app_id=app_id, workspace_id=workspace_id)
def get_workflow_config(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None = None) -> WorkflowConfig:
"""获取 Agent 配置(向后兼容接口)
@@ -2049,13 +2055,16 @@ def get_workflow_config(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UU
return service.get_workflow_config(app_id=app_id, workspace_id=workspace_id)
def publish(db: Session, *, app_id: uuid.UUID, publisher_id: uuid.UUID, workspace_id: uuid.UUID | None = None,version_name:str, release_notes: Optional[str] = None) -> AppRelease:
def publish(db: Session, *, app_id: uuid.UUID, publisher_id: uuid.UUID, workspace_id: uuid.UUID | None = None,
version_name: str, release_notes: Optional[str] = None) -> AppRelease:
"""发布应用(向后兼容接口)"""
service = AppService(db)
return service.publish(app_id=app_id, publisher_id=publisher_id,version_name = version_name, workspace_id=workspace_id, release_notes=release_notes)
return service.publish(app_id=app_id, publisher_id=publisher_id, version_name=version_name,
workspace_id=workspace_id, release_notes=release_notes)
def get_current_release(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None = None) -> Optional[AppRelease]:
def get_current_release(db: Session, *, app_id: uuid.UUID, workspace_id: uuid.UUID | None = None) -> Optional[
AppRelease]:
"""获取当前发布版本(向后兼容接口)"""
service = AppService(db)
return service.get_current_release(app_id=app_id, workspace_id=workspace_id)
@@ -2156,8 +2165,6 @@ async def draft_run_stream(
yield event
# ==================== 依赖注入函数 ====================
def get_app_service(

View File

@@ -4,7 +4,7 @@
import datetime
import logging
import uuid
from typing import Any, Annotated, AsyncGenerator
from typing import Any, Annotated, AsyncGenerator, Optional
from deprecated import deprecated
from fastapi import Depends
@@ -266,6 +266,7 @@ class WorkflowService:
workflow_config_id: uuid.UUID,
app_id: uuid.UUID,
trigger_type: str,
release_id: uuid.UUID | None = None,
triggered_by: uuid.UUID | None = None,
conversation_id: uuid.UUID | None = None,
input_data: dict[str, Any] | None = None
@@ -273,6 +274,7 @@ class WorkflowService:
"""创建工作流执行记录
Args:
release_id: 应用发布 ID
workflow_config_id: 工作流配置 ID
app_id: 应用 ID
trigger_type: 触发类型
@@ -289,6 +291,7 @@ class WorkflowService:
execution = WorkflowExecution(
workflow_config_id=workflow_config_id,
app_id=app_id,
release_id=release_id,
conversation_id=conversation_id,
execution_id=execution_id,
trigger_type=trigger_type,
@@ -414,12 +417,14 @@ class WorkflowService:
payload: DraftRunRequest,
config: WorkflowConfig,
workspace_id: uuid.UUID,
release_id: uuid.UUID | None = None,
):
"""运行工作流
Args:
workspace_id:
config:
release_id: 发布 ID
workspace_id:工作空间 ID
config: 配置
payload:
app_id: 应用 ID
@@ -463,7 +468,8 @@ class WorkflowService:
trigger_type="manual",
triggered_by=None,
conversation_id=conversation_id_uuid,
input_data=input_data
input_data=input_data,
release_id=release_id,
)
# 3. 构建工作流配置字典
@@ -562,10 +568,12 @@ class WorkflowService:
payload: DraftRunRequest,
config: WorkflowConfig,
workspace_id: uuid.UUID,
release_id: Optional[uuid.UUID] = None,
):
"""运行工作流(流式)
Args:
release_id: 发布id
workspace_id:
app_id: 应用 ID
payload: 请求对象(包含 message, variables, conversation_id 等)
@@ -611,7 +619,8 @@ class WorkflowService:
trigger_type="manual",
triggered_by=None,
conversation_id=conversation_id_uuid,
input_data=input_data
input_data=input_data,
release_id=release_id,
)
# 3. 构建工作流配置字典

View File

@@ -120,12 +120,9 @@ def multi_agent_config_4_app_release(release: AppRelease) -> MultiAgentConfig:
def workflow_config_4_app_release(release: AppRelease) -> WorkflowConfig:
config_dict = release.config
with get_db_read() as db:
source_config = WorkflowConfigRepository(db).get_by_app_id(release.app_id)
source_config_id = source_config.id
config = WorkflowConfig(
id=source_config_id,
id=config_dict.get("id"),
app_id=release.app_id,
nodes=config_dict.get("nodes", []),
edges=config_dict.get("edges", []),