From bca4b2245368e4f234e9685a5a3c271e5f44e28a Mon Sep 17 00:00:00 2001 From: zhaoying Date: Tue, 13 Jan 2026 10:19:04 +0800 Subject: [PATCH] fix(web): workflow's chat --- web/src/components/Chat/ChatContent.tsx | 4 +- web/src/utils/auth.ts | 4 +- web/src/utils/stream.ts | 96 ++++++++++++------- .../views/Workflow/components/Chat/Chat.tsx | 10 +- 4 files changed, 74 insertions(+), 40 deletions(-) diff --git a/web/src/components/Chat/ChatContent.tsx b/web/src/components/Chat/ChatContent.tsx index 2067f57e..11ccb5c3 100644 --- a/web/src/components/Chat/ChatContent.tsx +++ b/web/src/components/Chat/ChatContent.tsx @@ -2,7 +2,7 @@ * @Author: ZhaoYing * @Date: 2025-12-10 16:46:17 * @Last Modified by: ZhaoYing - * @Last Modified time: 2025-12-11 13:40:18 + * @Last Modified time: 2026-01-12 20:41:27 */ import { type FC, useRef, useEffect } from 'react' import clsx from 'clsx' @@ -55,7 +55,7 @@ const ChatContent: FC = ({ } {/* 消息气泡框 */} -
{ console.log("Clearing auth data and redirecting to login"); - sessionStorage.clear(); - localStorage.clear() + // sessionStorage.clear(); + // localStorage.clear() cookieUtils.clear(); } \ No newline at end of file diff --git a/web/src/utils/stream.ts b/web/src/utils/stream.ts index abaaca2d..7688cdd5 100644 --- a/web/src/utils/stream.ts +++ b/web/src/utils/stream.ts @@ -12,43 +12,57 @@ export function parseSSEToJSON(sseString: string) { const lines = sseString.trim().split('\n') let currentEvent: SSEMessage = {} + let dataContent = '' + for (const line of lines) { + if (line.startsWith('event:')) { + if (currentEvent.event && dataContent) { + currentEvent.data = parseDataContent(dataContent) + events.push(currentEvent) + } + currentEvent = { event: line.substring(6).trim() } + dataContent = '' + } else if (line.startsWith('data:')) { + if (dataContent) dataContent += '\n' + dataContent += line.substring(5).trim() + } + } + + + if (currentEvent.event && dataContent) { + currentEvent.data = parseDataContent(dataContent) + console.log('currentEvent', currentEvent) + events.push(currentEvent) + } + + return events +} + +function parseDataContent(dataContent: string): string | object { try { - for (const line of lines) { - if (line.startsWith('event:')) { - if (Object.keys(currentEvent).length > 0) { - events.push(currentEvent) - currentEvent = {} - } - currentEvent.event = line.substring(6).trim() - } else if (line.startsWith('data:')) { - const dataStr = line.substring(5).trim() - if (dataStr) { - try { - // 尝试解析为 JSON - currentEvent.data = JSON.parse(dataStr) - } catch { - // JSON 解析失败时,检查是否是被转义的 JSON 字符串 - try { - const unescaped = dataStr.replace(/"/g, '"').replace(/&/g, '&') - currentEvent.data = JSON.parse(unescaped) - } catch { - // 如果仍然失败,保存为原始字符串 - currentEvent.data = dataStr - } - } - } + // 第一层解码:HTML实体 + let unescaped = dataContent + .replace(/"/g, '"') + .replace(/&/g, '&') + .replace(/</g, '<') + .replace(/>/g, '>') + .replace(/'/g, "'") + + // 解析第一层JSON + const firstParse = JSON.parse(unescaped) + + // 如果data字段是字符串且包含JSON,解析data层但保持chunk为字符串 + if (firstParse.data && typeof firstParse.data === 'string' && firstParse.data.includes("{")) { + try { + firstParse.data = JSON.parse(firstParse.data) + } catch { + // 保持原字符串 } } - if (Object.keys(currentEvent).length > 0) { - events.push(currentEvent) - } - - return events - } catch (error) { - console.error('Parse stream error:', error) - return [] + return firstParse + } catch { + return dataContent } } @@ -80,16 +94,30 @@ export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMe const reader = response.body.getReader(); const decoder = new TextDecoder(); + let buffer = ''; // 添加缓冲区来处理不完整的消息 while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value, { stream: true }); - if (onMessage) { - onMessage(parseSSEToJSON(chunk) ?? {}); + buffer += chunk; + + // 处理完整的事件 + const events = buffer.split('\n\n'); + buffer = events.pop() || ''; // 保留最后一个可能不完整的事件 + + for (const event of events) { + if (event.trim() && onMessage) { + onMessage(parseSSEToJSON(event) ?? {}); + } } } + + // 处理剩余的缓冲区内容 + if (buffer.trim() && onMessage) { + onMessage(parseSSEToJSON(buffer) ?? {}); + } break; } } catch (error) { diff --git a/web/src/views/Workflow/components/Chat/Chat.tsx b/web/src/views/Workflow/components/Chat/Chat.tsx index 4824cc9c..0673389e 100644 --- a/web/src/views/Workflow/components/Chat/Chat.tsx +++ b/web/src/views/Workflow/components/Chat/Chat.tsx @@ -26,6 +26,7 @@ const Chat = forwardRef(({ appId const [chatList, setChatList] = useState([]) const [variables, setVariables] = useState([]) const [streamLoading, setStreamLoading] = useState(false) + const [conversationId, setConversationId] = useState(null) const handleOpen = () => { setOpen(true) @@ -100,7 +101,7 @@ const Chat = forwardRef(({ appId setStreamLoading(false) data.forEach(item => { - const { chunk } = item.data as { chunk: string; }; + const { chunk, conversation_id } = item.data as { chunk: string; conversation_id: string | null; }; switch(item.event) { case 'message': @@ -131,6 +132,10 @@ const Chat = forwardRef(({ appId setStreamLoading(false) break } + + if (conversation_id && conversationId !== conversation_id) { + setConversationId(conversation_id) + } }) } @@ -138,7 +143,8 @@ const Chat = forwardRef(({ appId draftRun(appId, { message: message, variables: params, - stream: true + stream: true, + conversation_id: conversationId }, handleStreamMessage) .finally(() => { setLoading(false)