From 35a06c3cbe7651e9a1cbde2bae91a43fdb12f848 Mon Sep 17 00:00:00 2001 From: zhaoying Date: Tue, 6 Jan 2026 15:03:40 +0800 Subject: [PATCH 1/6] feat(web): ai prompt api support stream --- web/src/api/prompt.ts | 5 +- .../components/AiPromptModal.tsx | 72 ++++++++++----- .../components/Editor/index.tsx | 91 +++++++++++++++++++ .../Editor/plugin/InitialValuePlugin.tsx | 25 +++++ .../Editor/plugin/InsertTextPlugin.tsx | 24 +++++ .../Editor/plugin/LineBreakPlugin.tsx | 24 +++++ web/src/views/Workflow/constant.ts | 3 +- 7 files changed, 218 insertions(+), 26 deletions(-) create mode 100644 web/src/views/ApplicationConfig/components/Editor/index.tsx create mode 100644 web/src/views/ApplicationConfig/components/Editor/plugin/InitialValuePlugin.tsx create mode 100644 web/src/views/ApplicationConfig/components/Editor/plugin/InsertTextPlugin.tsx create mode 100644 web/src/views/ApplicationConfig/components/Editor/plugin/LineBreakPlugin.tsx diff --git a/web/src/api/prompt.ts b/web/src/api/prompt.ts index 77ea1271..526f50ac 100644 --- a/web/src/api/prompt.ts +++ b/web/src/api/prompt.ts @@ -1,5 +1,6 @@ import { request } from '@/utils/request' import type { AiPromptForm } from '@/views/ApplicationConfig/types' +import { handleSSE, type SSEMessage } from '@/utils/stream' export const createPromptSessions = () => { return request.post(`/prompt/sessions`) @@ -7,6 +8,6 @@ export const createPromptSessions = () => { export const getPrompt = (session_id: string) => { return request.get(`/prompt/sessions/${session_id}`) } -export const updatePromptMessages = (session_id: string, data: AiPromptForm) => { - return request.post(`/prompt/sessions/${session_id}/messages`, data) +export const updatePromptMessages = (session_id: string, data: AiPromptForm, onMessage?: (data: SSEMessage[]) => void) => { + return handleSSE(`/prompt/sessions/${session_id}/messages`, data, onMessage) } \ No newline at end of file diff --git a/web/src/views/ApplicationConfig/components/AiPromptModal.tsx b/web/src/views/ApplicationConfig/components/AiPromptModal.tsx index a85f5cf1..f52c0675 100644 --- a/web/src/views/ApplicationConfig/components/AiPromptModal.tsx +++ b/web/src/views/ApplicationConfig/components/AiPromptModal.tsx @@ -16,6 +16,8 @@ import ConversationEmptyIcon from '@/assets/images/conversation/conversationEmpt import type { ChatItem } from '@/components/Chat/types' import CustomSelect from '@/components/CustomSelect' import AiPromptVariableModal from './AiPromptVariableModal' +import { type SSEMessage } from '@/utils/stream' +import Editor from './Editor' interface AiPromptModalProps { refresh: (value: string) => void; @@ -35,7 +37,8 @@ const AiPromptModal = forwardRef(({ const [variables, setVariables] = useState([]) const [promptSession, setPromptSession] = useState(null) const aiPromptVariableModalRef = useRef(null) - const currentPromptRef = useRef(null) + const editorRef = useRef(null) + const currentPromptValueRef = useRef('') const values = Form.useWatch([], form) @@ -78,16 +81,45 @@ const AiPromptModal = forwardRef(({ setChatList(prev => { return [...prev, { role: 'user', content: messageContent}] }) - form.setFieldsValue({ message: undefined }) - updatePromptMessages(promptSession, values) - .then(res => { - const response = res as { prompt: string; desc: string; variables: string[] } - form.setFieldsValue({ current_prompt: response.prompt }) - setChatList(prev => { - return [...prev, { role: 'assistant', content: response.desc }] - }) - setVariables(response.variables) + form.setFieldsValue({ message: undefined, current_prompt: undefined }) + + const handleStreamMessage = (data: SSEMessage[]) => { + data.map(item => { + const { content, desc, variables } = item.data as { content: string; desc: string; variables: string[] }; + + switch (item.event) { + case 'start': + currentPromptValueRef.current = '' + break; + case 'message': + if (content) { + currentPromptValueRef.current += content; + form.setFieldsValue({ current_prompt: currentPromptValueRef.current }) + } + if (desc) { + setChatList(prev => { + return [...prev, { role: 'assistant', content: desc }] + }) + } + if (variables) { + setVariables(variables) + } + break; + case 'end': + setLoading(false) + break + } }) + }; + updatePromptMessages(promptSession, values, handleStreamMessage) + // .then(res => { + // const response = res as { prompt: string; desc: string; variables: string[] } + // form.setFieldsValue({ current_prompt: response.prompt }) + // setChatList(prev => { + // return [...prev, { role: 'assistant', content: response.desc }] + // }) + // setVariables(response.variables) + // }) .finally(() => { setLoading(false) }) @@ -101,18 +133,8 @@ const AiPromptModal = forwardRef(({ aiPromptVariableModalRef.current?.handleOpen() } const handleVariableApply = (value: string) => { - const textArea = currentPromptRef.current?.resizableTextArea?.textArea - if (textArea) { - const cursorPosition = textArea.selectionStart - const currentValue = values.current_prompt || '' - const newValue = currentValue.slice(0, cursorPosition) + value + currentValue.slice(cursorPosition) - form.setFieldValue('current_prompt', newValue) - - // 设置新的光标位置 - setTimeout(() => { - textArea.focus() - textArea.setSelectionRange(cursorPosition + value.length, cursorPosition + value.length) - }, 0) + if (editorRef.current?.insertText) { + editorRef.current.insertText(value) } else { form.setFieldValue('current_prompt', (values.current_prompt || '') + value) } @@ -191,7 +213,11 @@ const AiPromptModal = forwardRef(({ - + form.setFieldValue('current_prompt', value)} + />
diff --git a/web/src/views/ApplicationConfig/components/Editor/index.tsx b/web/src/views/ApplicationConfig/components/Editor/index.tsx new file mode 100644 index 00000000..d381e003 --- /dev/null +++ b/web/src/views/ApplicationConfig/components/Editor/index.tsx @@ -0,0 +1,91 @@ +import {forwardRef, useImperativeHandle } from 'react'; +import clsx from 'clsx'; +import { LexicalComposer } from '@lexical/react/LexicalComposer'; +import { RichTextPlugin } from '@lexical/react/LexicalRichTextPlugin'; +import { ContentEditable } from '@lexical/react/LexicalContentEditable'; +import { LexicalErrorBoundary } from '@lexical/react/LexicalErrorBoundary'; +import { $getSelection } from 'lexical'; +import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext'; +import InitialValuePlugin from './plugin/InitialValuePlugin' +import LineBreakPlugin from './plugin/LineBreakPlugin'; +import InsertTextPlugin from './plugin/InsertTextPlugin'; + +export interface EditorRef { + insertText: (text: string) => void; +} + +interface LexicalEditorProps { + className?: string; + placeholder?: string; + value?: string; + onChange?: (value: string) => void; + height?: number; +} + +const theme = { + paragraph: 'editor-paragraph', + text: { + bold: 'editor-text-bold', + italic: 'editor-text-italic', + }, +}; + +const EditorContent = forwardRef(({ + className = '', + value, + placeholder = "请输入内容...", + onChange, +}, ref) => { + const [editor] = useLexicalComposerContext(); + + useImperativeHandle(ref, () => ({ + insertText: (text: string) => { + editor.update(() => { + const selection = $getSelection(); + if (selection) { + selection.insertText(text); + } + }); + } + }), [editor]); + + return ( +
+ + } + placeholder={ +
+ {placeholder} +
+ } + ErrorBoundary={LexicalErrorBoundary} + /> + + + +
+ ); +}); + +const Editor = forwardRef((props, ref) => { + const initialConfig = { + namespace: 'Editor', + theme, + nodes: [], + onError: (error: Error) => { + console.error(error); + }, + }; + + return ( + + + + ); +}); + +export default Editor; \ No newline at end of file diff --git a/web/src/views/ApplicationConfig/components/Editor/plugin/InitialValuePlugin.tsx b/web/src/views/ApplicationConfig/components/Editor/plugin/InitialValuePlugin.tsx new file mode 100644 index 00000000..b1054055 --- /dev/null +++ b/web/src/views/ApplicationConfig/components/Editor/plugin/InitialValuePlugin.tsx @@ -0,0 +1,25 @@ +import { type FC, useEffect } from 'react'; +import { $getRoot, $createParagraphNode, $createTextNode } from 'lexical'; +import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext'; + +// 设置初始值的插件 +const InitialValuePlugin: FC<{ value?: string }> = ({ value }) => { + const [editor] = useLexicalComposerContext(); + + useEffect(() => { + if (value) { + editor.update(() => { + const root = $getRoot(); + root.clear(); + const paragraph = $createParagraphNode(); + const textNode = $createTextNode(value); + paragraph.append(textNode); + root.append(paragraph); + }); + } + }, [editor, value]); + + return null; +}; + +export default InitialValuePlugin \ No newline at end of file diff --git a/web/src/views/ApplicationConfig/components/Editor/plugin/InsertTextPlugin.tsx b/web/src/views/ApplicationConfig/components/Editor/plugin/InsertTextPlugin.tsx new file mode 100644 index 00000000..ca75c393 --- /dev/null +++ b/web/src/views/ApplicationConfig/components/Editor/plugin/InsertTextPlugin.tsx @@ -0,0 +1,24 @@ +import { forwardRef, useImperativeHandle } from 'react'; +import { $getSelection } from 'lexical'; +import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext'; +import type { EditorRef } from '../index' + +// 插入文本的插件 +const InsertTextPlugin = forwardRef((_, ref) => { + const [editor] = useLexicalComposerContext(); + + useImperativeHandle(ref, () => ({ + insertText: (text: string) => { + editor.update(() => { + const selection = $getSelection(); + if (selection) { + selection.insertText(text); + } + }); + } + }), [editor]); + + return null; +}); + +export default InsertTextPlugin; \ No newline at end of file diff --git a/web/src/views/ApplicationConfig/components/Editor/plugin/LineBreakPlugin.tsx b/web/src/views/ApplicationConfig/components/Editor/plugin/LineBreakPlugin.tsx new file mode 100644 index 00000000..63d1ffc4 --- /dev/null +++ b/web/src/views/ApplicationConfig/components/Editor/plugin/LineBreakPlugin.tsx @@ -0,0 +1,24 @@ +import { type FC, useEffect } from 'react'; +import { $getRoot } from 'lexical'; +import { useLexicalComposerContext } from '@lexical/react/LexicalComposerContext'; + +// 处理换行的插件 +const LineBreakPlugin: FC<{ onChange?: (value: string) => void }> = ({ onChange }) => { + const [editor] = useLexicalComposerContext(); + + useEffect(() => { + return editor.registerUpdateListener(({ editorState }) => { + editorState.read(() => { + const root = $getRoot(); + const textContent = root.getTextContent(); + // 将\n转换为实际换行 + const processedContent = textContent.replace(/\\n/g, '\n'); + onChange?.(processedContent); + }); + }); + }, [editor, onChange]); + + return null; +}; + +export default LineBreakPlugin; \ No newline at end of file diff --git a/web/src/views/Workflow/constant.ts b/web/src/views/Workflow/constant.ts index c398eb70..babc8614 100644 --- a/web/src/views/Workflow/constant.ts +++ b/web/src/views/Workflow/constant.ts @@ -394,7 +394,8 @@ export const nodeLibrary: NodeLibrary[] = [ defaultValue: {} }, retry: { - type: 'define', + type: 'switch', + defaultValue: false }, error_handle: { type: 'define', From 9d0622b6cc0b8631a397dcc866613fba89a6895e Mon Sep 17 00:00:00 2001 From: zhaoying Date: Tue, 6 Jan 2026 15:36:25 +0800 Subject: [PATCH 2/6] feat(web): user summary api update --- web/src/i18n/en.ts | 2 + web/src/i18n/zh.ts | 2 + .../UserMemoryDetail/components/AboutMe.tsx | 40 ++++++++++++++++--- 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/web/src/i18n/en.ts b/web/src/i18n/en.ts index d186368b..630c6c7e 100644 --- a/web/src/i18n/en.ts +++ b/web/src/i18n/en.ts @@ -1218,6 +1218,8 @@ export const en = { key_findings: 'Key Findings', behavior_pattern: 'Behavior Pattern', growth_trajectory: 'Growth Trajectory', + personality: 'Personality Traits', + core_values: 'Core Values', }, space: { createSpace: 'Create Space', diff --git a/web/src/i18n/zh.ts b/web/src/i18n/zh.ts index 028bd1df..b50ed1d8 100644 --- a/web/src/i18n/zh.ts +++ b/web/src/i18n/zh.ts @@ -1299,6 +1299,8 @@ export const zh = { key_findings: '关键发现', behavior_pattern: '行为模式', growth_trajectory: '成长轨迹', + personality: '性格特点', + core_values: '核心价值观', }, space: { createSpace: '创建空间', diff --git a/web/src/views/UserMemoryDetail/components/AboutMe.tsx b/web/src/views/UserMemoryDetail/components/AboutMe.tsx index ba7e68fe..f2c94814 100644 --- a/web/src/views/UserMemoryDetail/components/AboutMe.tsx +++ b/web/src/views/UserMemoryDetail/components/AboutMe.tsx @@ -5,16 +5,25 @@ import { Skeleton } from 'antd'; import RbCard from '@/components/RbCard/Card' import Empty from '@/components/Empty'; +import RbAlert from '@/components/RbAlert'; import { getUserSummary, } from '@/api/memory' import type { AboutMeRef } from '../types' + +interface Data { + user_summary: string; + personality: string; + core_values: string; + one_sentence: string; + [key: string]: string; +} const AboutMe = forwardRef((_props, ref) => { const { t } = useTranslation() const { id } = useParams() const [loading, setLoading] = useState(false) - const [data, setData] = useState(null) + const [data, setData] = useState({} as Data) useEffect(() => { if (!id) return @@ -27,7 +36,7 @@ const AboutMe = forwardRef((_props, ref) => { setLoading(true) getUserSummary(id) .then((res) => { - setData((res as { summary?: string }).summary || null) + setData((res as Data) || null) }) .finally(() => { setLoading(false) @@ -44,10 +53,29 @@ const AboutMe = forwardRef((_props, ref) => { > {loading ? - : data - ?
- {data || '-'} -
+ : Object.keys(data).filter(key => data[key] !== null).length > 0 + ? <> + {data.user_summary && +
+ {data.user_summary} +
+ } + {data.personality && <> +
{t('userMemory.personality')}
+
+ {data.personality} +
+ } + {data.core_values && <> +
{t('userMemory.core_values')}
+
+ {data.core_values} +
+ } + {data.one_sentence && + {data.one_sentence} + } + : } From a940717ed009b0e675d7a7fe99135e1fd551fc88 Mon Sep 17 00:00:00 2001 From: Mark Date: Tue, 6 Jan 2026 17:11:52 +0800 Subject: [PATCH 3/6] [add] publish and share run add workflow type app --- .../controllers/public_share_controller.py | 74 +++- api/app/services/app_chat_service.py | 378 ++++++------------ api/app/services/app_service.py | 22 + api/app/services/multi_agent_orchestrator.py | 1 + api/app/services/workflow_service.py | 3 +- api/app/utils/app_config_utils.py | 28 +- 6 files changed, 249 insertions(+), 257 deletions(-) diff --git a/api/app/controllers/public_share_controller.py b/api/app/controllers/public_share_controller.py index 5fe4eb56..adb199fb 100644 --- a/api/app/controllers/public_share_controller.py +++ b/api/app/controllers/public_share_controller.py @@ -1,4 +1,5 @@ import hashlib +import json import uuid from typing import Annotated from fastapi import APIRouter, Depends, Query, Request @@ -18,7 +19,7 @@ from app.services.conversation_service import ConversationService from app.services.release_share_service import ReleaseShareService from app.services.shared_chat_service import SharedChatService from app.services.app_chat_service import AppChatService, get_app_chat_service -from app.utils.app_config_utils import dict_to_multi_agent_config, dict_to_workflow_config, agent_config_4_app_release, multi_agent_config_4_app_release +from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release router = APIRouter(prefix="/public/share", tags=["Public Share"]) logger = get_business_logger() @@ -288,7 +289,7 @@ async def chat( password = None # Token 认证不需要密码 # end_user_id = user_id other_id = user_id - + # 提前验证和准备(在流式响应开始前完成) # 这样可以确保错误能正确返回,而不是在流式响应中间出错 from app.models.app_model import AppType @@ -364,6 +365,9 @@ async def chat( config = release.config or {} if not config.get("sub_agents"): raise BusinessException("多 Agent 应用未配置子 Agent", BizCode.AGENT_CONFIG_MISSING) + elif app_type == AppType.WORKFLOW: + # Multi-Agent 类型:验证多 Agent 配置 + pass else: raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED) @@ -469,6 +473,7 @@ async def chat( ) return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) elif app_type == AppType.MULTI_AGENT: + # config = workflow_config_4_app_release(release) config = multi_agent_config_4_app_release(release) if payload.stream: async def event_generator(): @@ -553,8 +558,71 @@ async def chat( # ) # return success(data=conversation_schema.ChatResponse(**result)) + elif app_type == AppType.WORKFLOW: + + config = workflow_config_4_app_release(release) + if payload.stream: + async def event_generator(): + async for event in app_chat_service.workflow_chat_stream( + + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=new_end_user.id, # 转换为字符串 + variables=payload.variables, + config=config, + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + app_id=release.app_id, + workspace_id=workspace_id + ): + event_type = event.get("event", "message") + event_data = event.get("data", {}) + + # 转换为标准 SSE 格式(字符串) + sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n" + yield sse_message + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" + } + ) + + # 多 Agent 非流式返回 + result = await app_chat_service.workflow_chat( + + message=payload.message, + conversation_id=conversation.id, # 使用已创建的会话 ID + user_id=new_end_user.id, # 转换为字符串 + variables=payload.variables, + config=config, + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + app_id=release.app_id, + workspace_id=workspace_id + ) + logger.debug( + "工作流试运行返回结果", + extra={ + "result_type": str(type(result)), + "has_response": "response" in result if isinstance(result, dict) else False + } + ) + return success( + data=result, + msg="工作流任务执行成功" + ) + # return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) + else: from app.core.exceptions import BusinessException from app.core.error_codes import BizCode raise BusinessException(f"不支持的应用类型: {app_type}", BizCode.APP_TYPE_NOT_SUPPORTED) - pass diff --git a/api/app/services/app_chat_service.py b/api/app/services/app_chat_service.py index 29e92cf6..6b7b3103 100644 --- a/api/app/services/app_chat_service.py +++ b/api/app/services/app_chat_service.py @@ -9,15 +9,18 @@ from fastapi import Depends from sqlalchemy.orm import Session from app.core.agent.langchain_agent import LangChainAgent +from app.core.error_codes import BizCode +from app.core.exceptions import BusinessException from app.core.logging_config import get_business_logger -from app.db import get_db -from app.models import MultiAgentConfig, AgentConfig +from app.db import get_db, get_db_context +from app.models import MultiAgentConfig, AgentConfig, WorkflowConfig from app.schemas.prompt_schema import render_prompt_message, PromptMessageRole from app.services.conversation_service import ConversationService from app.services.draft_run_service import create_knowledge_retrieval_tool, create_long_term_memory_tool from app.services.draft_run_service import create_web_search_tool from app.services.model_service import ModelApiKeyService from app.services.multi_agent_orchestrator import MultiAgentOrchestrator +from app.services.workflow_service import WorkflowService logger = get_business_logger() @@ -479,7 +482,9 @@ class AppChatService: self, message: str, conversation_id: uuid.UUID, - config: AgentConfig, + config: WorkflowConfig, + app_id: uuid.UUID, + workspace_id: uuid.UUID, user_id: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, web_search: bool = False, @@ -488,281 +493,158 @@ class AppChatService: user_rag_memory_id: Optional[str] = None, ) -> Dict[str, Any]: """聊天(非流式)""" + workflow_service = WorkflowService(self.db) - start_time = time.time() - config_id = None + input_data = {"message":message, "variables": variables, + "conversation_id": str(conversation_id)} + inconfig = workflow_service.get_workflow_config(app_id) - if variables is None: - variables = {} + # 2. 创建执行记录 + execution = workflow_service.create_execution( + workflow_config_id=inconfig.id, + app_id=app_id, + trigger_type="manual", + triggered_by=None, + conversation_id=conversation_id, + input_data=input_data + ) - # 获取模型配置ID - model_config_id = config.default_model_config_id - api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id) - # 处理系统提示词(支持变量替换) - system_prompt = config.get("system_prompt", "") - if variables: - system_prompt_rendered = render_prompt_message( - system_prompt, - PromptMessageRole.USER, - variables + # 3. 构建工作流配置字典 + workflow_config_dict = { + "nodes": config.nodes, + "edges": config.edges, + "variables": config.variables, + "execution_config": config.execution_config + } + + # 4. 获取工作空间 ID(从 app 获取) + + # 5. 执行工作流 + from app.core.workflow.executor import execute_workflow + + try: + # 更新状态为运行中 + workflow_service.update_execution_status(execution.execution_id, "running") + + result = await execute_workflow( + workflow_config=workflow_config_dict, + input_data=input_data, + execution_id=execution.execution_id, + workspace_id=str(workspace_id), + user_id=user_id ) - system_prompt = system_prompt_rendered.get_text_content() or system_prompt - # 准备工具列表 - tools = [] - - # 添加知识库检索工具 - knowledge_retrieval = config.get("knowledge_retrieval") - if knowledge_retrieval: - knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) - kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] - if kb_ids: - kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) - tools.append(kb_tool) - - # 添加长期记忆工具 - memory_flag = False - if memory == True: - memory_config = config.get("memory", {}) - if memory_config.get("enabled") and user_id: - memory_flag = True - memory_tool = create_long_term_memory_tool(memory_config, user_id) - tools.append(memory_tool) - - web_tools = config.get("tools") - web_search_choice = web_tools.get("web_search", {}) - web_search_enable = web_search_choice.get("enabled", False) - if web_search == True: - if web_search_enable == True: - search_tool = create_web_search_tool({}) - tools.append(search_tool) - - logger.debug( - "已添加网络搜索工具", - extra={ - "tool_count": len(tools) - } + # 更新执行结果 + if result.get("status") == "completed": + workflow_service.update_execution_status( + execution.execution_id, + "completed", + output_data=result.get("node_outputs", {}) + ) + else: + workflow_service.update_execution_status( + execution.execution_id, + "failed", + error_message=result.get("error") ) - # 获取模型参数 - model_parameters = config.get("model_parameters", {}) + # 返回增强的响应结构 + return { + "execution_id": execution.execution_id, + "status": result.get("status"), + "output": result.get("output"), # 最终输出(字符串) + "output_data": result.get("node_outputs", {}), # 所有节点输出(详细数据) + "conversation_id": result.get("conversation_id"), # 所有节点输出(详细数据)payload., # 会话 ID + "error_message": result.get("error"), + "elapsed_time": result.get("elapsed_time"), + "token_usage": result.get("token_usage") + } - # 创建 LangChain Agent - agent = LangChainAgent( - model_name=api_key_obj.model_name, - api_key=api_key_obj.api_key, - provider=api_key_obj.provider, - api_base=api_key_obj.api_base, - temperature=model_parameters.get("temperature", 0.7), - max_tokens=model_parameters.get("max_tokens", 2000), - system_prompt=system_prompt, - tools=tools, - - ) - - # 加载历史消息 - history = [] - memory_config = {"enabled": True, 'max_history': 10} - if memory_config.get("enabled"): - messages = self.conversation_service.get_messages( - conversation_id=conversation_id, - limit=memory_config.get("max_history", 10) + except Exception as e: + logger.error(f"工作流执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True) + workflow_service.update_execution_status( + execution.execution_id, + "failed", + error_message=str(e) + ) + raise BusinessException( + code=BizCode.INTERNAL_ERROR, + message=f"工作流执行失败: {str(e)}" ) - history = [ - {"role": msg.role, "content": msg.content} - for msg in messages - ] - - # 调用 Agent - result = await agent.chat( - message=message, - history=history, - context=None, - end_user_id=user_id, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id, - config_id=config_id, - memory_flag=memory_flag - ) - - # 保存消息 - self.conversation_service.save_conversation_messages( - conversation_id=conversation_id, - user_message=message, - assistant_message=result["content"] - ) - - elapsed_time = time.time() - start_time - - return { - "conversation_id": conversation_id, - "message": result["content"], - "usage": result.get("usage", { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0 - }), - "elapsed_time": elapsed_time - } async def workflow_chat_stream( self, message: str, conversation_id: uuid.UUID, - config: AgentConfig, + config: WorkflowConfig, + app_id: uuid.UUID, + workspace_id: uuid.UUID, user_id: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, web_search: bool = False, memory: bool = True, storage_type: Optional[str] = None, user_rag_memory_id: Optional[str] = None, + ) -> AsyncGenerator[str, None]: """聊天(流式)""" + workflow_service = WorkflowService(self.db) + input_data = {"message": message, "variables": variables, + "conversation_id": str(conversation_id)} + inconfig = workflow_service.get_workflow_config(app_id) + # 2. 创建执行记录 + execution = workflow_service.create_execution( + workflow_config_id=inconfig.id, + app_id=app_id, + trigger_type="manual", + triggered_by=None, + conversation_id=conversation_id, + input_data=input_data + ) + + # 3. 构建工作流配置字典 + workflow_config_dict = { + "nodes": config.nodes, + "edges": config.edges, + "variables": config.variables, + "execution_config": config.execution_config + } + + # 4. 获取工作空间 ID(从 app 获取) + + # 5. 流式执行工作流 try: - start_time = time.time() - config_id = None + # 更新状态为运行中 + workflow_service.update_execution_status(execution.execution_id, "running") - if variables is None: - variables = {} - # 获取模型配置ID - model_config_id = config.default_model_config_id - api_key_obj = ModelApiKeyService.get_a_api_key(self.db ,model_config_id) - # 处理系统提示词(支持变量替换) - system_prompt = config.get("system_prompt", "") - if variables: - system_prompt_rendered = render_prompt_message( - system_prompt, - PromptMessageRole.USER, - variables - ) - system_prompt = system_prompt_rendered.get_text_content() or system_prompt - - # 准备工具列表 - tools = [] - - # 添加知识库检索工具 - knowledge_retrieval = config.get("knowledge_retrieval") - if knowledge_retrieval: - knowledge_bases = knowledge_retrieval.get("knowledge_bases", []) - kb_ids = [kb.get("kb_id") for kb in knowledge_bases if kb.get("kb_id")] - if kb_ids: - kb_tool = create_knowledge_retrieval_tool(knowledge_retrieval, kb_ids, user_id) - tools.append(kb_tool) - - # 添加长期记忆工具 - memory_flag = False - if memory: - memory_config = config.get("memory", {}) - if memory_config.get("enabled") and user_id: - memory_flag = True - memory_tool = create_long_term_memory_tool(memory_config, user_id) - tools.append(memory_tool) - - web_tools = config.get("tools") - web_search_choice = web_tools.get("web_search", {}) - web_search_enable = web_search_choice.get("enabled", False) - if web_search == True: - if web_search_enable == True: - search_tool = create_web_search_tool({}) - tools.append(search_tool) - - logger.debug( - "已添加网络搜索工具", - extra={ - "tool_count": len(tools) - } - ) - - # 获取模型参数 - model_parameters = config.get("model_parameters", {}) - - # 创建 LangChain Agent - agent = LangChainAgent( - model_name=api_key_obj.model_name, - api_key=api_key_obj.api_key, - provider=api_key_obj.provider, - api_base=api_key_obj.api_base, - temperature=model_parameters.get("temperature", 0.7), - max_tokens=model_parameters.get("max_tokens", 2000), - system_prompt=system_prompt, - tools=tools, - streaming=True - ) - - # 加载历史消息 - history = [] - memory_config = {"enabled": True, 'max_history': 10} - if memory_config.get("enabled"): - messages = self.conversation_service.get_messages( - conversation_id=conversation_id, - limit=memory_config.get("max_history", 10) - ) - history = [ - {"role": msg.role, "content": msg.content} - for msg in messages - ] - - # 发送开始事件 - yield f"event: start\ndata: {json.dumps({'conversation_id': str(conversation_id)}, ensure_ascii=False)}\n\n" - - # 流式调用 Agent - full_content = "" - async for chunk in agent.chat_stream( - message=message, - history=history, - context=None, - end_user_id=user_id, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id, - config_id=config_id, - memory_flag=memory_flag + # 调用流式执行(executor 会发送 workflow_start 和 workflow_end 事件) + async for event in workflow_service._run_workflow_stream( + workflow_config=workflow_config_dict, + input_data=input_data, + execution_id=execution.execution_id, + workspace_id=str(workspace_id), + user_id=user_id ): - full_content += chunk - # 发送消息块事件 - yield f"event: message\ndata: {json.dumps({'content': chunk}, ensure_ascii=False)}\n\n" + # 直接转发 executor 的事件(已经是正确的格式) + yield event - elapsed_time = time.time() - start_time - - # 保存消息 - self.conversation_service.add_message( - conversation_id=conversation_id, - role="user", - content=message - ) - - self.conversation_service.add_message( - conversation_id=conversation_id, - role="assistant", - content=full_content, - meta_data={ - "model": api_key_obj.model_name, - "usage": {} - } - ) - - # 发送结束事件 - end_data = {"elapsed_time": elapsed_time, "message_length": len(full_content)} - yield f"event: end\ndata: {json.dumps(end_data, ensure_ascii=False)}\n\n" - - logger.info( - "流式聊天完成", - extra={ - "conversation_id": str(conversation_id), - "elapsed_time": elapsed_time, - "message_length": len(full_content) - } - ) - - except (GeneratorExit, asyncio.CancelledError): - # 生成器被关闭或任务被取消,正常退出 - logger.debug("流式聊天被中断") - raise except Exception as e: - logger.error(f"流式聊天失败: {str(e)}", exc_info=True) + logger.error(f"工作流流式执行失败: execution_id={execution.execution_id}, error={e}", exc_info=True) + workflow_service.update_execution_status( + execution.execution_id, + "failed", + error_message=str(e) + ) # 发送错误事件 - yield f"event: error\ndata: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n" + yield { + "event": "error", + "data": { + "execution_id": execution.execution_id, + "error": str(e) + } + } # ==================== 依赖注入函数 ==================== diff --git a/api/app/services/app_service.py b/api/app/services/app_service.py index 95bcc07a..38097c4e 100644 --- a/api/app/services/app_service.py +++ b/api/app/services/app_service.py @@ -21,6 +21,7 @@ from app.core.exceptions import ( BusinessException, ) from app.core.logging_config import get_business_logger +from app.core.workflow.validator import WorkflowValidator from app.db import get_db from app.models import App, AgentConfig, AppRelease, MultiAgentConfig, WorkflowConfig from app.models.app_model import AppStatus, AppType @@ -31,6 +32,7 @@ from app.schemas.workflow_schema import WorkflowConfigUpdate from app.services.agent_config_converter import AgentConfigConverter from app.models import AppShare, Workspace from app.services.model_service import ModelApiKeyService +from app.services.workflow_service import WorkflowService # 获取业务日志器 logger = get_business_logger() @@ -1225,6 +1227,26 @@ class AppService: "orchestration_mode": multi_agent_cfg.orchestration_mode } ) + elif app.type == AppType.WORKFLOW: + service = WorkflowService(self.db) + workflow_cfg = service.get_workflow_config(app_id) + if not workflow_cfg: + raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING) + + config = { + "nodes": workflow_cfg.nodes, + "edges": workflow_cfg.edges, + "variables": workflow_cfg.variables, + "execution_config": workflow_cfg.execution_config, + "triggers": workflow_cfg.triggers + } + + is_valid, errors = WorkflowValidator.validate_for_publish(config) + if not is_valid: + raise BusinessException("应用缺少有效配置,无法发布", BizCode.CONFIG_MISSING) + logger.info( + "应用发布配置准备完成" + ) now = datetime.datetime.now() version = self._get_next_version(app_id) diff --git a/api/app/services/multi_agent_orchestrator.py b/api/app/services/multi_agent_orchestrator.py index 08ae7e57..fd3ce229 100644 --- a/api/app/services/multi_agent_orchestrator.py +++ b/api/app/services/multi_agent_orchestrator.py @@ -1293,6 +1293,7 @@ class MultiAgentOrchestrator: conversation_id: 会话 ID user_id: 用户 ID + Returns: 执行结果 """ diff --git a/api/app/services/workflow_service.py b/api/app/services/workflow_service.py index d96efdf7..68d6279b 100644 --- a/api/app/services/workflow_service.py +++ b/api/app/services/workflow_service.py @@ -17,6 +17,7 @@ from app.core.workflow.validator import validate_workflow_config from app.db import get_db, get_db_context from app.models.workflow_model import WorkflowConfig, WorkflowExecution from app.repositories.end_user_repository import EndUserRepository +from app.services.multi_agent_service import convert_uuids_to_str from app.repositories.workflow_repository import ( WorkflowConfigRepository, WorkflowExecutionRepository, @@ -364,7 +365,7 @@ class WorkflowService: execution.status = status if output_data is not None: - execution.output_data = output_data + execution.output_data = convert_uuids_to_str(output_data) if error_message is not None: execution.error_message = error_message if error_node_id is not None: diff --git a/api/app/utils/app_config_utils.py b/api/app/utils/app_config_utils.py index 4fe692c1..97e64214 100644 --- a/api/app/utils/app_config_utils.py +++ b/api/app/utils/app_config_utils.py @@ -8,7 +8,7 @@ import uuid from typing import Dict, Any, Optional from datetime import datetime -from app.models import AppRelease +from app.models import AppRelease, WorkflowConfig from app.models.agent_app_config_model import AgentConfig from app.models.multi_agent_model import MultiAgentConfig @@ -28,7 +28,7 @@ class AgentConfigProxy: def agent_config_4_app_release(release: AppRelease ) -> AgentConfig: config_dict = release.config - + agent_config = AgentConfig( app_id=release.app_id, system_prompt=config_dict.get("system_prompt"), @@ -45,10 +45,10 @@ def agent_config_4_app_release(release: AppRelease ) -> AgentConfig: def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig: config_dict = release.config - + agent_config = MultiAgentConfig( - app_id=release.app_id, + app_id=release.app_id, default_model_config_id=release.default_model_config_id, model_parameters=config_dict.get("model_parameters"), master_agent_id=config_dict.get("master_agent_id"), @@ -58,11 +58,29 @@ def multi_agent_config_4_app_release(release: AppRelease ) -> MultiAgentConfig: routing_rules=config_dict.get("routing_rules"), execution_config=config_dict.get("execution_config", {}), aggregation_strategy=config_dict.get("aggregation_strategy", "merge"), - + ) return agent_config +def workflow_config_4_app_release(release: AppRelease ) -> WorkflowConfig: + + config_dict = release.config + + + config = WorkflowConfig( + id=release.id, + app_id=release.app_id, + nodes=config_dict.get("nodes", []), + edges=config_dict.get("edges", []), + variables=config_dict.get("variables", []), + execution_config=config_dict.get("execution_config", {}), + triggers=config_dict.get("triggers", []) + + ) + + return config + def dict_to_multi_agent_config(config_dict: Dict[str, Any], app_id: Optional[uuid.UUID] = None): """Convert dict to MultiAgentConfig model object From 6783375a140b38e22b2f96228da0f171ea8f7d1c Mon Sep 17 00:00:00 2001 From: Mark Date: Tue, 6 Jan 2026 17:55:42 +0800 Subject: [PATCH 4/6] [fix] model_parameters --- .../controllers/service/app_api_controller.py | 49 +++++++++++++------ api/app/services/agent_config_converter.py | 7 ++- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/api/app/controllers/service/app_api_controller.py b/api/app/controllers/service/app_api_controller.py index 5a78a28b..54af0b57 100644 --- a/api/app/controllers/service/app_api_controller.py +++ b/api/app/controllers/service/app_api_controller.py @@ -1,4 +1,5 @@ """App 服务接口 - 基于 API Key 认证""" +import json from typing import Annotated from fastapi import APIRouter, Depends, Request, Body @@ -21,7 +22,7 @@ from app.schemas.api_key_schema import ApiKeyAuth from app.services import workspace_service from app.services.app_chat_service import AppChatService, get_app_chat_service from app.services.conversation_service import ConversationService, get_conversation_service -from app.utils.app_config_utils import dict_to_multi_agent_config, dict_to_workflow_config, agent_config_4_app_release, multi_agent_config_4_app_release +from app.utils.app_config_utils import dict_to_multi_agent_config, workflow_config_4_app_release, agent_config_4_app_release, multi_agent_config_4_app_release from app.services.app_service import get_app_service, AppService router = APIRouter(prefix="/app", tags=["V1 - App API"]) @@ -226,22 +227,29 @@ async def chat( return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) elif app_type == AppType.WORKFLOW: # 多 Agent 流式返回 - config = dict_to_workflow_config(app.current_release.config,app.id) + config = workflow_config_4_app_release(app.current_release) if payload.stream: async def event_generator(): async for event in app_chat_service.workflow_chat_stream( message=payload.message, conversation_id=conversation.id, # 使用已创建的会话 ID - user_id=end_user_id, # 转换为字符串 + user_id=new_end_user.id, # 转换为字符串 variables=payload.variables, config=config, - web_search=web_search, - memory=memory, - storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id + web_search=payload.web_search, + memory=payload.memory, + storage_type=storage_type, + user_rag_memory_id=user_rag_memory_id, + app_id=app.app_id, + workspace_id=workspace_id ): - yield event + event_type = event.get("event", "message") + event_data = event.get("data", {}) + + # 转换为标准 SSE 格式(字符串) + sse_message = f"event: {event_type}\ndata: {json.dumps(event_data)}\n\n" + yield sse_message return StreamingResponse( event_generator(), @@ -253,21 +261,32 @@ async def chat( } ) - # 非流式返回 + # 多 Agent 非流式返回 result = await app_chat_service.workflow_chat( message=payload.message, conversation_id=conversation.id, # 使用已创建的会话 ID - user_id=end_user_id, # 转换为字符串 + user_id=new_end_user.id, # 转换为字符串 variables=payload.variables, config=config, - web_search=web_search, - memory=memory, + web_search=payload.web_search, + memory=payload.memory, storage_type=storage_type, - user_rag_memory_id=user_rag_memory_id + user_rag_memory_id=user_rag_memory_id, + app_id=app.app_id, + workspace_id=workspace_id + ) + logger.debug( + "工作流试运行返回结果", + extra={ + "result_type": str(type(result)), + "has_response": "response" in result if isinstance(result, dict) else False + } + ) + return success( + data=result, + msg="工作流任务执行成功" ) - - return success(data=conversation_schema.ChatResponse(**result).model_dump(mode="json")) else: from app.core.exceptions import BusinessException from app.core.error_codes import BizCode diff --git a/api/app/services/agent_config_converter.py b/api/app/services/agent_config_converter.py index 262c1c04..3ab14157 100644 --- a/api/app/services/agent_config_converter.py +++ b/api/app/services/agent_config_converter.py @@ -86,7 +86,12 @@ class AgentConfigConverter: # 1. 解析模型参数配置 if model_parameters: from app.schemas.app_schema import ModelParameters - result["model_parameters"] = ModelParameters(**model_parameters) + if isinstance(model_parameters, ModelParameters): + result["model_parameters"] = model_parameters + elif isinstance(model_parameters, dict): + result["model_parameters"] = ModelParameters(**model_parameters) + else: + result["model_parameters"] = ModelParameters() # 2. 解析知识库检索配置 if knowledge_retrieval: From 3183f3953581442b26e111677b082e1fee835c0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=90=E5=8A=9B=E9=BD=90?= <162269739+lanceyq@users.noreply.github.com> Date: Tue, 6 Jan 2026 18:03:28 +0800 Subject: [PATCH 5/6] Fix/Restore user information archive and one-sentence summary (#37) * [fix]fix memory insights * [fix]fix memory insights * [fix]Based on the correction of the code by sourcery-ai * [fix]Restore user information archive and one-sentence summary --- .../controllers/user_memory_controllers.py | 3 +- .../utils/prompt/prompts/user_summary.jinja2 | 32 +++++++------------ api/app/services/user_memory_service.py | 22 +++++++++++++ 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index c54ebbd1..da12cbf6 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -11,6 +11,7 @@ from app.db import get_db from app.core.logging_config import get_api_logger from app.core.response_utils import success, fail from app.core.error_codes import BizCode +from app.core.api_key_utils import timestamp_to_datetime from app.services.user_memory_service import ( UserMemoryService, analytics_memory_types, @@ -356,7 +357,7 @@ async def update_end_user_profile( if 'hire_date' in update_data: hire_date_timestamp = update_data['hire_date'] if hire_date_timestamp is not None: - update_data['hire_date'] = UserMemoryService.timestamp_to_datetime(hire_date_timestamp) + update_data['hire_date'] = timestamp_to_datetime(hire_date_timestamp) # 如果是 None,保持 None(允许清空) for field, value in update_data.items(): diff --git a/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 b/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 index 373ab31e..2f452c53 100644 --- a/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 +++ b/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 @@ -85,33 +85,21 @@ Example Output: ===End of Example=== -===Reflection Process=== +===Internal Quality Checks (DO NOT OUTPUT)=== -After generating the profile, perform the following self-review steps: +Before generating your final output, internally verify: +1. All content is grounded in provided data (no fabrication) +2. Format follows the specified structure with correct headers +3. Tone is objective, third-person, and neutral +4. All four sections are complete and within character limits -**Step 1: Data Grounding Check** -- Verify all statements are supported by the provided entities and statements -- Ensure no fabricated or speculated information is included -- Confirm all claims can be traced back to the input data - -**Step 2: Format Compliance** -- Verify each section follows the specified format with section headers -- Check character count limits for each section -- Ensure proper use of section markers (【】) - -**Step 3: Tone and Style Review** -- Confirm objective third-person perspective is maintained -- Check for excessive adjectives or empty phrases -- Verify neutral and restrained tone throughout - -**Step 4: Completeness Check** -- Ensure all four sections are present and complete -- Verify each section addresses its specific focus area -- Confirm the one-sentence summary effectively captures the user's essence +**IMPORTANT: These checks are for your internal use only. DO NOT include them in your output.** ===Output Requirements=== +**CRITICAL: Your response must ONLY contain the four sections below. Do not include any reflection, self-review, or meta-commentary.** + **LANGUAGE REQUIREMENT:** - The output language should ALWAYS be Chinese (Simplified) - All section content must be in Chinese @@ -122,3 +110,5 @@ After generating the profile, perform the following self-review steps: - Content follows immediately after the header - Sections are separated by blank lines - Strictly adhere to character limits for each section +- **DO NOT include any text after the 【一句话总结】 section** +- **DO NOT output reflection steps, self-review, or verification notes** diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 6fd72fce..40851835 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -1054,6 +1054,28 @@ async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str, core_values = core_values_match.group(1).strip() if core_values_match else "" one_sentence = one_sentence_match.group(1).strip() if one_sentence_match else "" + # 6) 清理可能包含的反思内容(防御性编程) + # 如果 LLM 仍然输出了反思内容,在这里过滤掉 + def clean_reflection_content(text: str) -> str: + """移除可能包含的反思内容""" + if not text: + return text + # 移除 "---" 之后的所有内容(通常是反思部分的开始) + if '---' in text: + text = text.split('---')[0].strip() + # 移除 "**Step" 开头的内容 + if '**Step' in text: + text = text.split('**Step')[0].strip() + # 移除 "Self-Review" 相关内容 + if 'Self-Review' in text or 'self-review' in text: + text = re.sub(r'[\-\*]*\s*Self-Review.*$', '', text, flags=re.IGNORECASE | re.DOTALL).strip() + return text + + user_summary = clean_reflection_content(user_summary) + personality = clean_reflection_content(personality) + core_values = clean_reflection_content(core_values) + one_sentence = clean_reflection_content(one_sentence) + return { "user_summary": user_summary, "personality": personality, From eabaae4a8f375e99f26f366fafa8957e8c795859 Mon Sep 17 00:00:00 2001 From: Mark Date: Tue, 6 Jan 2026 18:58:36 +0800 Subject: [PATCH 6/6] [fix] model parameter error --- api/app/controllers/app_controller.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/api/app/controllers/app_controller.py b/api/app/controllers/app_controller.py index 698f061d..d8479f97 100644 --- a/api/app/controllers/app_controller.py +++ b/api/app/controllers/app_controller.py @@ -728,9 +728,23 @@ async def draft_run_compare( from app.core.exceptions import ResourceNotFoundException raise ResourceNotFoundException("模型配置", str(model_item.model_config_id)) + # 获取 agent_cfg.model_parameters,如果是 ModelParameters 对象则转为字典 + agent_model_params = agent_cfg.model_parameters + if hasattr(agent_model_params, 'model_dump'): + agent_model_params = agent_model_params.model_dump() + elif not isinstance(agent_model_params, dict): + agent_model_params = {} + + # 获取 model_item.model_parameters,如果是 ModelParameters 对象则转为字典 + item_model_params = model_item.model_parameters + if hasattr(item_model_params, 'model_dump'): + item_model_params = item_model_params.model_dump() + elif not isinstance(item_model_params, dict): + item_model_params = {} + merged_parameters = { - **(agent_cfg.model_parameters or {}), - **(model_item.model_parameters or {}) + **(agent_model_params or {}), + **(item_model_params or {}) } model_configs.append({