From a2df14f6586541b6f63888b8d0a5d6412271a5c3 Mon Sep 17 00:00:00 2001 From: zhaoying Date: Tue, 21 Apr 2026 15:00:28 +0800 Subject: [PATCH] fix(web): stream support abort --- web/src/api/application.ts | 12 ++-- web/src/api/memory.ts | 4 +- web/src/api/prompt.ts | 4 +- web/src/utils/stream.ts | 60 +++++++++++-------- .../ApplicationConfig/TestChat/index.tsx | 9 ++- .../components/AiPromptModal.tsx | 13 ++-- .../ApplicationConfig/components/Chat.tsx | 10 +++- web/src/views/Conversation/index.tsx | 9 ++- .../components/Result.tsx | 15 ++++- web/src/views/Prompt/index.tsx | 12 +++- .../views/Workflow/components/Chat/Chat.tsx | 9 ++- .../Workflow/components/Properties/index.tsx | 6 +- 12 files changed, 109 insertions(+), 54 deletions(-) diff --git a/web/src/api/application.ts b/web/src/api/application.ts index 5614232e..6965f363 100644 --- a/web/src/api/application.ts +++ b/web/src/api/application.ts @@ -53,12 +53,12 @@ export const saveWorkflowConfig = (app_id: string, values: WorkflowConfig) => { return request.put(`/apps/${app_id}/workflow`, values) } // Model comparison test run -export const runCompare = (app_id: string, values: Record, onMessage?: (data: SSEMessage[]) => void) => { - return handleSSE(`/apps/${app_id}/draft/run/compare`, values, onMessage) +export const runCompare = (app_id: string, values: Record, onMessage?: (data: SSEMessage[]) => void, onAbort?: (abort: () => void) => void) => { + return handleSSE(`/apps/${app_id}/draft/run/compare`, values, onMessage, undefined, onAbort) } // Test run -export const draftRun = (app_id: string, values: Record, onMessage?: (data: SSEMessage[]) => void) => { - return handleSSE(`/apps/${app_id}/draft/run`, values, onMessage) +export const draftRun = (app_id: string, values: Record, onMessage?: (data: SSEMessage[]) => void, onAbort?: (abort: () => void) => void) => { + return handleSSE(`/apps/${app_id}/draft/run`, values, onMessage, undefined, onAbort) } // Delete application export const deleteApplication = (app_id: string) => { @@ -93,12 +93,12 @@ export const getConversationHistory = (share_token: string, data: { page: number }) } // Send conversation -export const sendConversation = (values: QueryParams, onMessage: (data: SSEMessage[]) => void, shareToken: string) => { +export const sendConversation = (values: QueryParams, onMessage: (data: SSEMessage[]) => void, shareToken: string, onAbort?: (abort: () => void) => void) => { return handleSSE(`/public/share/chat`, values, onMessage, { headers: { 'Authorization': `Bearer ${shareToken}` } - }) + }, onAbort) } // Get conversation details export const getConversationDetail = (share_token: string, conversation_id: string) => { diff --git a/web/src/api/memory.ts b/web/src/api/memory.ts index 077cdf53..77801c63 100644 --- a/web/src/api/memory.ts +++ b/web/src/api/memory.ts @@ -274,8 +274,8 @@ export const updateMemoryExtractionConfig = (values: ExtractionConfigForm) => { return request.post('/memory-storage/update_config_extracted', values) } // Memory Extraction Engine - Pilot run -export const pilotRunMemoryExtractionConfig = (values: { config_id: number | string; dialogue_text: string; custom_text?: string; }, onMessage?: (data: SSEMessage[]) => void) => { - return handleSSE('/memory-storage/pilot_run', values, onMessage) +export const pilotRunMemoryExtractionConfig = (values: { config_id: number | string; dialogue_text: string; custom_text?: string; }, onMessage?: (data: SSEMessage[]) => void, onAbort?: (abort: () => void) => void) => { + return handleSSE('/memory-storage/pilot_run', values, onMessage, undefined, onAbort) } // Emotion Engine - Get configuration export const getMemoryEmotionConfig = (config_id: number | string) => { diff --git a/web/src/api/prompt.ts b/web/src/api/prompt.ts index 55398ca5..ea641c56 100644 --- a/web/src/api/prompt.ts +++ b/web/src/api/prompt.ts @@ -14,8 +14,8 @@ export const createPromptSessions = () => { return request.post(`/prompt/sessions`) } // Get prompt optimization -export const updatePromptMessages = (session_id: string, data: AiPromptForm, onMessage?: (data: SSEMessage[]) => void) => { - return handleSSE(`/prompt/sessions/${session_id}/messages`, data, onMessage) +export const updatePromptMessages = (session_id: string, data: AiPromptForm, onMessage?: (data: SSEMessage[]) => void, config?: any, onAbort?: (abort: () => void) => void) => { + return handleSSE(`/prompt/sessions/${session_id}/messages`, data, onMessage, config, onAbort) } // Prompt release list export const getPromptReleaseListUrl = '/prompt/releases/list' diff --git a/web/src/utils/stream.ts b/web/src/utils/stream.ts index ba966159..77459120 100644 --- a/web/src/utils/stream.ts +++ b/web/src/utils/stream.ts @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-02 16:35:43 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-18 14:32:40 + * @Last Modified time: 2026-04-21 14:20:39 */ /** * Server-Sent Events (SSE) Stream Utility Module @@ -148,7 +148,7 @@ function parseDataContent(dataContent: string): string | object { * @param config - Additional request configuration * @returns Fetch response */ -const makeSSERequest = async (url: string, data: any, token: string, config = { headers: {} }) => { +const makeSSERequest = async (url: string, data: any, token: string, config = { headers: {} }, signal?: AbortSignal) => { return fetch(`${API_PREFIX}${url}`, { method: 'POST', headers: { @@ -156,7 +156,8 @@ const makeSSERequest = async (url: string, data: any, token: string, config = { 'Authorization': `Bearer ${token}`, ...config.headers, }, - body: JSON.stringify(data) + body: JSON.stringify(data), + signal, }); }; @@ -167,10 +168,14 @@ const makeSSERequest = async (url: string, data: any, token: string, config = { * @param onMessage - Callback for each parsed message * @param config - Additional request configuration */ -export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMessage[]) => void, config = { headers: {} }) => { +export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMessage[]) => void, config = { headers: {} }, onAbort?: (abort: () => void) => void) => { + const controller = new AbortController(); + const abort = () => controller.abort(); + onAbort?.(abort); + try { let token = cookieUtils.get('authToken'); - let response = await makeSSERequest(url, data, token || '', config); + let response = await makeSSERequest(url, data, token || '', config, controller.signal); switch (response.status) { case 500: @@ -199,7 +204,7 @@ export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMe } try { const newToken = await refreshTokenForSSE(); - response = await makeSSERequest(url, data, newToken, config); + response = await makeSSERequest(url, data, newToken, config, controller.signal); } catch (refreshError) { return; } @@ -211,30 +216,37 @@ export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMe const decoder = new TextDecoder(); let buffer = ''; // Buffer for handling incomplete messages - while (true) { - const { done, value } = await reader.read(); - if (done) break; + try { + while (true) { + const { done, value } = await reader.read(); + if (done || controller.signal.aborted) break; - const chunk = decoder.decode(value, { stream: true }); - buffer += chunk; + const chunk = decoder.decode(value, { stream: true }); + buffer += chunk; - // Process complete events - const events = buffer.split('\n\n'); - buffer = events.pop() || ''; // Keep last potentially incomplete event + // Process complete events + const events = buffer.split('\n\n'); + buffer = events.pop() || ''; // Keep last potentially incomplete event - for (const event of events) { - if (event.trim() && onMessage) { - onMessage(parseSSEToJSON(event) ?? {}); + for (const event of events) { + if (event.trim() && onMessage) { + onMessage(parseSSEToJSON(event) ?? {}); + } } } - } - // Process remaining buffer content - if (buffer.trim() && onMessage) { - onMessage(parseSSEToJSON(buffer) ?? {}); + // Process remaining buffer content + if (!controller.signal.aborted && buffer.trim() && onMessage) { + onMessage(parseSSEToJSON(buffer) ?? {}); + } + } finally { + reader.cancel(); + } + } catch (error: any) { + if (error?.name !== 'AbortError') { + console.error('Request failed:', error); + throw error; } - } catch (error) { - console.error('Request failed:', error); - throw error; } + }; \ No newline at end of file diff --git a/web/src/views/ApplicationConfig/TestChat/index.tsx b/web/src/views/ApplicationConfig/TestChat/index.tsx index bfb9b569..b62efc6b 100644 --- a/web/src/views/ApplicationConfig/TestChat/index.tsx +++ b/web/src/views/ApplicationConfig/TestChat/index.tsx @@ -92,6 +92,7 @@ const TestChat: FC = ({ const audioPollingRef = useRef>>(new Map()) const streamLoadingRef = useRef(false) const [audioStatusMap, setAudioStatusMap] = useState>({}) + const abortRef = useRef<(() => void) | null>(null) useEffect(() => { getVariables() @@ -99,6 +100,8 @@ const TestChat: FC = ({ useEffect(() => { return () => { + abortRef.current?.() + abortRef.current = null audioPollingRef.current.forEach(timer => clearInterval(timer)) audioPollingRef.current.clear() } @@ -262,7 +265,8 @@ const TestChat: FC = ({ draftRun( application.id, formatParams((msg || message) as string, conversationId, files, params), - handleStreamMessage + handleStreamMessage, + (abort) => { abortRef.current = abort } ) .catch(() => { updateErrorAssistantMessage(0) @@ -373,7 +377,8 @@ const TestChat: FC = ({ draftRun( application.id, formatParams((msg || message) as string, conversationId, files, params), - handleWorkflowStreamMessage + handleWorkflowStreamMessage, + (abort) => { abortRef.current = abort } ) .catch((error) => { const errorInfo = JSON.parse(error.message) diff --git a/web/src/views/ApplicationConfig/components/AiPromptModal.tsx b/web/src/views/ApplicationConfig/components/AiPromptModal.tsx index 1666e075..96a0c7b5 100644 --- a/web/src/views/ApplicationConfig/components/AiPromptModal.tsx +++ b/web/src/views/ApplicationConfig/components/AiPromptModal.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-03 16:26:44 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-20 13:53:05 + * @Last Modified time: 2026-04-21 14:50:21 */ /** * AI Prompt Assistant Modal @@ -61,11 +61,14 @@ const AiPromptModal = forwardRef(({ const aiPromptVariableModalRef = useRef(null) const editorRef = useRef(null) const currentPromptValueRef = useRef('') + const abortRef = useRef<(() => void) | null>(null) const values = Form.useWatch([], form) /** Close modal and reset state */ const handleClose = () => { + abortRef.current?.() + abortRef.current = null setVisible(false); setLoading(false) setChatList([]) @@ -148,7 +151,7 @@ const AiPromptModal = forwardRef(({ updatePromptMessages(promptSession, { ...values, skill: source === 'skills' - }, handleStreamMessage) + }, handleStreamMessage, undefined, abort => { abortRef.current = abort }) .finally(() => { setLoading(false) }) @@ -221,7 +224,7 @@ const AiPromptModal = forwardRef(({ } data={chatList || []} @@ -292,10 +295,10 @@ const AiPromptModal = forwardRef(({ {values?.current_prompt ? form.setFieldValue('current_prompt', value)} /> - : + : } diff --git a/web/src/views/ApplicationConfig/components/Chat.tsx b/web/src/views/ApplicationConfig/components/Chat.tsx index eb3a9ea0..6cf7b438 100644 --- a/web/src/views/ApplicationConfig/components/Chat.tsx +++ b/web/src/views/ApplicationConfig/components/Chat.tsx @@ -73,11 +73,14 @@ const Chat: FC = ({ const [message, setMessage] = useState(undefined) const [features, setFeatures] = useState({} as FeaturesConfigForm) const [audioStatusMap, setAudioStatusMap] = useState>({}) + const abortRef = useRef<(() => void) | null>(null) useEffect(() => { setCompareLoading(false) setLoading(false) return () => { + abortRef.current?.() + abortRef.current = null audioPollingRef.current.forEach(timer => clearInterval(timer)) audioPollingRef.current.clear() } @@ -85,6 +88,8 @@ const Chat: FC = ({ useEffect(() => { return () => { + abortRef.current?.() + abortRef.current = null audioPollingRef.current.forEach(timer => clearInterval(timer)) audioPollingRef.current.clear() } @@ -393,7 +398,7 @@ const Chat: FC = ({ parallel: true, stream: true, timeout: 60, - }, handleStreamMessage) + }, handleStreamMessage, (abort) => { abortRef.current = abort }) .catch(() => { setLoading(false) setCompareLoading(false) @@ -537,7 +542,8 @@ const Chat: FC = ({ } }), }, - handleStreamMessage + handleStreamMessage, + (abort) => { abortRef.current = abort } ) .catch(() => { setLoading(false) diff --git a/web/src/views/Conversation/index.tsx b/web/src/views/Conversation/index.tsx index 778279d3..a562aaeb 100644 --- a/web/src/views/Conversation/index.tsx +++ b/web/src/views/Conversation/index.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-03 16:58:03 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-04-13 18:32:58 + * @Last Modified time: 2026-04-21 14:27:15 */ /** * Conversation Page @@ -53,6 +53,7 @@ const Conversation: FC = () => { const scrollRef = useRef(null); const toolbarRef = useRef(null) const audioPollingRef = useRef>>(new Map()) + const abortRef = useRef<(() => void) | null>(null) const [shareToken, setShareToken] = useState(localStorage.getItem(`shareToken_${token}`)) const [fileList, setFileList] = useState([]) const [webSearch, setWebSearch] = useState(false) @@ -67,6 +68,8 @@ const Conversation: FC = () => { useEffect(() => { return () => { + abortRef.current?.() + abortRef.current = null audioPollingRef.current.forEach((timer) => clearInterval(timer)) audioPollingRef.current.clear() } @@ -150,6 +153,8 @@ const Conversation: FC = () => { const handleChangeHistory = (id: string | null) => { if (id !== conversation_id) setConversationId(id) if (!id) setMessage('') + abortRef.current?.() + abortRef.current = null } useEffect(() => { @@ -406,7 +411,7 @@ const Conversation: FC = () => { }), variables: params, thinking, - }, handleStreamMessage, shareToken) + }, handleStreamMessage, shareToken, (abort) => { abortRef.current = abort }) .catch(() => { setLoading(false) streamLoadingRef.current = false diff --git a/web/src/views/MemoryExtractionEngine/components/Result.tsx b/web/src/views/MemoryExtractionEngine/components/Result.tsx index 2fa8788f..c1dcf88b 100644 --- a/web/src/views/MemoryExtractionEngine/components/Result.tsx +++ b/web/src/views/MemoryExtractionEngine/components/Result.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-03 17:30:11 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-26 15:46:30 + * @Last Modified time: 2026-04-21 14:54:14 */ /** * Result Component @@ -10,7 +10,7 @@ * Shows text preprocessing, knowledge extraction, node/edge creation, and deduplication */ -import { type FC, useState } from 'react' +import { type FC, useState, useRef, useEffect } from 'react' import { useParams } from 'react-router-dom' import { useTranslation } from 'react-i18next' import { Space, Button, Progress, Form, Input, Flex } from 'antd' @@ -105,7 +105,14 @@ const Result: FC = ({ loading, handleSave }) => { const [runForm] = Form.useForm() const customText = Form.useWatch(['custom_text'], runForm) + const abortRef = useRef<(() => void) | null>(null) + useEffect(() => { + return () => { + abortRef.current?.() + abortRef.current = null; + } + }, []) /** Run pilot test */ const handleRun = () => { if(!id) return @@ -229,11 +236,13 @@ const Result: FC = ({ loading, handleSave }) => { }) } setRunLoading(true) + abortRef.current?.() + abortRef.current = null; pilotRunMemoryExtractionConfig({ config_id: id, dialogue_text: t('memoryExtractionEngine.exampleText'), custom_text: runForm.getFieldValue('custom_text') - }, handleStreamMessage) + }, handleStreamMessage, (abort) => { abortRef.current = abort }) .finally(() => { setRunLoading(false) }) diff --git a/web/src/views/Prompt/index.tsx b/web/src/views/Prompt/index.tsx index 0475b40a..9d90ee4b 100644 --- a/web/src/views/Prompt/index.tsx +++ b/web/src/views/Prompt/index.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-03 17:44:15 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-03-27 15:14:58 + * @Last Modified time: 2026-04-21 14:24:00 */ /** * Prompt Editor Component @@ -46,9 +46,17 @@ const Prompt: FC = () => { const promptSaveModalRef = useRef(null) const editorRef = useRef(null) const currentPromptValueRef = useRef(undefined) + const abortRef = useRef<(() => void) | null>(null) const values = Form.useWatch([], form) const [editVo, setEditVo] = useState(null) + useEffect(() => { + return () => { + abortRef.current?.() + abortRef.current = null + } + }, []) + useEffect(() => { setEditVo(state) }, [state]) @@ -126,7 +134,7 @@ const Prompt: FC = () => { } }) }; - updatePromptMessages((promptSession) as string, values, handleStreamMessage) + updatePromptMessages((promptSession) as string, values, handleStreamMessage, undefined, (abort) => { abortRef.current = abort }) .finally(() => { setLoading(false) }) diff --git a/web/src/views/Workflow/components/Chat/Chat.tsx b/web/src/views/Workflow/components/Chat/Chat.tsx index 19b06a0d..a6b4a2a8 100644 --- a/web/src/views/Workflow/components/Chat/Chat.tsx +++ b/web/src/views/Workflow/components/Chat/Chat.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-06 21:10:56 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-04-15 15:57:35 + * @Last Modified time: 2026-04-21 14:59:13 */ /** * Workflow Chat Component @@ -51,6 +51,7 @@ const Chat = forwardRef('draft') const toolbarRef = useRef(null) + const abortRef = useRef<(() => void) | null>(null) const [toolbarReady, setToolbarReady] = useState(false) const toolbarCallbackRef = useCallback((node: ChatToolbarRef | null) => { (toolbarRef as React.MutableRefObject).current = node @@ -65,6 +66,8 @@ const Chat = forwardRef([]) const [message, setMessage] = useState(undefined) + console.log('abortRef', abortRef) + /** * Opens the chat drawer and loads workflow variables from the start node */ @@ -116,6 +119,8 @@ const Chat = forwardRef { + abortRef.current?.() + abortRef.current = null; setOpen(false) setToolbarReady(false) setChatList([]) @@ -395,7 +400,7 @@ const Chat = forwardRef { abortRef.current = abort }) .catch((error) => { const errorInfo = JSON.parse(error.message) setChatList(prev => { diff --git a/web/src/views/Workflow/components/Properties/index.tsx b/web/src/views/Workflow/components/Properties/index.tsx index f826edd9..19b24ea4 100644 --- a/web/src/views/Workflow/components/Properties/index.tsx +++ b/web/src/views/Workflow/components/Properties/index.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2026-02-03 15:39:59 * @Last Modified by: ZhaoYing - * @Last Modified time: 2026-04-13 10:44:19 + * @Last Modified time: 2026-04-21 14:15:33 */ import { type FC, useEffect, useState, useMemo } from "react"; import clsx from 'clsx' @@ -153,7 +153,9 @@ const Properties: FC = ({ selectedNode?.setData({ ...nodeData, ...allRest, - }) + }, + // { deep: false } + ) } }, [values, selectedNode, form])