Merge branch 'refs/heads/develop' into feature/20260105_xjn

# Conflicts:
#	api/app/services/app_chat_service.py
This commit is contained in:
谢俊男
2026-01-06 19:46:36 +08:00
22 changed files with 613 additions and 328 deletions

View File

@@ -728,9 +728,23 @@ async def draft_run_compare(
from app.core.exceptions import ResourceNotFoundException from app.core.exceptions import ResourceNotFoundException
raise ResourceNotFoundException("模型配置", str(model_item.model_config_id)) raise ResourceNotFoundException("模型配置", str(model_item.model_config_id))
# 获取 agent_cfg.model_parameters如果是 ModelParameters 对象则转为字典
agent_model_params = agent_cfg.model_parameters
if hasattr(agent_model_params, 'model_dump'):
agent_model_params = agent_model_params.model_dump()
elif not isinstance(agent_model_params, dict):
agent_model_params = {}
# 获取 model_item.model_parameters如果是 ModelParameters 对象则转为字典
item_model_params = model_item.model_parameters
if hasattr(item_model_params, 'model_dump'):
item_model_params = item_model_params.model_dump()
elif not isinstance(item_model_params, dict):
item_model_params = {}
merged_parameters = { merged_parameters = {
**(agent_cfg.model_parameters or {}), **(agent_model_params or {}),
**(model_item.model_parameters or {}) **(item_model_params or {})
} }
model_configs.append({ model_configs.append({

View File

@@ -1,4 +1,5 @@
import hashlib import hashlib
import json
import uuid import uuid
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, Depends, Query, Request from fastapi import APIRouter, Depends, Query, Request
@@ -18,7 +19,7 @@ from app.services.conversation_service import ConversationService
from app.services.release_share_service import ReleaseShareService from app.services.release_share_service import ReleaseShareService
from app.services.shared_chat_service import SharedChatService from app.services.shared_chat_service import SharedChatService
from app.services.app_chat_service import AppChatService, get_app_chat_service from app.services.app_chat_service import AppChatService, get_app_chat_service
from app.utils.app_config_utils import dict_to_multi_agent_config, dict_to_workflow_config, agent_config_4_app_release, multi_agent_config_4_app_release from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release
router = APIRouter(prefix="/public/share", tags=["Public Share"]) router = APIRouter(prefix="/public/share", tags=["Public Share"])
logger = get_business_logger() logger = get_business_logger()
@@ -364,6 +365,9 @@ async def chat(
config = release.config or {} config = release.config or {}
if not config.get("sub_agents"): if not config.get("sub_agents"):
raise BusinessException("多 Agent 应用未配置子 Agent", BizCode.AGENT_CONFIG_MISSING) raise BusinessException("多 Agent 应用未配置子 Agent", BizCode.AGENT_CONFIG_MISSING)
elif app_type == AppType.WORKFLOW:
# Multi-Agent 类型:验证多 Agent 配置
pass
else: else:
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED) raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)
@@ -469,6 +473,7 @@ async def chat(
) )
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
elif app_type == AppType.MULTI_AGENT: elif app_type == AppType.MULTI_AGENT:
# config = workflow_config_4_app_release(release)
config = multi_agent_config_4_app_release(release) config = multi_agent_config_4_app_release(release)
if payload.stream: if payload.stream:
async def event_generator(): async def event_generator():
@@ -553,8 +558,71 @@ async def chat(
# ) # )
# return success(data=conversation_schema.ChatResponse(**result)) # return success(data=conversation_schema.ChatResponse(**result))
elif app_type == AppType.WORKFLOW:
config = workflow_config_4_app_release(release)
if payload.stream:
async def event_generator():
async for event in app_chat_service.workflow_chat_stream(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=new_end_user.id, # 转换为字符串
variables=payload.variables,
config=config,
web_search=payload.web_search,
memory=payload.memory,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
app_id=release.app_id,
workspace_id=workspace_id
):
event_type = event.get("event", "message")
event_data = event.get("data", {})
# 转换为标准 SSE 格式(字符串)
sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n"
yield sse_message
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
# 多 Agent 非流式返回
result = await app_chat_service.workflow_chat(
message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=new_end_user.id, # 转换为字符串
variables=payload.variables,
config=config,
web_search=payload.web_search,
memory=payload.memory,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
app_id=release.app_id,
workspace_id=workspace_id
)
logger.debug(
"工作流试运行返回结果",
extra={
"result_type": str(type(result)),
"has_response": "response" in result if isinstance(result, dict) else False
}
)
return success(
data=result,
msg="工作流任务执行成功"
)
# return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
else: else:
from app.core.exceptions import BusinessException from app.core.exceptions import BusinessException
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED) raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED)
pass

View File

@@ -1,4 +1,5 @@
"""App 服务接口 - 基于 API Key 认证""" """App 服务接口 - 基于 API Key 认证"""
import json
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, Depends, Request, Body from fastapi import APIRouter, Depends, Request, Body
@@ -21,7 +22,7 @@ from app.schemas.api_key_schema import ApiKeyAuth
from app.services import workspace_service from app.services import workspace_service
from app.services.app_chat_service import AppChatService, get_app_chat_service from app.services.app_chat_service import AppChatService, get_app_chat_service
from app.services.conversation_service import ConversationService, get_conversation_service from app.services.conversation_service import ConversationService, get_conversation_service
from app.utils.app_config_utils import dict_to_multi_agent_config, dict_to_workflow_config, agent_config_4_app_release, multi_agent_config_4_app_release from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release
from app.services.app_service import get_app_service, AppService from app.services.app_service import get_app_service, AppService
router = APIRouter(prefix="/app", tags=["V1 - App API"]) router = APIRouter(prefix="/app", tags=["V1 - App API"])
@@ -228,22 +229,29 @@ async def chat(
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
elif app_type == AppType.WORKFLOW: elif app_type == AppType.WORKFLOW:
# 多 Agent 流式返回 # 多 Agent 流式返回
config = dict_to_workflow_config(app.current_release.config,app.id) config = workflow_config_4_app_release(app.current_release)
if payload.stream: if payload.stream:
async def event_generator(): async def event_generator():
async for event in app_chat_service.workflow_chat_stream( async for event in app_chat_service.workflow_chat_stream(
message=payload.message, message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=end_user_id, # 转换为字符串 user_id=new_end_user.id, # 转换为字符串
variables=payload.variables, variables=payload.variables,
config=config, config=config,
web_search=web_search, web_search=payload.web_search,
memory=memory, memory=payload.memory,
storage_type=storage_type, storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id user_rag_memory_id=user_rag_memory_id,
app_id=app.app_id,
workspace_id=workspace_id
): ):
yield event event_type = event.get("event", "message")
event_data = event.get("data", {})
# 转换为标准 SSE 格式(字符串)
sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n"
yield sse_message
return StreamingResponse( return StreamingResponse(
event_generator(), event_generator(),
@@ -255,21 +263,32 @@ async def chat(
} }
) )
# 非流式返回 # 多 Agent 非流式返回
result = await app_chat_service.workflow_chat( result = await app_chat_service.workflow_chat(
message=payload.message, message=payload.message,
conversation_id=conversation.id, # 使用已创建的会话 ID conversation_id=conversation.id, # 使用已创建的会话 ID
user_id=end_user_id, # 转换为字符串 user_id=new_end_user.id, # 转换为字符串
variables=payload.variables, variables=payload.variables,
config=config, config=config,
web_search=web_search, web_search=payload.web_search,
memory=memory, memory=payload.memory,
storage_type=storage_type, storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id user_rag_memory_id=user_rag_memory_id,
app_id=app.app_id,
workspace_id=workspace_id
)
logger.debug(
"工作流试运行返回结果",
extra={
"result_type": str(type(result)),
"has_response": "response" in result if isinstance(result, dict) else False
}
)
return success(
data=result,
msg="工作流任务执行成功"
) )
return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json"))
else: else:
from app.core.exceptions import BusinessException from app.core.exceptions import BusinessException
from app.core.error_codes import BizCode from app.core.error_codes import BizCode

View File

@@ -11,6 +11,7 @@ from app.db import get_db
from app.core.logging_config import get_api_logger from app.core.logging_config import get_api_logger
from app.core.response_utils import success, fail from app.core.response_utils import success, fail
from app.core.error_codes import BizCode from app.core.error_codes import BizCode
from app.core.api_key_utils import timestamp_to_datetime
from app.services.user_memory_service import ( from app.services.user_memory_service import (
UserMemoryService, UserMemoryService,
analytics_memory_types, analytics_memory_types,
@@ -356,7 +357,7 @@ async def update_end_user_profile(
if 'hire_date' in update_data: if 'hire_date' in update_data:
hire_date_timestamp = update_data['hire_date'] hire_date_timestamp = update_data['hire_date']
if hire_date_timestamp is not None: if hire_date_timestamp is not None:
update_data['hire_date'] = UserMemoryService.timestamp_to_datetime(hire_date_timestamp) update_data['hire_date'] = timestamp_to_datetime(hire_date_timestamp)
# 如果是 None保持 None允许清空 # 如果是 None保持 None允许清空
for field, value in update_data.items(): for field, value in update_data.items():

View File

@@ -85,33 +85,21 @@ Example Output:
===End of Example=== ===End of Example===
===Reflection Process=== ===Internal Quality Checks (DO NOT OUTPUT)===
After generating the profile, perform the following self-review steps: Before generating your final output, internally verify:
1. All content is grounded in provided data (no fabrication)
2. Format follows the specified structure with correct headers
3. Tone is objective, third-person, and neutral
4. All four sections are complete and within character limits
**Step 1: Data Grounding Check** **IMPORTANT: These checks are for your internal use only. DO NOT include them in your output.**
- Verify all statements are supported by the provided entities and statements
- Ensure no fabricated or speculated information is included
- Confirm all claims can be traced back to the input data
**Step 2: Format Compliance**
- Verify each section follows the specified format with section headers
- Check character count limits for each section
- Ensure proper use of section markers (【】)
**Step 3: Tone and Style Review**
- Confirm objective third-person perspective is maintained
- Check for excessive adjectives or empty phrases
- Verify neutral and restrained tone throughout
**Step 4: Completeness Check**
- Ensure all four sections are present and complete
- Verify each section addresses its specific focus area
- Confirm the one-sentence summary effectively captures the user's essence
===Output Requirements=== ===Output Requirements===
**CRITICAL: Your response must ONLY contain the four sections below. Do not include any reflection, self-review, or meta-commentary.**
**LANGUAGE REQUIREMENT:** **LANGUAGE REQUIREMENT:**
- The output language should ALWAYS be Chinese (Simplified) - The output language should ALWAYS be Chinese (Simplified)
- All section content must be in Chinese - All section content must be in Chinese
@@ -122,3 +110,5 @@ After generating the profile, perform the following self-review steps:
- Content follows immediately after the header - Content follows immediately after the header
- Sections are separated by blank lines - Sections are separated by blank lines
- Strictly adhere to character limits for each section - Strictly adhere to character limits for each section
- **DO NOT include any text after the 【一句话总结】 section**
- **DO NOT output reflection steps, self-review, or verification notes**

View File

@@ -86,7 +86,12 @@ class AgentConfigConverter:
# 1. 解析模型参数配置 # 1. 解析模型参数配置
if model_parameters: if model_parameters:
from app.schemas.app_schema import ModelParameters from app.schemas.app_schema import ModelParameters
result["model_parameters"] = ModelParameters(**model_parameters) if isinstance(model_parameters, ModelParameters):
result["model_parameters"] = model_parameters
elif isinstance(model_parameters, dict):
result["model_parameters"] = ModelParameters(**model_parameters)
else:
result["model_parameters"] = ModelParameters()
# 2. 解析知识库检索配置 # 2. 解析知识库检索配置
if knowledge_retrieval: if knowledge_retrieval:

View File

@@ -9,7 +9,11 @@ from fastapi import Depends
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.core.agent.langchain_agent import LangChainAgent from app.core.agent.langchain_agent import LangChainAgent
from app.core.error_codes import BizCode
from app.core.exceptions import BusinessException
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.db import get_db, get_db_context
from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig
from app.services.tool_service import ToolService from app.services.tool_service import ToolService
from app.repositories.tool_repository import ToolRepository from app.repositories.tool_repository import ToolRepository
from app.db import get_db from app.db import get_db
@@ -20,6 +24,7 @@ from app.services.draft_run_service import create_knowledge_retrieval_tool, crea
from app.services.draft_run_service import create_web_search_tool from app.services.draft_run_service import create_web_search_tool
from app.services.model_service import ModelApiKeyService from app.services.model_service import ModelApiKeyService
from app.services.multi_agent_orchestrator import MultiAgentOrchestrator from app.services.multi_agent_orchestrator import MultiAgentOrchestrator
from app.services.workflow_service import WorkflowService
logger = get_business_logger() logger = get_business_logger()
@@ -100,6 +105,21 @@ class AppChatService:
memory_tool = create_long_term_memory_tool(memory_config, user_id) memory_tool = create_long_term_memory_tool(memory_config, user_id)
tools.append(memory_tool) tools.append(memory_tool)
# web_tools = config.tools
# web_search_choice = web_tools.get("web_search", {})
# web_search_enable = web_search_choice.get("enabled", False)
# if web_search == True:
# if web_search_enable == True:
# search_tool = create_web_search_tool({})
# tools.append(search_tool)
#
# logger.debug(
# "已添加网络搜索工具",
# extra={
# "tool_count": len(tools)
# }
# )
# 获取模型参数 # 获取模型参数
model_parameters = config.model_parameters model_parameters = config.model_parameters
@@ -482,7 +502,9 @@ class AppChatService:
self, self,
message: str, message: str,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
config: AgentConfig, config: WorkflowConfig,
app_id: uuid.UUID,
workspace_id: uuid.UUID,
user_id: Optional[str] = None, user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None, variables: Optional[Dict[str, Any]] = None,
web_search: bool = False, web_search: bool = False,
@@ -491,280 +513,158 @@ class AppChatService:
user_rag_memory_id: Optional[str] = None, user_rag_memory_id: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""聊天(非流式)""" """聊天(非流式)"""
workflow_service = WorkflowService(self.db)
start_time = time.time() input_data = {"message":message, "variables": variables,
config_id = None "conversation_id": str(conversation_id)}
inconfig = workflow_service.get_workflow_config(app_id)
if variables is None: # 2. 创建执行记录
variables = {} execution = workflow_service.create_execution(
workflow_config_id=inconfig.id,
app_id=app_id,
trigger_type="manual",
triggered_by=None,
conversation_id=conversation_id,
input_data=input_data
)
# 获取模型配置ID # 3. 构建工作流配置字典
model_config_id = config.default_model_config_id workflow_config_dict = {
api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id) "nodes": config.nodes,
# 处理系统提示词(支持变量替换) "edges": config.edges,
system_prompt = config.get("system_prompt", "") "variables": config.variables,
if variables: "execution_config": config.execution_config
system_prompt_rendered = render_prompt_message( }
system_prompt,
PromptMessageRole.USER, # 4. 获取工作空间 ID从 app 获取)
variables
# 5. 执行工作流
from app.core.workflow.executor import execute_workflow
try:
# 更新状态为运行中
workflow_service.update_execution_status(execution.execution_id, "running")
result = await execute_workflow(
workflow_config=workflow_config_dict,
input_data=input_data,
execution_id=execution.execution_id,
workspace_id=str(workspace_id),
user_id=user_id
) )
system_prompt = system_prompt_rendered.get_text_content() or system_prompt
# 准备工具列表 # 更新执行结果
tools = [] if result.get("status") == "completed":
workflow_service.update_execution_status(
# 添加知识库检索工具 execution.execution_id,
knowledge_retrieval = config.get("knowledge_retrieval") "completed",
if knowledge_retrieval: output_data=result.get("node_outputs", {})
knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) )
kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] else:
if kb_ids: workflow_service.update_execution_status(
kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) execution.execution_id,
tools.append(kb_tool) "failed",
error_message=result.get("error")
# 添加长期记忆工具
memory_flag = False
if memory == True:
memory_config = config.get("memory", {})
if memory_config.get("enabled") and user_id:
memory_flag = True
memory_tool = create_long_term_memory_tool(memory_config, user_id)
tools.append(memory_tool)
web_tools = config.get("tools")
web_search_choice = web_tools.get("web_search", {})
web_search_enable = web_search_choice.get("enabled", False)
if web_search == True:
if web_search_enable == True:
search_tool = create_web_search_tool({})
tools.append(search_tool)
logger.debug(
"已添加网络搜索工具",
extra={
"tool_count": len(tools)
}
) )
# 获取模型参数 # 返回增强的响应结构
model_parameters = config.get("model_parameters", {}) return {
"execution_id": execution.execution_id,
"status": result.get("status"),
"output": result.get("output"), # 最终输出(字符串)
"output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据)
"conversation_id": result.get("conversation_id"), # 所有节点输出详细数据payload., # 会话 ID
"error_message": result.get("error"),
"elapsed_time": result.get("elapsed_time"),
"token_usage": result.get("token_usage")
}
# 创建 LangChain Agent except Exception as e:
agent = LangChainAgent( logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
model_name=api_key_obj.model_name, workflow_service.update_execution_status(
api_key=api_key_obj.api_key, execution.execution_id,
provider=api_key_obj.provider, "failed",
api_base=api_key_obj.api_base, error_message=str(e)
temperature=model_parameters.get("temperature", 0.7), )
max_tokens=model_parameters.get("max_tokens", 2000), raise BusinessException(
system_prompt=system_prompt, code=BizCode.INTERNAL_ERROR,
tools=tools, message=f"工作流执行失败: {str(e)}"
)
# 加载历史消息
history = []
memory_config = {"enabled": True, 'max_history': 10}
if memory_config.get("enabled"):
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=memory_config.get("max_history", 10)
) )
history = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# 调用 Agent
result = await agent.chat(
message=message,
history=history,
context=None,
end_user_id=user_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
)
# 保存消息
self.conversation_service.save_conversation_messages(
conversation_id=conversation_id,
user_message=message,
assistant_message=result["content"]
)
elapsed_time = time.time() - start_time
return {
"conversation_id": conversation_id,
"message": result["content"],
"usage": result.get("usage", {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}),
"elapsed_time": elapsed_time
}
async def workflow_chat_stream( async def workflow_chat_stream(
self, self,
message: str, message: str,
conversation_id: uuid.UUID, conversation_id: uuid.UUID,
config: AgentConfig, config: WorkflowConfig,
app_id: uuid.UUID,
workspace_id: uuid.UUID,
user_id: Optional[str] = None, user_id: Optional[str] = None,
variables: Optional[Dict[str, Any]] = None, variables: Optional[Dict[str, Any]] = None,
web_search: bool = False, web_search: bool = False,
memory: bool = True, memory: bool = True,
storage_type: Optional[str] = None, storage_type: Optional[str] = None,
user_rag_memory_id: Optional[str] = None, user_rag_memory_id: Optional[str] = None,
) -> AsyncGenerator[str, None]: ) -> AsyncGenerator[str, None]:
"""聊天(流式)""" """聊天(流式)"""
workflow_service = WorkflowService(self.db)
input_data = {"message": message, "variables": variables,
"conversation_id": str(conversation_id)}
inconfig = workflow_service.get_workflow_config(app_id)
# 2. 创建执行记录
execution = workflow_service.create_execution(
workflow_config_id=inconfig.id,
app_id=app_id,
trigger_type="manual",
triggered_by=None,
conversation_id=conversation_id,
input_data=input_data
)
# 3. 构建工作流配置字典
workflow_config_dict = {
"nodes": config.nodes,
"edges": config.edges,
"variables": config.variables,
"execution_config": config.execution_config
}
# 4. 获取工作空间 ID从 app 获取)
# 5. 流式执行工作流
try: try:
start_time = time.time() # 更新状态为运行中
config_id = None workflow_service.update_execution_status(execution.execution_id, "running")
if variables is None:
variables = {}
# 获取模型配置ID # 调用流式执行executor 会发送 workflow_start 和 workflow_end 事件)
model_config_id = config.default_model_config_id async for event in workflow_service._run_workflow_stream(
api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id) workflow_config=workflow_config_dict,
# 处理系统提示词(支持变量替换) input_data=input_data,
system_prompt = config.get("system_prompt", "") execution_id=execution.execution_id,
if variables: workspace_id=str(workspace_id),
system_prompt_rendered = render_prompt_message( user_id=user_id
system_prompt,
PromptMessageRole.USER,
variables
)
system_prompt = system_prompt_rendered.get_text_content() or system_prompt
# 准备工具列表
tools = []
# 获取工具服务
tool_service = ToolService(self.db)
# 从配置中获取启用的工具
if hasattr(config, 'tools') and config.tools:
for tool_id, tool_config in config.tools.items():
if tool_config.get("enabled", False):
# 根据工具名称查找工具实例
tool_instance = tool_service._get_tool_instance(tool_id, ToolRepository.get_tenant_id_by_workspace_id(self.db, workspace_id))
if tool_instance:
# 转换为LangChain工具
langchain_tool = tool_instance.to_langchain_tool(tool_config.get("config", {}).get("operation", None))
tools.append(langchain_tool)
# 添加知识库检索工具
knowledge_retrieval = config.get("knowledge_retrieval")
if knowledge_retrieval:
knowledge_bases = knowledge_retrieval.get("knowledge_bases", [])
kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")]
if kb_ids:
kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id)
tools.append(kb_tool)
# 添加长期记忆工具
memory_flag = False
if memory:
memory_config = config.get("memory", {})
if memory_config.get("enabled") and user_id:
memory_flag = True
memory_tool = create_long_term_memory_tool(memory_config, user_id)
tools.append(memory_tool)
# 获取模型参数
model_parameters = config.get("model_parameters", {})
# 创建 LangChain Agent
agent = LangChainAgent(
model_name=api_key_obj.model_name,
api_key=api_key_obj.api_key,
provider=api_key_obj.provider,
api_base=api_key_obj.api_base,
temperature=model_parameters.get("temperature", 0.7),
max_tokens=model_parameters.get("max_tokens", 2000),
system_prompt=system_prompt,
tools=tools,
streaming=True
)
# 加载历史消息
history = []
memory_config = {"enabled": True, 'max_history': 10}
if memory_config.get("enabled"):
messages = self.conversation_service.get_messages(
conversation_id=conversation_id,
limit=memory_config.get("max_history", 10)
)
history = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n"
# 流式调用 Agent
full_content = ""
async for chunk in agent.chat_stream(
message=message,
history=history,
context=None,
end_user_id=user_id,
storage_type=storage_type,
user_rag_memory_id=user_rag_memory_id,
config_id=config_id,
memory_flag=memory_flag
): ):
full_content += chunk # 直接转发 executor 的事件(已经是正确的格式)
# 发送消息块事件 yield event
yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n"
elapsed_time = time.time() - start_time
# 保存消息
self.conversation_service.add_message(
conversation_id=conversation_id,
role="user",
content=message
)
self.conversation_service.add_message(
conversation_id=conversation_id,
role="assistant",
content=full_content,
meta_data={
"model": api_key_obj.model_name,
"usage": {}
}
)
# 发送结束事件
end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)}
yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n"
logger.info(
"流式聊天完成",
extra={
"conversation_id": str(conversation_id),
"elapsed_time": elapsed_time,
"message_length": len(full_content)
}
)
except (GeneratorExit, asyncio.CancelledError):
# 生成器被关闭或任务被取消,正常退出
logger.debug("流式聊天被中断")
raise
except Exception as e: except Exception as e:
logger.error(f"流式聊天失败: {str(e)}", exc_info=True) logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True)
workflow_service.update_execution_status(
execution.execution_id,
"failed",
error_message=str(e)
)
# 发送错误事件 # 发送错误事件
yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n" yield {
"event": "error",
"data": {
"execution_id": execution.execution_id,
"error": str(e)
}
}
# ==================== 依赖注入函数 ==================== # ==================== 依赖注入函数 ====================

View File

@@ -21,6 +21,7 @@ from app.core.exceptions import (
BusinessException, BusinessException,
) )
from app.core.logging_config import get_business_logger from app.core.logging_config import get_business_logger
from app.core.workflow.validator import WorkflowValidator
from app.db import get_db from app.db import get_db
from app.models import App, AgentConfig, AppRelease, MultiAgentConfig, WorkflowConfig from app.models import App, AgentConfig, AppRelease, MultiAgentConfig, WorkflowConfig
from app.models.app_model import AppStatus, AppType from app.models.app_model import AppStatus, AppType
@@ -31,6 +32,7 @@ from app.schemas.workflow_schema import WorkflowConfigUpdate
from app.services.agent_config_converter import AgentConfigConverter from app.services.agent_config_converter import AgentConfigConverter
from app.models import AppShare, Workspace from app.models import AppShare, Workspace
from app.services.model_service import ModelApiKeyService from app.services.model_service import ModelApiKeyService
from app.services.workflow_service import WorkflowService
# 获取业务日志器 # 获取业务日志器
logger = get_business_logger() logger = get_business_logger()
@@ -1225,6 +1227,26 @@ class AppService:
"orchestration_mode": multi_agent_cfg.orchestration_mode "orchestration_mode": multi_agent_cfg.orchestration_mode
} }
) )
elif app.type == AppType.WORKFLOW:
service = WorkflowService(self.db)
workflow_cfg = service.get_workflow_config(app_id)
if not workflow_cfg:
raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING)
config = {
"nodes": workflow_cfg.nodes,
"edges": workflow_cfg.edges,
"variables": workflow_cfg.variables,
"execution_config": workflow_cfg.execution_config,
"triggers": workflow_cfg.triggers
}
is_valid, errors = WorkflowValidator.validate_for_publish(config)
if not is_valid:
raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING)
logger.info(
"应用发布配置准备完成"
)
now = datetime.datetime.now() now = datetime.datetime.now()
version = self._get_next_version(app_id) version = self._get_next_version(app_id)

View File

@@ -1293,6 +1293,7 @@ class MultiAgentOrchestrator:
conversation_id: 会话 ID conversation_id: 会话 ID
user_id: 用户 ID user_id: 用户 ID
Returns: Returns:
执行结果 执行结果
""" """

View File

@@ -1054,6 +1054,28 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str,
core_values = core_values_match.group(1).strip() if core_values_match else "" core_values = core_values_match.group(1).strip() if core_values_match else ""
one_sentence = one_sentence_match.group(1).strip() if one_sentence_match else "" one_sentence = one_sentence_match.group(1).strip() if one_sentence_match else ""
# 6) 清理可能包含的反思内容(防御性编程)
# 如果 LLM 仍然输出了反思内容,在这里过滤掉
def clean_reflection_content(text: str) -> str:
"""移除可能包含的反思内容"""
if not text:
return text
# 移除 "---" 之后的所有内容(通常是反思部分的开始)
if '---' in text:
text = text.split('---')[0].strip()
# 移除 "**Step" 开头的内容
if '**Step' in text:
text = text.split('**Step')[0].strip()
# 移除 "Self-Review" 相关内容
if 'Self-Review' in text or 'self-review' in text:
text = re.sub(r'[\-\*]*\s*Self-Review.*$', '', text, flags=re.IGNORECASE | re.DOTALL).strip()
return text
user_summary = clean_reflection_content(user_summary)
personality = clean_reflection_content(personality)
core_values = clean_reflection_content(core_values)
one_sentence = clean_reflection_content(one_sentence)
return { return {
"user_summary": user_summary, "user_summary": user_summary,
"personality": personality, "personality": personality,

View File

@@ -17,6 +17,7 @@ from app.core.workflow.validator import validate_workflow_config
from app.db import get_db, get_db_context from app.db import get_db, get_db_context
from app.models.workflow_model import WorkflowConfig, WorkflowExecution from app.models.workflow_model import WorkflowConfig, WorkflowExecution
from app.repositories.end_user_repository import EndUserRepository from app.repositories.end_user_repository import EndUserRepository
from app.services.multi_agent_service import convert_uuids_to_str
from app.repositories.workflow_repository import ( from app.repositories.workflow_repository import (
WorkflowConfigRepository, WorkflowConfigRepository,
WorkflowExecutionRepository, WorkflowExecutionRepository,
@@ -364,7 +365,7 @@ class WorkflowService:
execution.status = status execution.status = status
if output_data is not None: if output_data is not None:
execution.output_data = output_data execution.output_data = convert_uuids_to_str(output_data)
if error_message is not None: if error_message is not None:
execution.error_message = error_message execution.error_message = error_message
if error_node_id is not None: if error_node_id is not None:

View File

@@ -8,7 +8,7 @@ import uuid
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from datetime import datetime from datetime import datetime
from app.models import AppRelease from app.models import AppRelease, WorkflowConfig
from app.models.agent_app_config_model import AgentConfig from app.models.agent_app_config_model import AgentConfig
from app.models.multi_agent_model import MultiAgentConfig from app.models.multi_agent_model import MultiAgentConfig
@@ -63,6 +63,24 @@ def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig:
return agent_config return agent_config
def workflow_config_4_app_release(release: AppRelease ) -> WorkflowConfig:
config_dict = release.config
config = WorkflowConfig(
id=release.id,
app_id=release.app_id,
nodes=config_dict.get("nodes", []),
edges=config_dict.get("edges", []),
variables=config_dict.get("variables", []),
execution_config=config_dict.get("execution_config", {}),
triggers=config_dict.get("triggers", [])
)
return config
def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None):
"""Convert dict to MultiAgentConfig model object """Convert dict to MultiAgentConfig model object

View File

@@ -1,5 +1,6 @@
import { request } from '@/utils/request' import { request } from '@/utils/request'
import type { AiPromptForm } from '@/views/ApplicationConfig/types' import type { AiPromptForm } from '@/views/ApplicationConfig/types'
import { handleSSE, type SSEMessage } from '@/utils/stream'
export const createPromptSessions = () => { export const createPromptSessions = () => {
return request.post(`/prompt/sessions`) return request.post(`/prompt/sessions`)
@@ -7,6 +8,6 @@ export const createPromptSessions = () => {
export const getPrompt = (session_id: string) => { export const getPrompt = (session_id: string) => {
return request.get(`/prompt/sessions/${session_id}`) return request.get(`/prompt/sessions/${session_id}`)
} }
export const updatePromptMessages = (session_id: string, data: AiPromptForm) => { export const updatePromptMessages = (session_id: string, data: AiPromptForm, onMessage?: (data: SSEMessage[]) => void) => {
return request.post(`/prompt/sessions/${session_id}/messages`, data) return handleSSE(`/prompt/sessions/${session_id}/messages`, data, onMessage)
} }

View File

@@ -1223,6 +1223,8 @@ export const en = {
key_findings: 'Key Findings', key_findings: 'Key Findings',
behavior_pattern: 'Behavior Pattern', behavior_pattern: 'Behavior Pattern',
growth_trajectory: 'Growth Trajectory', growth_trajectory: 'Growth Trajectory',
personality: 'Personality Traits',
core_values: 'Core Values',
}, },
space: { space: {
createSpace: 'Create Space', createSpace: 'Create Space',

View File

@@ -1304,6 +1304,8 @@ export const zh = {
key_findings: '关键发现', key_findings: '关键发现',
behavior_pattern: '行为模式', behavior_pattern: '行为模式',
growth_trajectory: '成长轨迹', growth_trajectory: '成长轨迹',
personality: '性格特点',
core_values: '核心价值观',
}, },
space: { space: {
createSpace: '创建空间', createSpace: '创建空间',

View File

@@ -16,6 +16,8 @@ import ConversationEmptyIcon from '@/assets/images/conversation/conversationEmpt
import type { ChatItem } from '@/components/Chat/types' import type { ChatItem } from '@/components/Chat/types'
import CustomSelect from '@/components/CustomSelect' import CustomSelect from '@/components/CustomSelect'
import AiPromptVariableModal from './AiPromptVariableModal' import AiPromptVariableModal from './AiPromptVariableModal'
import { type SSEMessage } from '@/utils/stream'
import Editor from './Editor'
interface AiPromptModalProps { interface AiPromptModalProps {
refresh: (value: string) => void; refresh: (value: string) => void;
@@ -35,7 +37,8 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
const [variables, setVariables] = useState<string[]>([]) const [variables, setVariables] = useState<string[]>([])
const [promptSession, setPromptSession] = useState<string | null>(null) const [promptSession, setPromptSession] = useState<string | null>(null)
const aiPromptVariableModalRef = useRef<AiPromptVariableModalRef>(null) const aiPromptVariableModalRef = useRef<AiPromptVariableModalRef>(null)
const currentPromptRef = useRef<any>(null) const editorRef = useRef<any>(null)
const currentPromptValueRef = useRef<string>('')
const values = Form.useWatch([], form) const values = Form.useWatch([], form)
@@ -78,16 +81,45 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
setChatList(prev => { setChatList(prev => {
return [...prev, { role: 'user', content: messageContent}] return [...prev, { role: 'user', content: messageContent}]
}) })
form.setFieldsValue({ message: undefined }) form.setFieldsValue({ message: undefined, current_prompt: undefined })
updatePromptMessages(promptSession, values)
.then(res => { const handleStreamMessage = (data: SSEMessage[]) => {
const response = res as { prompt: string; desc: string; variables: string[] } data.map(item => {
form.setFieldsValue({ current_prompt: response.prompt }) const { content, desc, variables } = item.data as { content: string; desc: string; variables: string[] };
setChatList(prev => {
return [...prev, { role: 'assistant', content: response.desc }] switch (item.event) {
}) case 'start':
setVariables(response.variables) currentPromptValueRef.current = ''
break;
case 'message':
if (content) {
currentPromptValueRef.current += content;
form.setFieldsValue({ current_prompt: currentPromptValueRef.current })
}
if (desc) {
setChatList(prev => {
return [...prev, { role: 'assistant', content: desc }]
})
}
if (variables) {
setVariables(variables)
}
break;
case 'end':
setLoading(false)
break
}
}) })
};
updatePromptMessages(promptSession, values, handleStreamMessage)
// .then(res => {
// const response = res as { prompt: string; desc: string; variables: string[] }
// form.setFieldsValue({ current_prompt: response.prompt })
// setChatList(prev => {
// return [...prev, { role: 'assistant', content: response.desc }]
// })
// setVariables(response.variables)
// })
.finally(() => { .finally(() => {
setLoading(false) setLoading(false)
}) })
@@ -101,18 +133,8 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
aiPromptVariableModalRef.current?.handleOpen() aiPromptVariableModalRef.current?.handleOpen()
} }
const handleVariableApply = (value: string) => { const handleVariableApply = (value: string) => {
const textArea = currentPromptRef.current?.resizableTextArea?.textArea if (editorRef.current?.insertText) {
if (textArea) { editorRef.current.insertText(value)
const cursorPosition = textArea.selectionStart
const currentValue = values.current_prompt || ''
const newValue = currentValue.slice(0, cursorPosition) + value + currentValue.slice(cursorPosition)
form.setFieldValue('current_prompt', newValue)
// 设置新的光标位置
setTimeout(() => {
textArea.focus()
textArea.setSelectionRange(cursorPosition + value.length, cursorPosition + value.length)
}, 0)
} else { } else {
form.setFieldValue('current_prompt', (values.current_prompt || '') + value) form.setFieldValue('current_prompt', (values.current_prompt || '') + value)
} }
@@ -191,7 +213,11 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
</Col> </Col>
</Row> </Row>
<Form.Item name="current_prompt"> <Form.Item name="current_prompt">
<Input.TextArea ref={currentPromptRef} className="rb:bg-[#FBFDFF]! rb:h-100.5!" /> <Editor
ref={editorRef}
className="rb:h-100.5 "
onChange={(value) => form.setFieldValue('current_prompt', value)}
/>
</Form.Item> </Form.Item>
<div className="rb:grid rb:grid-cols-2 rb:gap-4 rb:mt-6"> <div className="rb:grid rb:grid-cols-2 rb:gap-4 rb:mt-6">
<Button block disabled={!values?.current_prompt} onClick={handleCopy}>{t('common.copy')}</Button> <Button block disabled={!values?.current_prompt} onClick={handleCopy}>{t('common.copy')}</Button>

View File

@@ -0,0 +1,91 @@
import {forwardRef, useImperativeHandle } from 'react';
import clsx from 'clsx';
import { LexicalComposer } from '@lexical/react/LexicalComposer';
import { RichTextPlugin } from '@lexical/react/LexicalRichTextPlugin';
import { ContentEditable } from '@lexical/react/LexicalContentEditable';
import { LexicalErrorBoundary } from '@lexical/react/LexicalErrorBoundary';
import { $getSelection } from 'lexical';
import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext';
import InitialValuePlugin from './plugin/InitialValuePlugin'
import LineBreakPlugin from './plugin/LineBreakPlugin';
import InsertTextPlugin from './plugin/InsertTextPlugin';
export interface EditorRef {
insertText: (text: string) => void;
}
interface LexicalEditorProps {
className?: string;
placeholder?: string;
value?: string;
onChange?: (value: string) => void;
height?: number;
}
const theme = {
paragraph: 'editor-paragraph',
text: {
bold: 'editor-text-bold',
italic: 'editor-text-italic',
},
};
const EditorContent = forwardRef<EditorRef, LexicalEditorProps>(({
className = '',
value,
placeholder = "请输入内容...",
onChange,
}, ref) => {
const [editor] = useLexicalComposerContext();
useImperativeHandle(ref, () => ({
insertText: (text: string) => {
editor.update(() => {
const selection = $getSelection();
if (selection) {
selection.insertText(text);
}
});
}
}), [editor]);
return (
<div style={{ position: 'relative' }}>
<RichTextPlugin
contentEditable={
<ContentEditable
className={clsx("rb:outline-none rb:resize-none rb:text-[14px] rb:leading-5 rb:px-4 rb:py-5 rb:bg-[#FBFDFF] rb:border rb:border-[#DFE4ED] rb:rounded-lg rb:overflow-auto", className)}
/>
}
placeholder={
<div className="rb:absolute rb:px-4 rb:py-5 rb:text-[14px] rb:text-[#5B6167] rb:leading-5 rb:pointer-none">
{placeholder}
</div>
}
ErrorBoundary={LexicalErrorBoundary}
/>
<LineBreakPlugin onChange={onChange} />
<InitialValuePlugin value={value} />
<InsertTextPlugin />
</div>
);
});
const Editor = forwardRef<EditorRef, LexicalEditorProps>((props, ref) => {
const initialConfig = {
namespace: 'Editor',
theme,
nodes: [],
onError: (error: Error) => {
console.error(error);
},
};
return (
<LexicalComposer initialConfig={initialConfig}>
<EditorContent {...props} ref={ref} />
</LexicalComposer>
);
});
export default Editor;

View File

@@ -0,0 +1,25 @@
import { type FC, useEffect } from 'react';
import { $getRoot, $createParagraphNode, $createTextNode } from 'lexical';
import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext';
// 设置初始值的插件
const InitialValuePlugin: FC<{ value?: string }> = ({ value }) => {
const [editor] = useLexicalComposerContext();
useEffect(() => {
if (value) {
editor.update(() => {
const root = $getRoot();
root.clear();
const paragraph = $createParagraphNode();
const textNode = $createTextNode(value);
paragraph.append(textNode);
root.append(paragraph);
});
}
}, [editor, value]);
return null;
};
export default InitialValuePlugin

View File

@@ -0,0 +1,24 @@
import { forwardRef, useImperativeHandle } from 'react';
import { $getSelection } from 'lexical';
import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext';
import type { EditorRef } from '../index'
// 插入文本的插件
const InsertTextPlugin = forwardRef<EditorRef>((_, ref) => {
const [editor] = useLexicalComposerContext();
useImperativeHandle(ref, () => ({
insertText: (text: string) => {
editor.update(() => {
const selection = $getSelection();
if (selection) {
selection.insertText(text);
}
});
}
}), [editor]);
return null;
});
export default InsertTextPlugin;

View File

@@ -0,0 +1,24 @@
import { type FC, useEffect } from 'react';
import { $getRoot } from 'lexical';
import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext';
// 处理换行的插件
const LineBreakPlugin: FC<{ onChange?: (value: string) => void }> = ({ onChange }) => {
const [editor] = useLexicalComposerContext();
useEffect(() => {
return editor.registerUpdateListener(({ editorState }) => {
editorState.read(() => {
const root = $getRoot();
const textContent = root.getTextContent();
// 将\n转换为实际换行
const processedContent = textContent.replace(/\\n/g, '\n');
onChange?.(processedContent);
});
});
}, [editor, onChange]);
return null;
};
export default LineBreakPlugin;

View File

@@ -5,16 +5,25 @@ import { Skeleton } from 'antd';
import RbCard from '@/components/RbCard/Card' import RbCard from '@/components/RbCard/Card'
import Empty from '@/components/Empty'; import Empty from '@/components/Empty';
import RbAlert from '@/components/RbAlert';
import { import {
getUserSummary, getUserSummary,
} from '@/api/memory' } from '@/api/memory'
import type { AboutMeRef } from '../types' import type { AboutMeRef } from '../types'
interface Data {
user_summary: string;
personality: string;
core_values: string;
one_sentence: string;
[key: string]: string;
}
const AboutMe = forwardRef<AboutMeRef>((_props, ref) => { const AboutMe = forwardRef<AboutMeRef>((_props, ref) => {
const { t } = useTranslation() const { t } = useTranslation()
const { id } = useParams() const { id } = useParams()
const [loading, setLoading] = useState<boolean>(false) const [loading, setLoading] = useState<boolean>(false)
const [data, setData] = useState<string | null>(null) const [data, setData] = useState<Data>({} as Data)
useEffect(() => { useEffect(() => {
if (!id) return if (!id) return
@@ -27,7 +36,7 @@ const AboutMe = forwardRef<AboutMeRef>((_props, ref) => {
setLoading(true) setLoading(true)
getUserSummary(id) getUserSummary(id)
.then((res) => { .then((res) => {
setData((res as { summary?: string }).summary || null) setData((res as Data) || null)
}) })
.finally(() => { .finally(() => {
setLoading(false) setLoading(false)
@@ -44,10 +53,29 @@ const AboutMe = forwardRef<AboutMeRef>((_props, ref) => {
> >
{loading {loading
? <Skeleton className="rb:mt-4" /> ? <Skeleton className="rb:mt-4" />
: data : Object.keys(data).filter(key => data[key] !== null).length > 0
? <div className="rb:font-regular rb:leading-5 rb:text-[#5B6167]"> ? <>
{data || '-'} {data.user_summary &&
</div> <div className="rb:font-regular rb:leading-5 rb:text-[#5B6167]">
{data.user_summary}
</div>
}
{data.personality && <>
<div className="rb:pt-4 rb:font-medium rb:leading-5 rb:mb-2">{t('userMemory.personality')}</div>
<div className="rb:font-regular rb:leading-5 rb:text-[#5B6167]">
{data.personality}
</div>
</>}
{data.core_values && <>
<div className="rb:pt-4 rb:font-medium rb:leading-5 rb:mb-2">{t('userMemory.core_values')}</div>
<div className="rb:font-regular rb:leading-5 rb:text-[#5B6167]">
{data.core_values}
</div>
</>}
{data.one_sentence &&
<RbAlert className="rb:mt-4">{data.one_sentence}</RbAlert>
}
</>
: <Empty size={88} className="rb:mt-12 rb:mb-20.25" /> : <Empty size={88} className="rb:mt-12 rb:mb-20.25" />
} }
</RbCard> </RbCard>

View File

@@ -394,7 +394,8 @@ export const nodeLibrary: NodeLibrary[] = [
defaultValue: {} defaultValue: {}
}, },
retry: { retry: {
type: 'define', type: 'switch',
defaultValue: false
}, },
error_handle: { error_handle: {
type: 'define', type: 'define',