[add] app chat service

This commit is contained in:
Mark
2025-12-24 20:54:04 +08:00
parent bbd73d5e95
commit c21ae4c571
2 changed files with 590 additions and 2 deletions

View File

@@ -475,11 +475,298 @@ class AppChatService:
# 发送错误事件
yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
async def workflow_chat(
self,
message: str,
conversation_id: uuid.UUID,
config: AgentConfig,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
web_search: bool = False,
memory: bool = True,
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]:
"""聊天(非流式)"""
start_time = time.time()
config_id = None
if variables is None:
variables = {}
# 获取模型配置ID
model_config_id = config.default_model_config_id
api_key_obj = ModelApiKeyService.get_a_api_key(model_config_id)
# 处理系统提示词(支持变量替换)
system_prompt = config.get("system_prompt", "")
if variables:
system_prompt_rendered = render_prompt_message(
system_prompt,
PromptMessageRole.USER,
variables
)
system_prompt = system_prompt_rendered.get_text_content() or system_prompt
# 准备工具列表
tools = []
# 添加知识库检索工具
knowledge_retrieval = config.get("knowledge_retrieval")
if knowledge_retrieval:
knowledge_bases = knowledge_retrieval.get("knowledge_bases", [])
kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")]
if kb_ids:
kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id)
tools.append(kb_tool)
# 添加长期记忆工具
memory_flag = False
if memory == True:
memory_config = config.get("memory", {})
if memory_config.get("enabled") and user_id:
memory_flag = True
memory_tool = create_long_term_memory_tool(memory_config, user_id)
tools.append(memory_tool)
web_tools = config.get("tools")
web_search_choice = web_tools.get("web_search", {})
web_search_enable = web_search_choice.get("enabled", False)
if web_search == True:
if web_search_enable == True:
search_tool = create_web_search_tool({})
tools.append(search_tool)
logger.debug(
"已添加网络搜索工具",
extra={
"tool_count": len(tools)
}
)
# 获取模型参数
model_parameters = config.get("model_parameters", {})
# 创建 LangChain Agent
agent = LangChainAgent(
model_name=api_key_obj.model_name,
api_key=api_key_obj.api_key,
provider=api_key_obj.provider,
api_base=api_key_obj.api_base,
temperature=model_parameters.get("temperature", 0.7),
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
)
# 加载历史消息
history = []
memory_config = {"enabled": True, 'max_history': 10}
if memory_config.get("enabled"):
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=memory_config.get("max_history", 10)
)
history = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# 调用 Agent
result = await agent.chat(
message=message,
history=history,
context=None,
end_user_id=user_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
)
# 保存消息
self.conversation_service.save_conversation_messages(
conversation_id=conversation_id,
user_message=message,
assistant_message=result["content"]
)
elapsed_time = time.time() - start_time
return {
"conversation_id": conversation_id,
"message": result["content"],
"usage": result.get("usage", {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}),
"elapsed_time": elapsed_time
}
async def workflow_chat_stream(
self,
message: str,
conversation_id: uuid.UUID,
config: AgentConfig,
user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None,
web_search: bool = False,
memory: bool = True,
storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None,
) -> AsyncGenerator[str, None]:
"""聊天(流式)"""
try:
start_time = time.time()
config_id = None
if variables is None:
variables = {}
# 获取模型配置ID
model_config_id = config.default_model_config_id
api_key_obj = ModelApiKeyService.get_a_api_key(model_config_id)
# 处理系统提示词(支持变量替换)
system_prompt = config.get("system_prompt", "")
if variables:
system_prompt_rendered = render_prompt_message(
system_prompt,
PromptMessageRole.USER,
variables
)
system_prompt = system_prompt_rendered.get_text_content() or system_prompt
# 准备工具列表
tools = []
# 添加知识库检索工具
knowledge_retrieval = config.get("knowledge_retrieval")
if knowledge_retrieval:
knowledge_bases = knowledge_retrieval.get("knowledge_bases", [])
kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")]
if kb_ids:
kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id)
tools.append(kb_tool)
# 添加长期记忆工具
memory_flag = False
if memory:
memory_config = config.get("memory", {})
if memory_config.get("enabled") and user_id:
memory_flag = True
memory_tool = create_long_term_memory_tool(memory_config, user_id)
tools.append(memory_tool)
web_tools = config.get("tools")
web_search_choice = web_tools.get("web_search", {})
web_search_enable = web_search_choice.get("enabled", False)
if web_search == True:
if web_search_enable == True:
search_tool = create_web_search_tool({})
tools.append(search_tool)
logger.debug(
"已添加网络搜索工具",
extra={
"tool_count": len(tools)
}
)
# 获取模型参数
model_parameters = config.get("model_parameters", {})
# 创建 LangChain Agent
agent = LangChainAgent(
model_name=api_key_obj.model_name,
api_key=api_key_obj.api_key,
provider=api_key_obj.provider,
api_base=api_key_obj.api_base,
temperature=model_parameters.get("temperature", 0.7),
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
streaming=True
)
# 加载历史消息
history = []
memory_config = {"enabled": True, 'max_history': 10}
if memory_config.get("enabled"):
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=memory_config.get("max_history", 10)
)
history = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n"
# 流式调用 Agent
full_content = ""
async for chunk in agent.chat_stream(
message=message,
history=history,
context=None,
end_user_id=user_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
):
full_content += chunk
# 发送消息块事件
yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
elapsed_time = time.time() - start_time
# 保存消息
self.conversation_service.add_message(
conversation_id=conversation_id,
role="user",
content=message
)
self.conversation_service.add_message(
conversation_id=conversation_id,
role="assistant",
content=full_content,
meta_data={
"model": api_key_obj.model_name,
"usage": {}
}
)
# 发送结束事件
end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)}
yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n"
logger.info(
"流式聊天完成",
extra={
"conversation_id": str(conversation_id),
"elapsed_time": elapsed_time,
"message_length": len(full_content)
}
)
except (GeneratorExit, asyncio.CancelledError):
# 生成器被关闭或任务被取消,正常退出
logger.debug("流式聊天被中断")
raise
except Exception as e:
logger.error(f"流式聊天失败: {str(e)}", exc_info=True)
# 发送错误事件
yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"
# ==================== 依赖注入函数 ====================
def get_app_chat_service(
db: Annotated[Session, Depends(get_db)]
) -> ChatService:
) -> AppChatService:
"""获取工作流服务(依赖注入)"""
return ChatService(db)
return AppChatService(db)

View File

@@ -0,0 +1,301 @@
"""
App Config Utilities
Utility functions for converting between dict and model objects for different app configurations.
"""
import uuid
from typing import Dict, Any, Optional
from datetime import datetime
class AgentConfigProxy:
"""Proxy class for AgentConfig (legacy compatibility)"""
def __init__(self, release, app, config_data):
self.id = release.id
self.app_id = release.app_id
self.app = app
self.name = release.name
self.description = release.description
self.system_prompt = config_data.get("system_prompt")
self.default_model_config_id = release.default_model_config_id
def dict_to_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
"""Convert dict to AgentConfig model object
Args:
config_dict: Configuration dictionary
app_id: Optional app ID (if not provided in dict)
Returns:
AgentConfig model instance (not yet persisted to database)
Example:
>>> config_dict = {
... "app_id": "uuid-here",
... "system_prompt": "You are a helpful assistant",
... "default_model_config_id": "model-uuid",
... "model_parameters": {"temperature": 0.7, "max_tokens": 2000},
... "knowledge_retrieval": {"enabled": True, "top_k": 5},
... "memory": {"enabled": True, "window_size": 10},
... "variables": [{"name": "user_name", "type": "string"}],
... "tools": {"enabled_tools": ["web_search", "calculator"]},
... "agent_role": "standalone",
... "agent_domain": "customer_service",
... "capabilities": ["chat", "search"]
... }
>>> agent_config = dict_to_agent_config(config_dict)
"""
from app.models.agent_app_config_model import AgentConfig
# Extract app_id
final_app_id = config_dict.get("app_id") or app_id
if not final_app_id:
raise ValueError("app_id is required")
# Convert string UUID to UUID object if needed
if isinstance(final_app_id, str):
final_app_id = uuid.UUID(final_app_id)
# Convert default_model_config_id if present
default_model_config_id = config_dict.get("default_model_config_id")
if default_model_config_id and isinstance(default_model_config_id, str):
default_model_config_id = uuid.UUID(default_model_config_id)
# Convert parent_agent_id if present
parent_agent_id = config_dict.get("parent_agent_id")
if parent_agent_id and isinstance(parent_agent_id, str):
parent_agent_id = uuid.UUID(parent_agent_id)
# Create AgentConfig instance
agent_config = AgentConfig(
id=uuid.UUID(config_dict["id"]) if "id" in config_dict else uuid.uuid4(),
app_id=final_app_id,
system_prompt=config_dict.get("system_prompt"),
default_model_config_id=default_model_config_id,
model_parameters=config_dict.get("model_parameters"),
knowledge_retrieval=config_dict.get("knowledge_retrieval"),
memory=config_dict.get("memory"),
variables=config_dict.get("variables", []),
tools=config_dict.get("tools", {}),
agent_role=config_dict.get("agent_role"),
agent_domain=config_dict.get("agent_domain"),
parent_agent_id=parent_agent_id,
capabilities=config_dict.get("capabilities", []),
is_active=config_dict.get("is_active", True),
created_at=config_dict.get("created_at", datetime.now()),
updated_at=config_dict.get("updated_at", datetime.now())
)
return agent_config
def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
"""Convert dict to MultiAgentConfig model object
Args:
config_dict: Configuration dictionary
app_id: Optional app ID (if not provided in dict)
Returns:
MultiAgentConfig model instance (not yet persisted to database)
Example:
>>> config_dict = {
... "app_id": "uuid-here",
... "master_agent_id": "master-uuid",
... "master_agent_name": "Master Agent",
... "orchestration_mode": "conditional",
... "sub_agents": [
... {"agent_id": "sub1-uuid", "name": "Sub Agent 1", "role": "specialist", "priority": 1},
... {"agent_id": "sub2-uuid", "name": "Sub Agent 2", "role": "specialist", "priority": 2}
... ],
... "routing_rules": [
... {"condition": "intent == 'technical'", "target_agent_id": "sub1-uuid", "priority": 1}
... ],
... "execution_config": {"max_iterations": 5, "timeout": 60, "parallel_limit": 3},
... "aggregation_strategy": "merge"
... }
>>> multi_agent_config = dict_to_multi_agent_config(config_dict)
"""
from app.models.multi_agent_model import MultiAgentConfig
# Extract app_id
final_app_id = config_dict.get("app_id") or app_id
if not final_app_id:
raise ValueError("app_id is required")
# Convert string UUID to UUID object if needed
if isinstance(final_app_id, str):
final_app_id = uuid.UUID(final_app_id)
# Convert master_agent_id
master_agent_id = config_dict.get("master_agent_id")
if not master_agent_id:
raise ValueError("master_agent_id is required")
if isinstance(master_agent_id, str):
master_agent_id = uuid.UUID(master_agent_id)
# Create MultiAgentConfig instance
multi_agent_config = MultiAgentConfig(
id=uuid.UUID(config_dict["id"]) if "id" in config_dict else uuid.uuid4(),
app_id=final_app_id,
master_agent_id=master_agent_id,
master_agent_name=config_dict.get("master_agent_name"),
orchestration_mode=config_dict.get("orchestration_mode", "conditional"),
sub_agents=config_dict.get("sub_agents", []),
routing_rules=config_dict.get("routing_rules"),
execution_config=config_dict.get("execution_config", {}),
aggregation_strategy=config_dict.get("aggregation_strategy", "merge"),
is_active=config_dict.get("is_active", True),
created_at=config_dict.get("created_at", datetime.now()),
updated_at=config_dict.get("updated_at", datetime.now())
)
return multi_agent_config
def dict_to_workflow_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
"""Convert dict to WorkflowConfig model object
Args:
config_dict: Configuration dictionary
app_id: Optional app ID (if not provided in dict)
Returns:
WorkflowConfig model instance (not yet persisted to database)
Example:
>>> config_dict = {
... "app_id": "uuid-here",
... "nodes": [
... {"id": "start", "type": "start", "config": {}},
... {"id": "llm", "type": "llm", "config": {"model": "gpt-4"}},
... {"id": "end", "type": "end", "config": {"output": "{{llm.output}}"}}
... ],
... "edges": [
... {"source": "start", "target": "llm"},
... {"source": "llm", "target": "end"}
... ],
... "variables": [
... {"name": "user_input", "type": "string", "default": ""}
... ],
... "execution_config": {
... "max_iterations": 10,
... "timeout": 300,
... "enable_streaming": True
... },
... "triggers": [
... {"type": "manual", "enabled": True}
... ]
... }
>>> workflow_config = dict_to_workflow_config(config_dict)
"""
from app.models.workflow_model import WorkflowConfig
# Extract app_id
final_app_id = config_dict.get("app_id") or app_id
if not final_app_id:
raise ValueError("app_id is required")
# Convert string UUID to UUID object if needed
if isinstance(final_app_id, str):
final_app_id = uuid.UUID(final_app_id)
# Create WorkflowConfig instance
workflow_config = WorkflowConfig(
id=uuid.UUID(config_dict["id"]) if "id" in config_dict else uuid.uuid4(),
app_id=final_app_id,
nodes=config_dict.get("nodes", []),
edges=config_dict.get("edges", []),
variables=config_dict.get("variables", []),
execution_config=config_dict.get("execution_config", {}),
triggers=config_dict.get("triggers", []),
is_active=config_dict.get("is_active", True),
created_at=config_dict.get("created_at", datetime.now()),
updated_at=config_dict.get("updated_at", datetime.now())
)
return workflow_config
def agent_config_to_dict(agent_config) -> Dict[str, Any]:
"""Convert AgentConfig model to dict
Args:
agent_config: AgentConfig model instance
Returns:
Configuration dictionary
"""
return {
"id": str(agent_config.id),
"app_id": str(agent_config.app_id),
"system_prompt": agent_config.system_prompt,
"default_model_config_id": str(agent_config.default_model_config_id) if agent_config.default_model_config_id else None,
"model_parameters": agent_config.model_parameters,
"knowledge_retrieval": agent_config.knowledge_retrieval,
"memory": agent_config.memory,
"variables": agent_config.variables,
"tools": agent_config.tools,
"agent_role": agent_config.agent_role,
"agent_domain": agent_config.agent_domain,
"parent_agent_id": str(agent_config.parent_agent_id) if agent_config.parent_agent_id else None,
"capabilities": agent_config.capabilities,
"is_active": agent_config.is_active,
"created_at": agent_config.created_at.isoformat() if agent_config.created_at else None,
"updated_at": agent_config.updated_at.isoformat() if agent_config.updated_at else None
}
def multi_agent_config_to_dict(multi_agent_config) -> Dict[str, Any]:
"""Convert MultiAgentConfig model to dict
Args:
multi_agent_config: MultiAgentConfig model instance
Returns:
Configuration dictionary
"""
return {
"id": str(multi_agent_config.id),
"app_id": str(multi_agent_config.app_id),
"master_agent_id": str(multi_agent_config.master_agent_id),
"master_agent_name": multi_agent_config.master_agent_name,
"orchestration_mode": multi_agent_config.orchestration_mode,
"sub_agents": multi_agent_config.sub_agents,
"routing_rules": multi_agent_config.routing_rules,
"execution_config": multi_agent_config.execution_config,
"aggregation_strategy": multi_agent_config.aggregation_strategy,
"is_active": multi_agent_config.is_active,
"created_at": multi_agent_config.created_at.isoformat() if multi_agent_config.created_at else None,
"updated_at": multi_agent_config.updated_at.isoformat() if multi_agent_config.updated_at else None
}
def workflow_config_to_dict(workflow_config) -> Dict[str, Any]:
"""Convert WorkflowConfig model to dict
Args:
workflow_config: WorkflowConfig model instance
Returns:
Configuration dictionary
"""
return {
"id": str(workflow_config.id),
"app_id": str(workflow_config.app_id),
"nodes": workflow_config.nodes,
"edges": workflow_config.edges,
"variables": workflow_config.variables,
"execution_config": workflow_config.execution_config,
"triggers": workflow_config.triggers,
"is_active": workflow_config.is_active,
"created_at": workflow_config.created_at.isoformat() if workflow_config.created_at else None,
"updated_at": workflow_config.updated_at.isoformat() if workflow_config.updated_at else None
}