fix(web): stream support abort

This commit is contained in:
zhaoying
2026-04-21 15:00:28 +08:00
parent c50969dea4
commit a2df14f658
12 changed files with 109 additions and 54 deletions

View File

@@ -53,12 +53,12 @@ export const saveWorkflowConfig = (app_id: string, values: WorkflowConfig) => {
return request.put(`/apps/${app_id}/workflow`, values)
}
// Model comparison test run
export const runCompare = (app_id: string, values: Record<string, unknown>, onMessage?: (data: SSEMessage[]) => void) => {
return handleSSE(`/apps/${app_id}/draft/run/compare`, values, onMessage)
export const runCompare = (app_id: string, values: Record<string, unknown>, onMessage?: (data: SSEMessage[]) => void, onAbort?: (abort: () => void) => void) => {
return handleSSE(`/apps/${app_id}/draft/run/compare`, values, onMessage, undefined, onAbort)
}
// Test run
export const draftRun = (app_id: string, values: Record<string, unknown>, onMessage?: (data: SSEMessage[]) => void) => {
return handleSSE(`/apps/${app_id}/draft/run`, values, onMessage)
export const draftRun = (app_id: string, values: Record<string, unknown>, onMessage?: (data: SSEMessage[]) => void, onAbort?: (abort: () => void) => void) => {
return handleSSE(`/apps/${app_id}/draft/run`, values, onMessage, undefined, onAbort)
}
// Delete application
export const deleteApplication = (app_id: string) => {
@@ -93,12 +93,12 @@ export const getConversationHistory = (share_token: string, data: { page: number
})
}
// Send conversation
export const sendConversation = (values: QueryParams, onMessage: (data: SSEMessage[]) => void, shareToken: string) => {
export const sendConversation = (values: QueryParams, onMessage: (data: SSEMessage[]) => void, shareToken: string, onAbort?: (abort: () => void) => void) => {
return handleSSE(`/public/share/chat`, values, onMessage, {
headers: {
'Authorization': `Bearer ${shareToken}`
}
})
}, onAbort)
}
// Get conversation details
export const getConversationDetail = (share_token: string, conversation_id: string) => {

View File

@@ -274,8 +274,8 @@ export const updateMemoryExtractionConfig = (values: ExtractionConfigForm) => {
return request.post('/memory-storage/update_config_extracted', values)
}
// Memory Extraction Engine - Pilot run
export const pilotRunMemoryExtractionConfig = (values: { config_id: number | string; dialogue_text: string; custom_text?: string; }, onMessage?: (data: SSEMessage[]) => void) => {
return handleSSE('/memory-storage/pilot_run', values, onMessage)
export const pilotRunMemoryExtractionConfig = (values: { config_id: number | string; dialogue_text: string; custom_text?: string; }, onMessage?: (data: SSEMessage[]) => void, onAbort?: (abort: () => void) => void) => {
return handleSSE('/memory-storage/pilot_run', values, onMessage, undefined, onAbort)
}
// Emotion Engine - Get configuration
export const getMemoryEmotionConfig = (config_id: number | string) => {

View File

@@ -14,8 +14,8 @@ export const createPromptSessions = () => {
return request.post(`/prompt/sessions`)
}
// Get prompt optimization
export const updatePromptMessages = (session_id: string, data: AiPromptForm, onMessage?: (data: SSEMessage[]) => void) => {
return handleSSE(`/prompt/sessions/${session_id}/messages`, data, onMessage)
export const updatePromptMessages = (session_id: string, data: AiPromptForm, onMessage?: (data: SSEMessage[]) => void, config?: any, onAbort?: (abort: () => void) => void) => {
return handleSSE(`/prompt/sessions/${session_id}/messages`, data, onMessage, config, onAbort)
}
// Prompt release list
export const getPromptReleaseListUrl = '/prompt/releases/list'

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-02 16:35:43
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-18 14:32:40
* @Last Modified time: 2026-04-21 14:20:39
*/
/**
* Server-Sent Events (SSE) Stream Utility Module
@@ -148,7 +148,7 @@ function parseDataContent(dataContent: string): string | object {
* @param config - Additional request configuration
* @returns Fetch response
*/
const makeSSERequest = async (url: string, data: any, token: string, config = { headers: {} }) => {
const makeSSERequest = async (url: string, data: any, token: string, config = { headers: {} }, signal?: AbortSignal) => {
return fetch(`${API_PREFIX}${url}`, {
method: 'POST',
headers: {
@@ -156,7 +156,8 @@ const makeSSERequest = async (url: string, data: any, token: string, config = {
'Authorization': `Bearer ${token}`,
...config.headers,
},
body: JSON.stringify(data)
body: JSON.stringify(data),
signal,
});
};
@@ -167,10 +168,14 @@ const makeSSERequest = async (url: string, data: any, token: string, config = {
* @param onMessage - Callback for each parsed message
* @param config - Additional request configuration
*/
export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMessage[]) => void, config = { headers: {} }) => {
export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMessage[]) => void, config = { headers: {} }, onAbort?: (abort: () => void) => void) => {
const controller = new AbortController();
const abort = () => controller.abort();
onAbort?.(abort);
try {
let token = cookieUtils.get('authToken');
let response = await makeSSERequest(url, data, token || '', config);
let response = await makeSSERequest(url, data, token || '', config, controller.signal);
switch (response.status) {
case 500:
@@ -199,7 +204,7 @@ export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMe
}
try {
const newToken = await refreshTokenForSSE();
response = await makeSSERequest(url, data, newToken, config);
response = await makeSSERequest(url, data, newToken, config, controller.signal);
} catch (refreshError) {
return;
}
@@ -211,30 +216,37 @@ export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMe
const decoder = new TextDecoder();
let buffer = ''; // Buffer for handling incomplete messages
while (true) {
const { done, value } = await reader.read();
if (done) break;
try {
while (true) {
const { done, value } = await reader.read();
if (done || controller.signal.aborted) break;
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
// Process complete events
const events = buffer.split('\n\n');
buffer = events.pop() || ''; // Keep last potentially incomplete event
// Process complete events
const events = buffer.split('\n\n');
buffer = events.pop() || ''; // Keep last potentially incomplete event
for (const event of events) {
if (event.trim() && onMessage) {
onMessage(parseSSEToJSON(event) ?? {});
for (const event of events) {
if (event.trim() && onMessage) {
onMessage(parseSSEToJSON(event) ?? {});
}
}
}
}
// Process remaining buffer content
if (buffer.trim() && onMessage) {
onMessage(parseSSEToJSON(buffer) ?? {});
// Process remaining buffer content
if (!controller.signal.aborted && buffer.trim() && onMessage) {
onMessage(parseSSEToJSON(buffer) ?? {});
}
} finally {
reader.cancel();
}
} catch (error: any) {
if (error?.name !== 'AbortError') {
console.error('Request failed:', error);
throw error;
}
} catch (error) {
console.error('Request failed:', error);
throw error;
}
};

View File

@@ -92,6 +92,7 @@ const TestChat: FC<TestChatProps> = ({
const audioPollingRef = useRef<Map<string, ReturnType<typeof setInterval>>>(new Map())
const streamLoadingRef = useRef(false)
const [audioStatusMap, setAudioStatusMap] = useState<Record<string, string>>({})
const abortRef = useRef<(() => void) | null>(null)
useEffect(() => {
getVariables()
@@ -99,6 +100,8 @@ const TestChat: FC<TestChatProps> = ({
useEffect(() => {
return () => {
abortRef.current?.()
abortRef.current = null
audioPollingRef.current.forEach(timer => clearInterval(timer))
audioPollingRef.current.clear()
}
@@ -262,7 +265,8 @@ const TestChat: FC<TestChatProps> = ({
draftRun(
application.id,
formatParams((msg || message) as string, conversationId, files, params),
handleStreamMessage
handleStreamMessage,
(abort) => { abortRef.current = abort }
)
.catch(() => {
updateErrorAssistantMessage(0)
@@ -373,7 +377,8 @@ const TestChat: FC<TestChatProps> = ({
draftRun(
application.id,
formatParams((msg || message) as string, conversationId, files, params),
handleWorkflowStreamMessage
handleWorkflowStreamMessage,
(abort) => { abortRef.current = abort }
)
.catch((error) => {
const errorInfo = JSON.parse(error.message)

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 16:26:44
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-20 13:53:05
* @Last Modified time: 2026-04-21 14:50:21
*/
/**
* AI Prompt Assistant Modal
@@ -61,11 +61,14 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
const aiPromptVariableModalRef = useRef<AiPromptVariableModalRef>(null)
const editorRef = useRef<any>(null)
const currentPromptValueRef = useRef<string>('')
const abortRef = useRef<(() => void) | null>(null)
const values = Form.useWatch([], form)
/** Close modal and reset state */
const handleClose = () => {
abortRef.current?.()
abortRef.current = null
setVisible(false);
setLoading(false)
setChatList([])
@@ -148,7 +151,7 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
updatePromptMessages(promptSession, {
...values,
skill: source === 'skills'
}, handleStreamMessage)
}, handleStreamMessage, undefined, abort => { abortRef.current = abort })
.finally(() => {
setLoading(false)
})
@@ -221,7 +224,7 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
</Form.Item>
<ChatContent
classNames="rb:h-105.5 rb:pb-[15px]!"
classNames="rb:h-[calc(100vh-330px)] rb:pb-[15px]!"
contentClassNames="rb:max-w-75!"
empty={<Empty url={ConversationEmptyIcon} title={t(`${source}.promptChatEmpty`)} isNeedSubTitle={false} size={[140, 100]} className="rb:h-full" />}
data={chatList || []}
@@ -292,10 +295,10 @@ const AiPromptModal = forwardRef<AiPromptModalRef, AiPromptModalProps>(({
{values?.current_prompt
? <Editor
ref={editorRef}
className="rb:h-119 rb:bg-white! rb:border-none! rb:p-0!"
className="rb:h-[calc(100vh-278px)] rb:bg-white! rb:border-none! rb:p-0!"
onChange={(value) => form.setFieldValue('current_prompt', value)}
/>
: <Empty url={analysisEmptyIcon} title={t(`${source}.promptOptimizationEmpty`)} isNeedSubTitle={false} size={[270, 170]} className="rb:h-119 rb:w-70 rb:mx-auto! rb:text-center! rb:text-[12px]! rb:leading-4!" />
: <Empty url={analysisEmptyIcon} title={t(`${source}.promptOptimizationEmpty`)} isNeedSubTitle={false} size={[270, 170]} className="rb:h-[calc(100vh-278px)] rb:w-70 rb:mx-auto! rb:text-center! rb:text-[12px]! rb:leading-4!" />
}
</Form.Item>
</div>

View File

@@ -73,11 +73,14 @@ const Chat: FC<ChatProps> = ({
const [message, setMessage] = useState<string | undefined>(undefined)
const [features, setFeatures] = useState<FeaturesConfigForm>({} as FeaturesConfigForm)
const [audioStatusMap, setAudioStatusMap] = useState<Record<string, string>>({})
const abortRef = useRef<(() => void) | null>(null)
useEffect(() => {
setCompareLoading(false)
setLoading(false)
return () => {
abortRef.current?.()
abortRef.current = null
audioPollingRef.current.forEach(timer => clearInterval(timer))
audioPollingRef.current.clear()
}
@@ -85,6 +88,8 @@ const Chat: FC<ChatProps> = ({
useEffect(() => {
return () => {
abortRef.current?.()
abortRef.current = null
audioPollingRef.current.forEach(timer => clearInterval(timer))
audioPollingRef.current.clear()
}
@@ -393,7 +398,7 @@ const Chat: FC<ChatProps> = ({
parallel: true,
stream: true,
timeout: 60,
}, handleStreamMessage)
}, handleStreamMessage, (abort) => { abortRef.current = abort })
.catch(() => {
setLoading(false)
setCompareLoading(false)
@@ -537,7 +542,8 @@ const Chat: FC<ChatProps> = ({
}
}),
},
handleStreamMessage
handleStreamMessage,
(abort) => { abortRef.current = abort }
)
.catch(() => {
setLoading(false)

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 16:58:03
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-13 18:32:58
* @Last Modified time: 2026-04-21 14:27:15
*/
/**
* Conversation Page
@@ -53,6 +53,7 @@ const Conversation: FC = () => {
const scrollRef = useRef<HTMLDivElement>(null);
const toolbarRef = useRef<ChatToolbarRef>(null)
const audioPollingRef = useRef<Map<string, ReturnType<typeof setInterval>>>(new Map())
const abortRef = useRef<(() => void) | null>(null)
const [shareToken, setShareToken] = useState<string | null>(localStorage.getItem(`shareToken_${token}`))
const [fileList, setFileList] = useState<any[]>([])
const [webSearch, setWebSearch] = useState(false)
@@ -67,6 +68,8 @@ const Conversation: FC = () => {
useEffect(() => {
return () => {
abortRef.current?.()
abortRef.current = null
audioPollingRef.current.forEach((timer) => clearInterval(timer))
audioPollingRef.current.clear()
}
@@ -150,6 +153,8 @@ const Conversation: FC = () => {
const handleChangeHistory = (id: string | null) => {
if (id !== conversation_id) setConversationId(id)
if (!id) setMessage('')
abortRef.current?.()
abortRef.current = null
}
useEffect(() => {
@@ -406,7 +411,7 @@ const Conversation: FC = () => {
}),
variables: params,
thinking,
}, handleStreamMessage, shareToken)
}, handleStreamMessage, shareToken, (abort) => { abortRef.current = abort })
.catch(() => {
setLoading(false)
streamLoadingRef.current = false

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 17:30:11
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-26 15:46:30
* @Last Modified time: 2026-04-21 14:54:14
*/
/**
* Result Component
@@ -10,7 +10,7 @@
* Shows text preprocessing, knowledge extraction, node/edge creation, and deduplication
*/
import { type FC, useState } from 'react'
import { type FC, useState, useRef, useEffect } from 'react'
import { useParams } from 'react-router-dom'
import { useTranslation } from 'react-i18next'
import { Space, Button, Progress, Form, Input, Flex } from 'antd'
@@ -105,7 +105,14 @@ const Result: FC<ResultProps> = ({ loading, handleSave }) => {
const [runForm] = Form.useForm()
const customText = Form.useWatch(['custom_text'], runForm)
const abortRef = useRef<(() => void) | null>(null)
useEffect(() => {
return () => {
abortRef.current?.()
abortRef.current = null;
}
}, [])
/** Run pilot test */
const handleRun = () => {
if(!id) return
@@ -229,11 +236,13 @@ const Result: FC<ResultProps> = ({ loading, handleSave }) => {
})
}
setRunLoading(true)
abortRef.current?.()
abortRef.current = null;
pilotRunMemoryExtractionConfig({
config_id: id,
dialogue_text: t('memoryExtractionEngine.exampleText'),
custom_text: runForm.getFieldValue('custom_text')
}, handleStreamMessage)
}, handleStreamMessage, (abort) => { abortRef.current = abort })
.finally(() => {
setRunLoading(false)
})

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 17:44:15
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-03-27 15:14:58
* @Last Modified time: 2026-04-21 14:24:00
*/
/**
* Prompt Editor Component
@@ -46,9 +46,17 @@ const Prompt: FC = () => {
const promptSaveModalRef = useRef<PromptSaveModalRef>(null)
const editorRef = useRef<any>(null)
const currentPromptValueRef = useRef<string>(undefined)
const abortRef = useRef<(() => void) | null>(null)
const values = Form.useWatch([], form)
const [editVo, setEditVo] = useState<HistoryItem | null>(null)
useEffect(() => {
return () => {
abortRef.current?.()
abortRef.current = null
}
}, [])
useEffect(() => {
setEditVo(state)
}, [state])
@@ -126,7 +134,7 @@ const Prompt: FC = () => {
}
})
};
updatePromptMessages((promptSession) as string, values, handleStreamMessage)
updatePromptMessages((promptSession) as string, values, handleStreamMessage, undefined, (abort) => { abortRef.current = abort })
.finally(() => {
setLoading(false)
})

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-06 21:10:56
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-15 15:57:35
* @Last Modified time: 2026-04-21 14:59:13
*/
/**
* Workflow Chat Component
@@ -51,6 +51,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
const { setChatHistory } = useWorkflowStore()
const conversationIdRef = useRef<string>('draft')
const toolbarRef = useRef<ChatToolbarRef>(null)
const abortRef = useRef<(() => void) | null>(null)
const [toolbarReady, setToolbarReady] = useState(false)
const toolbarCallbackRef = useCallback((node: ChatToolbarRef | null) => {
(toolbarRef as React.MutableRefObject<ChatToolbarRef | null>).current = node
@@ -65,6 +66,8 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
const [fileList, setFileList] = useState<any[]>([])
const [message, setMessage] = useState<string | undefined>(undefined)
console.log('abortRef', abortRef)
/**
* Opens the chat drawer and loads workflow variables from the start node
*/
@@ -116,6 +119,8 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
* Closes the drawer and resets all state
*/
const handleClose = () => {
abortRef.current?.()
abortRef.current = null;
setOpen(false)
setToolbarReady(false)
setChatList([])
@@ -395,7 +400,7 @@ const Chat = forwardRef<ChatRef, { appId: string; graphRef: GraphRef; data: Work
])
setLoading(true)
setStreamLoading(true)
draftRun(appId, data, handleStreamMessage)
draftRun(appId, data, handleStreamMessage, abort => { abortRef.current = abort })
.catch((error) => {
const errorInfo = JSON.parse(error.message)
setChatList(prev => {

View File

@@ -2,7 +2,7 @@
* @Author: ZhaoYing
* @Date: 2026-02-03 15:39:59
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-13 10:44:19
* @Last Modified time: 2026-04-21 14:15:33
*/
import { type FC, useEffect, useState, useMemo } from "react";
import clsx from 'clsx'
@@ -153,7 +153,9 @@ const Properties: FC<PropertiesProps> = ({
selectedNode?.setData({
...nodeData,
...allRest,
})
},
// { deep: false }
)
}
}, [values, selectedNode, form])