Merge pull request #113 from SuanmoSuanyangTechnology/feature/workflow-llm-memory
feat(workflow): add session context memory support to LLM nodes
This commit is contained in:
@@ -14,6 +14,7 @@ from app.core.exceptions import BusinessException
|
||||
from app.core.logging_config import get_business_logger
|
||||
from app.db import get_db, get_db_context
|
||||
from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig
|
||||
from app.schemas import DraftRunRequest
|
||||
from app.services.tool_service import ToolService
|
||||
from app.repositories.tool_repository import ToolRepository
|
||||
from app.db import get_db
|
||||
@@ -59,7 +60,7 @@ class AppChatService:
|
||||
|
||||
# 获取模型配置ID
|
||||
model_config_id = config.default_model_config_id
|
||||
api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id)
|
||||
api_key_obj = ModelApiKeyService.get_a_api_key(self.db, model_config_id)
|
||||
# 处理系统提示词(支持变量替换)
|
||||
system_prompt = config.system_prompt
|
||||
if variables:
|
||||
@@ -210,7 +211,7 @@ class AppChatService:
|
||||
|
||||
# 获取模型配置ID
|
||||
model_config_id = config.default_model_config_id
|
||||
api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id)
|
||||
api_key_obj = ModelApiKeyService.get_a_api_key(self.db, model_config_id)
|
||||
# 处理系统提示词(支持变量替换)
|
||||
system_prompt = config.system_prompt
|
||||
if variables:
|
||||
@@ -511,7 +512,6 @@ class AppChatService:
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
except (GeneratorExit, asyncio.CancelledError):
|
||||
# 生成器被关闭或任务被取消,正常退出
|
||||
logger.debug("多 Agent 流式聊天被中断")
|
||||
@@ -537,83 +537,19 @@ class AppChatService:
|
||||
) -> Dict[str, Any]:
|
||||
"""聊天(非流式)"""
|
||||
workflow_service = WorkflowService(self.db)
|
||||
|
||||
input_data = {"message":message, "variables": variables,
|
||||
"conversation_id": str(conversation_id)}
|
||||
inconfig = workflow_service.get_workflow_config(app_id)
|
||||
|
||||
# 2. 创建执行记录
|
||||
execution = workflow_service.create_execution(
|
||||
workflow_config_id=inconfig.id,
|
||||
app_id=app_id,
|
||||
trigger_type="manual",
|
||||
triggered_by=None,
|
||||
conversation_id=conversation_id,
|
||||
input_data=input_data
|
||||
payload = DraftRunRequest(
|
||||
message=message,
|
||||
variables=variables,
|
||||
conversation_id=str(conversation_id),
|
||||
stream=True,
|
||||
user_id=user_id
|
||||
)
|
||||
return await workflow_service.run(
|
||||
app_id=app_id,
|
||||
payload=payload,
|
||||
config=config,
|
||||
workspace_id=workspace_id,
|
||||
)
|
||||
|
||||
# 3. 构建工作流配置字典
|
||||
workflow_config_dict = {
|
||||
"nodes": config.nodes,
|
||||
"edges": config.edges,
|
||||
"variables": config.variables,
|
||||
"execution_config": config.execution_config
|
||||
}
|
||||
|
||||
# 4. 获取工作空间 ID(从 app 获取)
|
||||
|
||||
# 5. 执行工作流
|
||||
from app.core.workflow.executor import execute_workflow
|
||||
|
||||
try:
|
||||
# 更新状态为运行中
|
||||
workflow_service.update_execution_status(execution.execution_id, "running")
|
||||
|
||||
result = await execute_workflow(
|
||||
workflow_config=workflow_config_dict,
|
||||
input_data=input_data,
|
||||
execution_id=execution.execution_id,
|
||||
workspace_id=str(workspace_id),
|
||||
user_id=user_id
|
||||
)
|
||||
|
||||
# 更新执行结果
|
||||
if result.get("status") == "completed":
|
||||
workflow_service.update_execution_status(
|
||||
execution.execution_id,
|
||||
"completed",
|
||||
output_data=result.get("node_outputs", {})
|
||||
)
|
||||
else:
|
||||
workflow_service.update_execution_status(
|
||||
execution.execution_id,
|
||||
"failed",
|
||||
error_message=result.get("error")
|
||||
)
|
||||
|
||||
# 返回增强的响应结构
|
||||
return {
|
||||
"execution_id": execution.execution_id,
|
||||
"status": result.get("status"),
|
||||
"output": result.get("output"), # 最终输出(字符串)
|
||||
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
|
||||
"conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID
|
||||
"error_message": result.get("error"),
|
||||
"elapsed_time": result.get("elapsed_time"),
|
||||
"token_usage": result.get("token_usage")
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
|
||||
workflow_service.update_execution_status(
|
||||
execution.execution_id,
|
||||
"failed",
|
||||
error_message=str(e)
|
||||
)
|
||||
raise BusinessException(
|
||||
code=BizCode.INTERNAL_ERROR,
|
||||
message=f"工作流执行失败: {str(e)}"
|
||||
)
|
||||
|
||||
async def workflow_chat_stream(
|
||||
self,
|
||||
@@ -632,62 +568,21 @@ class AppChatService:
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""聊天(流式)"""
|
||||
workflow_service = WorkflowService(self.db)
|
||||
input_data = {"message": message, "variables": variables,
|
||||
"conversation_id": str(conversation_id)}
|
||||
inconfig = workflow_service.get_workflow_config(app_id)
|
||||
# 2. 创建执行记录
|
||||
execution = workflow_service.create_execution(
|
||||
workflow_config_id=inconfig.id,
|
||||
app_id=app_id,
|
||||
trigger_type="manual",
|
||||
triggered_by=None,
|
||||
conversation_id=conversation_id,
|
||||
input_data=input_data
|
||||
payload = DraftRunRequest(
|
||||
message=message,
|
||||
variables=variables,
|
||||
conversation_id=str(conversation_id),
|
||||
stream=True,
|
||||
user_id=user_id
|
||||
)
|
||||
async for event in workflow_service.run_stream(
|
||||
app_id=app_id,
|
||||
payload=payload,
|
||||
config=config,
|
||||
workspace_id=workspace_id,
|
||||
):
|
||||
yield event
|
||||
|
||||
# 3. 构建工作流配置字典
|
||||
workflow_config_dict = {
|
||||
"nodes": config.nodes,
|
||||
"edges": config.edges,
|
||||
"variables": config.variables,
|
||||
"execution_config": config.execution_config
|
||||
}
|
||||
|
||||
# 4. 获取工作空间 ID(从 app 获取)
|
||||
|
||||
# 5. 流式执行工作流
|
||||
|
||||
try:
|
||||
# 更新状态为运行中
|
||||
workflow_service.update_execution_status(execution.execution_id, "running")
|
||||
|
||||
|
||||
# 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件)
|
||||
async for event in workflow_service._run_workflow_stream(
|
||||
workflow_config=workflow_config_dict,
|
||||
input_data=input_data,
|
||||
execution_id=execution.execution_id,
|
||||
workspace_id=str(workspace_id),
|
||||
user_id=user_id
|
||||
):
|
||||
# 直接转发 executor 的事件(已经是正确的格式)
|
||||
yield event
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
|
||||
workflow_service.update_execution_status(
|
||||
execution.execution_id,
|
||||
"failed",
|
||||
error_message=str(e)
|
||||
)
|
||||
# 发送错误事件
|
||||
yield {
|
||||
"event": "error",
|
||||
"data": {
|
||||
"execution_id": execution.execution_id,
|
||||
"error": str(e)
|
||||
}
|
||||
}
|
||||
|
||||
# ==================== 依赖注入函数 ====================
|
||||
|
||||
|
||||
@@ -2,12 +2,11 @@
|
||||
工作流服务层
|
||||
"""
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
import datetime
|
||||
from typing import Any, Annotated, AsyncGenerator
|
||||
|
||||
from deprecated import deprecated
|
||||
from fastapi import Depends
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@@ -16,15 +15,16 @@ from app.core.exceptions import BusinessException
|
||||
from app.core.workflow.validator import validate_workflow_config
|
||||
from app.db import get_db, get_db_context
|
||||
from app.models.workflow_model import WorkflowConfig, WorkflowExecution
|
||||
from app.repositories.conversation_repository import MessageRepository
|
||||
from app.models.conversation_model import Message
|
||||
from app.repositories.end_user_repository import EndUserRepository
|
||||
from app.services.multi_agent_service import convert_uuids_to_str
|
||||
from app.repositories.workflow_repository import (
|
||||
WorkflowConfigRepository,
|
||||
WorkflowExecutionRepository,
|
||||
WorkflowNodeExecutionRepository
|
||||
)
|
||||
from app.schemas import DraftRunRequest
|
||||
from app.utils.sse_utils import format_sse_message
|
||||
from app.services.multi_agent_service import convert_uuids_to_str
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -37,6 +37,7 @@ class WorkflowService:
|
||||
self.config_repo = WorkflowConfigRepository(db)
|
||||
self.execution_repo = WorkflowExecutionRepository(db)
|
||||
self.node_execution_repo = WorkflowNodeExecutionRepository(db)
|
||||
self.message_repo = MessageRepository(db)
|
||||
|
||||
# ==================== 配置管理 ====================
|
||||
|
||||
@@ -418,14 +419,13 @@ class WorkflowService:
|
||||
"""运行工作流
|
||||
|
||||
Args:
|
||||
workspace_id:
|
||||
config:
|
||||
payload:
|
||||
app_id: 应用 ID
|
||||
input_data: 输入数据(包含 message 和 variables)
|
||||
triggered_by: 触发用户 ID
|
||||
conversation_id: 会话 ID(可选)
|
||||
stream: 是否流式返回
|
||||
|
||||
Returns:
|
||||
执行结果(非流式)或生成器(流式)
|
||||
执行结果(非流式)
|
||||
|
||||
Raises:
|
||||
BusinessException: 配置不存在或执行失败时抛出
|
||||
@@ -438,7 +438,8 @@ class WorkflowService:
|
||||
code=BizCode.CONFIG_MISSING,
|
||||
message=f"工作流配置不存在: app_id={app_id}"
|
||||
)
|
||||
input_data = {"message": payload.message, "variables": payload.variables, "conversation_id": payload.conversation_id}
|
||||
input_data = {"message": payload.message, "variables": payload.variables,
|
||||
"conversation_id": payload.conversation_id}
|
||||
|
||||
# 转换 user_id 为 UUID
|
||||
triggered_by_uuid = None
|
||||
@@ -461,7 +462,7 @@ class WorkflowService:
|
||||
workflow_config_id=config.id,
|
||||
app_id=app_id,
|
||||
trigger_type="manual",
|
||||
triggered_by=triggered_by_uuid,
|
||||
triggered_by=None,
|
||||
conversation_id=conversation_id_uuid,
|
||||
input_data=input_data
|
||||
)
|
||||
@@ -500,8 +501,11 @@ class WorkflowService:
|
||||
variables = last_state.get("variables", {})
|
||||
conv_vars = variables.get("conv", {})
|
||||
input_data["conv"] = conv_vars
|
||||
input_data["conv_messages"] = last_state.get("messages") or []
|
||||
break
|
||||
|
||||
init_message_length = len(input_data.get("conv_messages", []))
|
||||
|
||||
result = await execute_workflow(
|
||||
workflow_config=workflow_config_dict,
|
||||
input_data=input_data,
|
||||
@@ -517,6 +521,17 @@ class WorkflowService:
|
||||
"completed",
|
||||
output_data=result
|
||||
)
|
||||
final_messages = result.get("messages", [])[init_message_length:]
|
||||
for message in final_messages:
|
||||
message_obj = Message(
|
||||
conversation_id=conversation_id_uuid,
|
||||
role=message["role"],
|
||||
content=message["content"],
|
||||
)
|
||||
self.message_repo.add_message(message_obj)
|
||||
self.db.commit()
|
||||
logger.info(f"Workflow Run Success, "
|
||||
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
|
||||
else:
|
||||
self.update_execution_status(
|
||||
execution.execution_id,
|
||||
@@ -529,6 +544,7 @@ class WorkflowService:
|
||||
"execution_id": execution.execution_id,
|
||||
"status": result.get("status"),
|
||||
"variables": result.get("variables"),
|
||||
"messages": result.get("messages"),
|
||||
"output": result.get("output"), # 最终输出(字符串)
|
||||
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
|
||||
"conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID
|
||||
@@ -559,6 +575,7 @@ class WorkflowService:
|
||||
"""运行工作流(流式)
|
||||
|
||||
Args:
|
||||
workspace_id:
|
||||
app_id: 应用 ID
|
||||
payload: 请求对象(包含 message, variables, conversation_id 等)
|
||||
config: 存储类型(可选)
|
||||
@@ -601,7 +618,7 @@ class WorkflowService:
|
||||
workflow_config_id=config.id,
|
||||
app_id=app_id,
|
||||
trigger_type="manual",
|
||||
triggered_by=triggered_by_uuid,
|
||||
triggered_by=None,
|
||||
conversation_id=conversation_id_uuid,
|
||||
input_data=input_data
|
||||
)
|
||||
@@ -638,17 +655,46 @@ class WorkflowService:
|
||||
variables = last_state.get("variables", {})
|
||||
conv_vars = variables.get("conv", {})
|
||||
input_data["conv"] = conv_vars
|
||||
input_data["conv_messages"] = last_state.get("messages") or []
|
||||
break
|
||||
init_message_length = len(input_data.get("conv_messages", []))
|
||||
from app.core.workflow.executor import execute_workflow_stream
|
||||
|
||||
# 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件)
|
||||
async for event in self._run_workflow_stream(
|
||||
async for event in execute_workflow_stream(
|
||||
workflow_config=workflow_config_dict,
|
||||
input_data=input_data,
|
||||
execution_id=execution.execution_id,
|
||||
workspace_id=str(workspace_id),
|
||||
user_id=end_user_id
|
||||
):
|
||||
# 直接转发 executor 的事件(已经是正确的格式)
|
||||
if event.get("event") == "workflow_end":
|
||||
|
||||
status = event.get("data", {}).get("status")
|
||||
if status == "completed":
|
||||
self.update_execution_status(
|
||||
execution.execution_id,
|
||||
"completed",
|
||||
output_data=event.get("data")
|
||||
)
|
||||
final_messages = event.get("data", {}).get("messages", [])[init_message_length:]
|
||||
for message in final_messages:
|
||||
message_obj = Message(
|
||||
conversation_id=conversation_id_uuid,
|
||||
role=message["role"],
|
||||
content=message["content"],
|
||||
)
|
||||
self.message_repo.add_message(message_obj)
|
||||
self.db.commit()
|
||||
logger.info(f"Workflow Run Success, "
|
||||
f"execution_id: {execution.execution_id}, message count: {len(final_messages)}")
|
||||
elif status == "failed":
|
||||
self.update_execution_status(
|
||||
execution.execution_id,
|
||||
"failed",
|
||||
output_data=event.get("data")
|
||||
)
|
||||
else:
|
||||
logger.error(f"unexpect workflow run status, status: {status}")
|
||||
yield event
|
||||
|
||||
except Exception as e:
|
||||
@@ -667,6 +713,8 @@ class WorkflowService:
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated(reason="This method is deprecated. "
|
||||
"Please use WorkflowService.run / run_stream instead.")
|
||||
async def run_workflow(
|
||||
self,
|
||||
app_id: uuid.UUID,
|
||||
@@ -819,6 +867,7 @@ class WorkflowService:
|
||||
|
||||
return clean_value(event)
|
||||
|
||||
@deprecated(reason="This method is deprecated. Please use WorkflowService.run_stream instead.")
|
||||
async def _run_workflow_stream(
|
||||
self,
|
||||
workflow_config: dict[str, Any],
|
||||
|
||||
Reference in New Issue
Block a user