[add] publish and share run add workflow type app
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
|
import json
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
from fastapi import APIRouter, Depends, Query, Request
|
from fastapi import APIRouter, Depends, Query, Request
|
||||||
@@ -18,7 +19,7 @@ from app.services.conversation_service import ConversationService
|
|||||||
from app.services.release_share_service import ReleaseShareService
|
from app.services.release_share_service import ReleaseShareService
|
||||||
from app.services.shared_chat_service import SharedChatService
|
from app.services.shared_chat_service import SharedChatService
|
||||||
from app.services.app_chat_service import AppChatService, get_app_chat_service
|
from app.services.app_chat_service import AppChatService, get_app_chat_service
|
||||||
from app.utils.app_config_utils import dict_to_multi_agent_config, dict_to_workflow_config, agent_config_4_app_release, multi_agent_config_4_app_release
|
from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release
|
||||||
|
|
||||||
router = APIRouter(prefix="/public/share", tags=["Public Share"])
|
router = APIRouter(prefix="/public/share", tags=["Public Share"])
|
||||||
logger = get_business_logger()
|
logger = get_business_logger()
|
||||||
@@ -288,7 +289,7 @@ async def chat(
|
|||||||
password = None # Token 认证不需要密码
|
password = None # Token 认证不需要密码
|
||||||
# end_user_id = user_id
|
# end_user_id = user_id
|
||||||
other_id = user_id
|
other_id = user_id
|
||||||
|
|
||||||
# 提前验证和准备(在流式响应开始前完成)
|
# 提前验证和准备(在流式响应开始前完成)
|
||||||
# 这样可以确保错误能正确返回,而不是在流式响应中间出错
|
# 这样可以确保错误能正确返回,而不是在流式响应中间出错
|
||||||
from app.models.app_model import AppType
|
from app.models.app_model import AppType
|
||||||
@@ -364,6 +365,9 @@ async def chat(
|
|||||||
config = release.config or {}
|
config = release.config or {}
|
||||||
if not config.get("sub_agents"):
|
if not config.get("sub_agents"):
|
||||||
raise BusinessException("多 Agent 应用未配置子 Agent", BizCode.AGENT_CONFIG_MISSING)
|
raise BusinessException("多 Agent 应用未配置子 Agent", BizCode.AGENT_CONFIG_MISSING)
|
||||||
|
elif app_type == AppType.WORKFLOW:
|
||||||
|
# Multi-Agent 类型:验证多 Agent 配置
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)
|
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)
|
||||||
|
|
||||||
@@ -469,6 +473,7 @@ async def chat(
|
|||||||
)
|
)
|
||||||
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
|
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
|
||||||
elif app_type == AppType.MULTI_AGENT:
|
elif app_type == AppType.MULTI_AGENT:
|
||||||
|
# config = workflow_config_4_app_release(release)
|
||||||
config = multi_agent_config_4_app_release(release)
|
config = multi_agent_config_4_app_release(release)
|
||||||
if payload.stream:
|
if payload.stream:
|
||||||
async def event_generator():
|
async def event_generator():
|
||||||
@@ -553,8 +558,71 @@ async def chat(
|
|||||||
# )
|
# )
|
||||||
|
|
||||||
# return success(data=conversation_schema.ChatResponse(**result))
|
# return success(data=conversation_schema.ChatResponse(**result))
|
||||||
|
elif app_type == AppType.WORKFLOW:
|
||||||
|
|
||||||
|
config = workflow_config_4_app_release(release)
|
||||||
|
if payload.stream:
|
||||||
|
async def event_generator():
|
||||||
|
async for event in app_chat_service.workflow_chat_stream(
|
||||||
|
|
||||||
|
message=payload.message,
|
||||||
|
conversation_id=conversation.id, # 使用已创建的会话 ID
|
||||||
|
user_id=new_end_user.id, # 转换为字符串
|
||||||
|
variables=payload.variables,
|
||||||
|
config=config,
|
||||||
|
web_search=payload.web_search,
|
||||||
|
memory=payload.memory,
|
||||||
|
storage_type=storage_type,
|
||||||
|
user_rag_memory_id=user_rag_memory_id,
|
||||||
|
app_id=release.app_id,
|
||||||
|
workspace_id=workspace_id
|
||||||
|
):
|
||||||
|
event_type = event.get("event", "message")
|
||||||
|
event_data = event.get("data", {})
|
||||||
|
|
||||||
|
# 转换为标准 SSE 格式(字符串)
|
||||||
|
sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n"
|
||||||
|
yield sse_message
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# 多 Agent 非流式返回
|
||||||
|
result = await app_chat_service.workflow_chat(
|
||||||
|
|
||||||
|
message=payload.message,
|
||||||
|
conversation_id=conversation.id, # 使用已创建的会话 ID
|
||||||
|
user_id=new_end_user.id, # 转换为字符串
|
||||||
|
variables=payload.variables,
|
||||||
|
config=config,
|
||||||
|
web_search=payload.web_search,
|
||||||
|
memory=payload.memory,
|
||||||
|
storage_type=storage_type,
|
||||||
|
user_rag_memory_id=user_rag_memory_id,
|
||||||
|
app_id=release.app_id,
|
||||||
|
workspace_id=workspace_id
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
"工作流试运行返回结果",
|
||||||
|
extra={
|
||||||
|
"result_type": str(type(result)),
|
||||||
|
"has_response": "response" in result if isinstance(result, dict) else False
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return success(
|
||||||
|
data=result,
|
||||||
|
msg="工作流任务执行成功"
|
||||||
|
)
|
||||||
|
# return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
from app.core.exceptions import BusinessException
|
from app.core.exceptions import BusinessException
|
||||||
from app.core.error_codes import BizCode
|
from app.core.error_codes import BizCode
|
||||||
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)
|
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)
|
||||||
pass
|
|
||||||
|
|||||||
@@ -9,15 +9,18 @@ from fastapi import Depends
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from app.core.agent.langchain_agent import LangChainAgent
|
from app.core.agent.langchain_agent import LangChainAgent
|
||||||
|
from app.core.error_codes import BizCode
|
||||||
|
from app.core.exceptions import BusinessException
|
||||||
from app.core.logging_config import get_business_logger
|
from app.core.logging_config import get_business_logger
|
||||||
from app.db import get_db
|
from app.db import get_db, get_db_context
|
||||||
from app.models import MultiAgentConfig, AgentConfig
|
from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig
|
||||||
from app.schemas.prompt_schema import render_prompt_message, PromptMessageRole
|
from app.schemas.prompt_schema import render_prompt_message, PromptMessageRole
|
||||||
from app.services.conversation_service import ConversationService
|
from app.services.conversation_service import ConversationService
|
||||||
from app.services.draft_run_service import create_knowledge_retrieval_tool, create_long_term_memory_tool
|
from app.services.draft_run_service import create_knowledge_retrieval_tool, create_long_term_memory_tool
|
||||||
from app.services.draft_run_service import create_web_search_tool
|
from app.services.draft_run_service import create_web_search_tool
|
||||||
from app.services.model_service import ModelApiKeyService
|
from app.services.model_service import ModelApiKeyService
|
||||||
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
|
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
|
||||||
|
from app.services.workflow_service import WorkflowService
|
||||||
|
|
||||||
logger = get_business_logger()
|
logger = get_business_logger()
|
||||||
|
|
||||||
@@ -479,7 +482,9 @@ class AppChatService:
|
|||||||
self,
|
self,
|
||||||
message: str,
|
message: str,
|
||||||
conversation_id: uuid.UUID,
|
conversation_id: uuid.UUID,
|
||||||
config: AgentConfig,
|
config: WorkflowConfig,
|
||||||
|
app_id: uuid.UUID,
|
||||||
|
workspace_id: uuid.UUID,
|
||||||
user_id: Optional[str] = None,
|
user_id: Optional[str] = None,
|
||||||
variables: Optional[Dict[str, Any]] = None,
|
variables: Optional[Dict[str, Any]] = None,
|
||||||
web_search: bool = False,
|
web_search: bool = False,
|
||||||
@@ -488,281 +493,158 @@ class AppChatService:
|
|||||||
user_rag_memory_id: Optional[str] = None,
|
user_rag_memory_id: Optional[str] = None,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""聊天(非流式)"""
|
"""聊天(非流式)"""
|
||||||
|
workflow_service = WorkflowService(self.db)
|
||||||
|
|
||||||
start_time = time.time()
|
input_data = {"message":message, "variables": variables,
|
||||||
config_id = None
|
"conversation_id": str(conversation_id)}
|
||||||
|
inconfig = workflow_service.get_workflow_config(app_id)
|
||||||
|
|
||||||
if variables is None:
|
# 2. 创建执行记录
|
||||||
variables = {}
|
execution = workflow_service.create_execution(
|
||||||
|
workflow_config_id=inconfig.id,
|
||||||
|
app_id=app_id,
|
||||||
|
trigger_type="manual",
|
||||||
|
triggered_by=None,
|
||||||
|
conversation_id=conversation_id,
|
||||||
|
input_data=input_data
|
||||||
|
)
|
||||||
|
|
||||||
# 获取模型配置ID
|
# 3. 构建工作流配置字典
|
||||||
model_config_id = config.default_model_config_id
|
workflow_config_dict = {
|
||||||
api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id)
|
"nodes": config.nodes,
|
||||||
# 处理系统提示词(支持变量替换)
|
"edges": config.edges,
|
||||||
system_prompt = config.get("system_prompt", "")
|
"variables": config.variables,
|
||||||
if variables:
|
"execution_config": config.execution_config
|
||||||
system_prompt_rendered = render_prompt_message(
|
}
|
||||||
system_prompt,
|
|
||||||
PromptMessageRole.USER,
|
# 4. 获取工作空间 ID(从 app 获取)
|
||||||
variables
|
|
||||||
|
# 5. 执行工作流
|
||||||
|
from app.core.workflow.executor import execute_workflow
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 更新状态为运行中
|
||||||
|
workflow_service.update_execution_status(execution.execution_id, "running")
|
||||||
|
|
||||||
|
result = await execute_workflow(
|
||||||
|
workflow_config=workflow_config_dict,
|
||||||
|
input_data=input_data,
|
||||||
|
execution_id=execution.execution_id,
|
||||||
|
workspace_id=str(workspace_id),
|
||||||
|
user_id=user_id
|
||||||
)
|
)
|
||||||
system_prompt = system_prompt_rendered.get_text_content() or system_prompt
|
|
||||||
|
|
||||||
# 准备工具列表
|
# 更新执行结果
|
||||||
tools = []
|
if result.get("status") == "completed":
|
||||||
|
workflow_service.update_execution_status(
|
||||||
# 添加知识库检索工具
|
execution.execution_id,
|
||||||
knowledge_retrieval = config.get("knowledge_retrieval")
|
"completed",
|
||||||
if knowledge_retrieval:
|
output_data=result.get("node_outputs", {})
|
||||||
knowledge_bases = knowledge_retrieval.get("knowledge_bases", [])
|
)
|
||||||
kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")]
|
else:
|
||||||
if kb_ids:
|
workflow_service.update_execution_status(
|
||||||
kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id)
|
execution.execution_id,
|
||||||
tools.append(kb_tool)
|
"failed",
|
||||||
|
error_message=result.get("error")
|
||||||
# 添加长期记忆工具
|
|
||||||
memory_flag = False
|
|
||||||
if memory == True:
|
|
||||||
memory_config = config.get("memory", {})
|
|
||||||
if memory_config.get("enabled") and user_id:
|
|
||||||
memory_flag = True
|
|
||||||
memory_tool = create_long_term_memory_tool(memory_config, user_id)
|
|
||||||
tools.append(memory_tool)
|
|
||||||
|
|
||||||
web_tools = config.get("tools")
|
|
||||||
web_search_choice = web_tools.get("web_search", {})
|
|
||||||
web_search_enable = web_search_choice.get("enabled", False)
|
|
||||||
if web_search == True:
|
|
||||||
if web_search_enable == True:
|
|
||||||
search_tool = create_web_search_tool({})
|
|
||||||
tools.append(search_tool)
|
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
"已添加网络搜索工具",
|
|
||||||
extra={
|
|
||||||
"tool_count": len(tools)
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# 获取模型参数
|
# 返回增强的响应结构
|
||||||
model_parameters = config.get("model_parameters", {})
|
return {
|
||||||
|
"execution_id": execution.execution_id,
|
||||||
|
"status": result.get("status"),
|
||||||
|
"output": result.get("output"), # 最终输出(字符串)
|
||||||
|
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
|
||||||
|
"conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID
|
||||||
|
"error_message": result.get("error"),
|
||||||
|
"elapsed_time": result.get("elapsed_time"),
|
||||||
|
"token_usage": result.get("token_usage")
|
||||||
|
}
|
||||||
|
|
||||||
# 创建 LangChain Agent
|
except Exception as e:
|
||||||
agent = LangChainAgent(
|
logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
|
||||||
model_name=api_key_obj.model_name,
|
workflow_service.update_execution_status(
|
||||||
api_key=api_key_obj.api_key,
|
execution.execution_id,
|
||||||
provider=api_key_obj.provider,
|
"failed",
|
||||||
api_base=api_key_obj.api_base,
|
error_message=str(e)
|
||||||
temperature=model_parameters.get("temperature", 0.7),
|
)
|
||||||
max_tokens=model_parameters.get("max_tokens", 2000),
|
raise BusinessException(
|
||||||
system_prompt=system_prompt,
|
code=BizCode.INTERNAL_ERROR,
|
||||||
tools=tools,
|
message=f"工作流执行失败: {str(e)}"
|
||||||
|
|
||||||
)
|
|
||||||
|
|
||||||
# 加载历史消息
|
|
||||||
history = []
|
|
||||||
memory_config = {"enabled": True, 'max_history': 10}
|
|
||||||
if memory_config.get("enabled"):
|
|
||||||
messages = self.conversation_service.get_messages(
|
|
||||||
conversation_id=conversation_id,
|
|
||||||
limit=memory_config.get("max_history", 10)
|
|
||||||
)
|
)
|
||||||
history = [
|
|
||||||
{"role": msg.role, "content": msg.content}
|
|
||||||
for msg in messages
|
|
||||||
]
|
|
||||||
|
|
||||||
# 调用 Agent
|
|
||||||
result = await agent.chat(
|
|
||||||
message=message,
|
|
||||||
history=history,
|
|
||||||
context=None,
|
|
||||||
end_user_id=user_id,
|
|
||||||
storage_type=storage_type,
|
|
||||||
user_rag_memory_id=user_rag_memory_id,
|
|
||||||
config_id=config_id,
|
|
||||||
memory_flag=memory_flag
|
|
||||||
)
|
|
||||||
|
|
||||||
# 保存消息
|
|
||||||
self.conversation_service.save_conversation_messages(
|
|
||||||
conversation_id=conversation_id,
|
|
||||||
user_message=message,
|
|
||||||
assistant_message=result["content"]
|
|
||||||
)
|
|
||||||
|
|
||||||
elapsed_time = time.time() - start_time
|
|
||||||
|
|
||||||
return {
|
|
||||||
"conversation_id": conversation_id,
|
|
||||||
"message": result["content"],
|
|
||||||
"usage": result.get("usage", {
|
|
||||||
"prompt_tokens": 0,
|
|
||||||
"completion_tokens": 0,
|
|
||||||
"total_tokens": 0
|
|
||||||
}),
|
|
||||||
"elapsed_time": elapsed_time
|
|
||||||
}
|
|
||||||
|
|
||||||
async def workflow_chat_stream(
|
async def workflow_chat_stream(
|
||||||
self,
|
self,
|
||||||
message: str,
|
message: str,
|
||||||
conversation_id: uuid.UUID,
|
conversation_id: uuid.UUID,
|
||||||
config: AgentConfig,
|
config: WorkflowConfig,
|
||||||
|
app_id: uuid.UUID,
|
||||||
|
workspace_id: uuid.UUID,
|
||||||
user_id: Optional[str] = None,
|
user_id: Optional[str] = None,
|
||||||
variables: Optional[Dict[str, Any]] = None,
|
variables: Optional[Dict[str, Any]] = None,
|
||||||
web_search: bool = False,
|
web_search: bool = False,
|
||||||
memory: bool = True,
|
memory: bool = True,
|
||||||
storage_type: Optional[str] = None,
|
storage_type: Optional[str] = None,
|
||||||
user_rag_memory_id: Optional[str] = None,
|
user_rag_memory_id: Optional[str] = None,
|
||||||
|
|
||||||
) -> AsyncGenerator[str, None]:
|
) -> AsyncGenerator[str, None]:
|
||||||
"""聊天(流式)"""
|
"""聊天(流式)"""
|
||||||
|
workflow_service = WorkflowService(self.db)
|
||||||
|
input_data = {"message": message, "variables": variables,
|
||||||
|
"conversation_id": str(conversation_id)}
|
||||||
|
inconfig = workflow_service.get_workflow_config(app_id)
|
||||||
|
# 2. 创建执行记录
|
||||||
|
execution = workflow_service.create_execution(
|
||||||
|
workflow_config_id=inconfig.id,
|
||||||
|
app_id=app_id,
|
||||||
|
trigger_type="manual",
|
||||||
|
triggered_by=None,
|
||||||
|
conversation_id=conversation_id,
|
||||||
|
input_data=input_data
|
||||||
|
)
|
||||||
|
|
||||||
|
# 3. 构建工作流配置字典
|
||||||
|
workflow_config_dict = {
|
||||||
|
"nodes": config.nodes,
|
||||||
|
"edges": config.edges,
|
||||||
|
"variables": config.variables,
|
||||||
|
"execution_config": config.execution_config
|
||||||
|
}
|
||||||
|
|
||||||
|
# 4. 获取工作空间 ID(从 app 获取)
|
||||||
|
|
||||||
|
# 5. 流式执行工作流
|
||||||
|
|
||||||
try:
|
try:
|
||||||
start_time = time.time()
|
# 更新状态为运行中
|
||||||
config_id = None
|
workflow_service.update_execution_status(execution.execution_id, "running")
|
||||||
|
|
||||||
if variables is None:
|
|
||||||
variables = {}
|
|
||||||
|
|
||||||
# 获取模型配置ID
|
# 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件)
|
||||||
model_config_id = config.default_model_config_id
|
async for event in workflow_service._run_workflow_stream(
|
||||||
api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id)
|
workflow_config=workflow_config_dict,
|
||||||
# 处理系统提示词(支持变量替换)
|
input_data=input_data,
|
||||||
system_prompt = config.get("system_prompt", "")
|
execution_id=execution.execution_id,
|
||||||
if variables:
|
workspace_id=str(workspace_id),
|
||||||
system_prompt_rendered = render_prompt_message(
|
user_id=user_id
|
||||||
system_prompt,
|
|
||||||
PromptMessageRole.USER,
|
|
||||||
variables
|
|
||||||
)
|
|
||||||
system_prompt = system_prompt_rendered.get_text_content() or system_prompt
|
|
||||||
|
|
||||||
# 准备工具列表
|
|
||||||
tools = []
|
|
||||||
|
|
||||||
# 添加知识库检索工具
|
|
||||||
knowledge_retrieval = config.get("knowledge_retrieval")
|
|
||||||
if knowledge_retrieval:
|
|
||||||
knowledge_bases = knowledge_retrieval.get("knowledge_bases", [])
|
|
||||||
kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")]
|
|
||||||
if kb_ids:
|
|
||||||
kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id)
|
|
||||||
tools.append(kb_tool)
|
|
||||||
|
|
||||||
# 添加长期记忆工具
|
|
||||||
memory_flag = False
|
|
||||||
if memory:
|
|
||||||
memory_config = config.get("memory", {})
|
|
||||||
if memory_config.get("enabled") and user_id:
|
|
||||||
memory_flag = True
|
|
||||||
memory_tool = create_long_term_memory_tool(memory_config, user_id)
|
|
||||||
tools.append(memory_tool)
|
|
||||||
|
|
||||||
web_tools = config.get("tools")
|
|
||||||
web_search_choice = web_tools.get("web_search", {})
|
|
||||||
web_search_enable = web_search_choice.get("enabled", False)
|
|
||||||
if web_search == True:
|
|
||||||
if web_search_enable == True:
|
|
||||||
search_tool = create_web_search_tool({})
|
|
||||||
tools.append(search_tool)
|
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
"已添加网络搜索工具",
|
|
||||||
extra={
|
|
||||||
"tool_count": len(tools)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# 获取模型参数
|
|
||||||
model_parameters = config.get("model_parameters", {})
|
|
||||||
|
|
||||||
# 创建 LangChain Agent
|
|
||||||
agent = LangChainAgent(
|
|
||||||
model_name=api_key_obj.model_name,
|
|
||||||
api_key=api_key_obj.api_key,
|
|
||||||
provider=api_key_obj.provider,
|
|
||||||
api_base=api_key_obj.api_base,
|
|
||||||
temperature=model_parameters.get("temperature", 0.7),
|
|
||||||
max_tokens=model_parameters.get("max_tokens", 2000),
|
|
||||||
system_prompt=system_prompt,
|
|
||||||
tools=tools,
|
|
||||||
streaming=True
|
|
||||||
)
|
|
||||||
|
|
||||||
# 加载历史消息
|
|
||||||
history = []
|
|
||||||
memory_config = {"enabled": True, 'max_history': 10}
|
|
||||||
if memory_config.get("enabled"):
|
|
||||||
messages = self.conversation_service.get_messages(
|
|
||||||
conversation_id=conversation_id,
|
|
||||||
limit=memory_config.get("max_history", 10)
|
|
||||||
)
|
|
||||||
history = [
|
|
||||||
{"role": msg.role, "content": msg.content}
|
|
||||||
for msg in messages
|
|
||||||
]
|
|
||||||
|
|
||||||
# 发送开始事件
|
|
||||||
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n"
|
|
||||||
|
|
||||||
# 流式调用 Agent
|
|
||||||
full_content = ""
|
|
||||||
async for chunk in agent.chat_stream(
|
|
||||||
message=message,
|
|
||||||
history=history,
|
|
||||||
context=None,
|
|
||||||
end_user_id=user_id,
|
|
||||||
storage_type=storage_type,
|
|
||||||
user_rag_memory_id=user_rag_memory_id,
|
|
||||||
config_id=config_id,
|
|
||||||
memory_flag=memory_flag
|
|
||||||
):
|
):
|
||||||
full_content += chunk
|
# 直接转发 executor 的事件(已经是正确的格式)
|
||||||
# 发送消息块事件
|
yield event
|
||||||
yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
|
|
||||||
|
|
||||||
elapsed_time = time.time() - start_time
|
|
||||||
|
|
||||||
# 保存消息
|
|
||||||
self.conversation_service.add_message(
|
|
||||||
conversation_id=conversation_id,
|
|
||||||
role="user",
|
|
||||||
content=message
|
|
||||||
)
|
|
||||||
|
|
||||||
self.conversation_service.add_message(
|
|
||||||
conversation_id=conversation_id,
|
|
||||||
role="assistant",
|
|
||||||
content=full_content,
|
|
||||||
meta_data={
|
|
||||||
"model": api_key_obj.model_name,
|
|
||||||
"usage": {}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# 发送结束事件
|
|
||||||
end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)}
|
|
||||||
yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n"
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"流式聊天完成",
|
|
||||||
extra={
|
|
||||||
"conversation_id": str(conversation_id),
|
|
||||||
"elapsed_time": elapsed_time,
|
|
||||||
"message_length": len(full_content)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
except (GeneratorExit, asyncio.CancelledError):
|
|
||||||
# 生成器被关闭或任务被取消,正常退出
|
|
||||||
logger.debug("流式聊天被中断")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"流式聊天失败: {str(e)}", exc_info=True)
|
logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
|
||||||
|
workflow_service.update_execution_status(
|
||||||
|
execution.execution_id,
|
||||||
|
"failed",
|
||||||
|
error_message=str(e)
|
||||||
|
)
|
||||||
# 发送错误事件
|
# 发送错误事件
|
||||||
yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
|
yield {
|
||||||
|
"event": "error",
|
||||||
|
"data": {
|
||||||
|
"execution_id": execution.execution_id,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
# ==================== 依赖注入函数 ====================
|
# ==================== 依赖注入函数 ====================
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ from app.core.exceptions import (
|
|||||||
BusinessException,
|
BusinessException,
|
||||||
)
|
)
|
||||||
from app.core.logging_config import get_business_logger
|
from app.core.logging_config import get_business_logger
|
||||||
|
from app.core.workflow.validator import WorkflowValidator
|
||||||
from app.db import get_db
|
from app.db import get_db
|
||||||
from app.models import App, AgentConfig, AppRelease, MultiAgentConfig, WorkflowConfig
|
from app.models import App, AgentConfig, AppRelease, MultiAgentConfig, WorkflowConfig
|
||||||
from app.models.app_model import AppStatus, AppType
|
from app.models.app_model import AppStatus, AppType
|
||||||
@@ -31,6 +32,7 @@ from app.schemas.workflow_schema import WorkflowConfigUpdate
|
|||||||
from app.services.agent_config_converter import AgentConfigConverter
|
from app.services.agent_config_converter import AgentConfigConverter
|
||||||
from app.models import AppShare, Workspace
|
from app.models import AppShare, Workspace
|
||||||
from app.services.model_service import ModelApiKeyService
|
from app.services.model_service import ModelApiKeyService
|
||||||
|
from app.services.workflow_service import WorkflowService
|
||||||
|
|
||||||
# 获取业务日志器
|
# 获取业务日志器
|
||||||
logger = get_business_logger()
|
logger = get_business_logger()
|
||||||
@@ -1225,6 +1227,26 @@ class AppService:
|
|||||||
"orchestration_mode": multi_agent_cfg.orchestration_mode
|
"orchestration_mode": multi_agent_cfg.orchestration_mode
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
elif app.type == AppType.WORKFLOW:
|
||||||
|
service = WorkflowService(self.db)
|
||||||
|
workflow_cfg = service.get_workflow_config(app_id)
|
||||||
|
if not workflow_cfg:
|
||||||
|
raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING)
|
||||||
|
|
||||||
|
config = {
|
||||||
|
"nodes": workflow_cfg.nodes,
|
||||||
|
"edges": workflow_cfg.edges,
|
||||||
|
"variables": workflow_cfg.variables,
|
||||||
|
"execution_config": workflow_cfg.execution_config,
|
||||||
|
"triggers": workflow_cfg.triggers
|
||||||
|
}
|
||||||
|
|
||||||
|
is_valid, errors = WorkflowValidator.validate_for_publish(config)
|
||||||
|
if not is_valid:
|
||||||
|
raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING)
|
||||||
|
logger.info(
|
||||||
|
"应用发布配置准备完成"
|
||||||
|
)
|
||||||
|
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
version = self._get_next_version(app_id)
|
version = self._get_next_version(app_id)
|
||||||
|
|||||||
@@ -1293,6 +1293,7 @@ class MultiAgentOrchestrator:
|
|||||||
conversation_id: 会话 ID
|
conversation_id: 会话 ID
|
||||||
user_id: 用户 ID
|
user_id: 用户 ID
|
||||||
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
执行结果
|
执行结果
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from app.core.workflow.validator import validate_workflow_config
|
|||||||
from app.db import get_db, get_db_context
|
from app.db import get_db, get_db_context
|
||||||
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
|
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
|
||||||
from app.repositories.end_user_repository import EndUserRepository
|
from app.repositories.end_user_repository import EndUserRepository
|
||||||
|
from app.services.multi_agent_service import convert_uuids_to_str
|
||||||
from app.repositories.workflow_repository import (
|
from app.repositories.workflow_repository import (
|
||||||
WorkflowConfigRepository,
|
WorkflowConfigRepository,
|
||||||
WorkflowExecutionRepository,
|
WorkflowExecutionRepository,
|
||||||
@@ -364,7 +365,7 @@ class WorkflowService:
|
|||||||
|
|
||||||
execution.status = status
|
execution.status = status
|
||||||
if output_data is not None:
|
if output_data is not None:
|
||||||
execution.output_data = output_data
|
execution.output_data = convert_uuids_to_str(output_data)
|
||||||
if error_message is not None:
|
if error_message is not None:
|
||||||
execution.error_message = error_message
|
execution.error_message = error_message
|
||||||
if error_node_id is not None:
|
if error_node_id is not None:
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import uuid
|
|||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from app.models import AppRelease
|
from app.models import AppRelease, WorkflowConfig
|
||||||
from app.models.agent_app_config_model import AgentConfig
|
from app.models.agent_app_config_model import AgentConfig
|
||||||
from app.models.multi_agent_model import MultiAgentConfig
|
from app.models.multi_agent_model import MultiAgentConfig
|
||||||
|
|
||||||
@@ -28,7 +28,7 @@ class AgentConfigProxy:
|
|||||||
def agent_config_4_app_release(release: AppRelease ) -> AgentConfig:
|
def agent_config_4_app_release(release: AppRelease ) -> AgentConfig:
|
||||||
|
|
||||||
config_dict = release.config
|
config_dict = release.config
|
||||||
|
|
||||||
agent_config = AgentConfig(
|
agent_config = AgentConfig(
|
||||||
app_id=release.app_id,
|
app_id=release.app_id,
|
||||||
system_prompt=config_dict.get("system_prompt"),
|
system_prompt=config_dict.get("system_prompt"),
|
||||||
@@ -45,10 +45,10 @@ def agent_config_4_app_release(release: AppRelease ) -> AgentConfig:
|
|||||||
def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig:
|
def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig:
|
||||||
|
|
||||||
config_dict = release.config
|
config_dict = release.config
|
||||||
|
|
||||||
|
|
||||||
agent_config = MultiAgentConfig(
|
agent_config = MultiAgentConfig(
|
||||||
app_id=release.app_id,
|
app_id=release.app_id,
|
||||||
default_model_config_id=release.default_model_config_id,
|
default_model_config_id=release.default_model_config_id,
|
||||||
model_parameters=config_dict.get("model_parameters"),
|
model_parameters=config_dict.get("model_parameters"),
|
||||||
master_agent_id=config_dict.get("master_agent_id"),
|
master_agent_id=config_dict.get("master_agent_id"),
|
||||||
@@ -58,11 +58,29 @@ def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig:
|
|||||||
routing_rules=config_dict.get("routing_rules"),
|
routing_rules=config_dict.get("routing_rules"),
|
||||||
execution_config=config_dict.get("execution_config", {}),
|
execution_config=config_dict.get("execution_config", {}),
|
||||||
aggregation_strategy=config_dict.get("aggregation_strategy", "merge"),
|
aggregation_strategy=config_dict.get("aggregation_strategy", "merge"),
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return agent_config
|
return agent_config
|
||||||
|
|
||||||
|
def workflow_config_4_app_release(release: AppRelease ) -> WorkflowConfig:
|
||||||
|
|
||||||
|
config_dict = release.config
|
||||||
|
|
||||||
|
|
||||||
|
config = WorkflowConfig(
|
||||||
|
id=release.id,
|
||||||
|
app_id=release.app_id,
|
||||||
|
nodes=config_dict.get("nodes", []),
|
||||||
|
edges=config_dict.get("edges", []),
|
||||||
|
variables=config_dict.get("variables", []),
|
||||||
|
execution_config=config_dict.get("execution_config", {}),
|
||||||
|
triggers=config_dict.get("triggers", [])
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
|
def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
|
||||||
"""Convert dict to MultiAgentConfig model object
|
"""Convert dict to MultiAgentConfig model object
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user