/*
 * @Author: ZhaoYing
 * @Date: 2026-02-02 16:35:43
 * @Last Modified by: ZhaoYing
 * @Last Modified time: 2026-02-02 16:35:43
 */
/**
 * Server-Sent Events (SSE) Stream Utility Module
 *
 * Provides SSE handling with:
 * - Automatic token refresh on 401 errors
 * - SSE message parsing and JSON decoding
 * - HTML entity decoding
 * - Stream buffering for incomplete messages
 *
 * @module stream
 */
import { message } from 'antd';
import i18n from '@/i18n'
import { cookieUtils } from './request'
import { refreshToken } from '@/api/user'
import { clearAuthData } from './auth'

const API_PREFIX = '/api'

/** Blank line between two SSE events; accepts both LF and CRLF line endings. */
const EVENT_SEPARATOR = /\r?\n\r?\n/;

/**
 * Entity name (the text between the ampersand and the semicolon) mapped to the
 * character it stands for. Only the entities this module decodes are listed.
 */
const HTML_ENTITIES: Record<string, string> = {
  quot: '"',
  amp: '&',
  lt: '<',
  gt: '>',
  apos: "'",
  '#39': "'",
  '#x27': "'",
};

/** Matches any entity listed in HTML_ENTITIES; group 1 is the entity name. */
const HTML_ENTITY_PATTERN = /&(quot|amp|lt|gt|apos|#39|#x27);/gi;

// Token refresh state: concurrent callers share one in-flight refresh.
let isRefreshing = false;
let refreshPromise: Promise<string> | null = null;

/**
 * Refresh authentication token for SSE requests.
 *
 * While a refresh is in flight, further calls return the same promise instead
 * of starting a second refresh. On any failure the stored auth data is cleared,
 * a "login expired" warning is shown and the page is sent to the login route
 * (unless it is already there); the error is then rethrown.
 *
 * @returns New access token
 * @throws When no refresh token is stored, the refresh call fails, or the
 *   refresh response carries no usable `access_token`.
 */
const refreshTokenForSSE = async (): Promise<string> => {
  if (isRefreshing && refreshPromise) {
    return refreshPromise;
  }

  isRefreshing = true;
  refreshPromise = (async (): Promise<string> => {
    try {
      const storedRefreshToken = cookieUtils.get('refreshToken');
      if (!storedRefreshToken) {
        throw new Error(i18n.t('common.refreshTokenNotExist'));
      }

      const response: unknown = await refreshToken();
      const newToken = (response as { access_token?: unknown } | null)?.access_token;
      // Guard against storing/sending the literal string "undefined" as a token.
      if (typeof newToken !== 'string' || !newToken) {
        throw new Error('Token refresh response did not contain an access_token');
      }

      cookieUtils.set('authToken', newToken);
      return newToken;
    } catch (error) {
      clearAuthData();
      message.warning(i18n.t('common.loginExpired'));
      if (!window.location.hash.includes('#/login')) {
        window.location.href = `/#/login`;
      }
      throw error;
    } finally {
      isRefreshing = false;
      refreshPromise = null;
    }
  })();

  return refreshPromise;
};

/**
 * SSE message structure
 */
export interface SSEMessage {
  /** Value of the `event:` line. */
  event?: string
  /** Decoded `data:` payload: a parsed JSON value, or the raw text if it is not JSON. */
  data?: string | object
}

/**
 * Parse SSE string format to JSON objects.
 *
 * Each `event:` line starts a new message; the `data:` lines that follow are
 * joined with `\n` and decoded by `parseDataContent`. A message is emitted only
 * when it has both a non-empty event name and non-empty data; `data:` lines
 * that are not preceded by an `event:` line are dropped.
 *
 * @param sseString - Raw SSE string data (may contain several events)
 * @returns Array of parsed SSE messages
 */
export function parseSSEToJSON(sseString: string): SSEMessage[] {
  const events: SSEMessage[] = [];
  let currentEvent: SSEMessage = {};
  let dataContent = '';

  // Emit the message collected so far, if it is complete.
  const flushCurrent = (): void => {
    if (currentEvent.event && dataContent) {
      currentEvent.data = parseDataContent(dataContent);
      events.push(currentEvent);
    }
  };

  for (const line of sseString.trim().split('\n')) {
    if (line.startsWith('event:')) {
      flushCurrent();
      currentEvent = { event: line.substring(6).trim() };
      dataContent = '';
    } else if (line.startsWith('data:')) {
      if (dataContent) dataContent += '\n';
      // trim() also strips a trailing "\r" left over from CRLF line endings.
      dataContent += line.substring(5).trim();
    }
  }
  flushCurrent();

  return events;
}

/**
 * Replace the HTML entities for double quote, ampersand, less-than,
 * greater-than and apostrophe with their characters.
 *
 * Decoding happens in a single pass, so an escaped ampersand followed by
 * entity-like text is decoded once only, never twice.
 *
 * @param text - Text that may contain HTML entities
 * @returns Text with the supported entities decoded
 */
function decodeHtmlEntities(text: string): string {
  return text.replace(
    HTML_ENTITY_PATTERN,
    (match: string, name: string) => HTML_ENTITIES[name.toLowerCase()] ?? match,
  );
}

/**
 * Parse SSE data content with HTML entity decoding.
 *
 * The content is entity-decoded and parsed as JSON. If the parsed value has a
 * string `data` field that contains `{`, that field is parsed as JSON too
 * (and left as the original string when that inner parse fails).
 *
 * @param dataContent - Raw data content string
 * @returns Parsed value, or the original string when it is not valid JSON
 *   (or is the JSON literal `null`)
 */
function parseDataContent(dataContent: string): string | object {
  try {
    // First layer: HTML entity decoding, then JSON.
    const parsed = JSON.parse(decodeHtmlEntities(dataContent));

    // Second layer: a string `data` field that itself looks like JSON.
    if (
      parsed !== null &&
      typeof parsed === 'object' &&
      typeof parsed.data === 'string' &&
      parsed.data.includes('{')
    ) {
      try {
        parsed.data = JSON.parse(parsed.data);
      } catch {
        // Keep original string
      }
    }

    return parsed ?? dataContent;
  } catch {
    return dataContent;
  }
}

/**
 * Make SSE request with authentication.
 *
 * Sends a JSON `POST` to `API_PREFIX + url` with a bearer token; headers from
 * `config.headers` are applied last and therefore override the defaults.
 *
 * @param url - API endpoint (appended to the API prefix)
 * @param data - Request payload, serialized with `JSON.stringify`
 * @param token - Authentication token
 * @param config - Additional request configuration
 * @returns Fetch response
 */
const makeSSERequest = async (url: string, data: unknown, token: string, config = { headers: {} }) => {
  return fetch(`${API_PREFIX}${url}`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`,
      ...config.headers,
    },
    body: JSON.stringify(data)
  });
};

/**
 * Read the JSON body of an error response without throwing.
 *
 * @param response - Error response whose body has not been consumed yet
 * @returns The parsed body when it is a JSON object, otherwise an empty object
 *   (e.g. for an HTML error page returned by a gateway)
 */
const readErrorBody = async (response: Response): Promise<{ error?: string }> => {
  try {
    const body: unknown = await response.json();
    return body !== null && typeof body === 'object' ? (body as { error?: string }) : {};
  } catch {
    return {};
  }
};

/**
 * Handle SSE stream with automatic token refresh and message parsing.
 *
 * Status handling of the first response:
 * - 500 / 502 / 504: show a warning and return without streaming.
 * - 400: show the server's error message and throw the parsed error body.
 * - 401: for URLs containing `/public` show a warning and return; otherwise
 *   refresh the token and retry once. If the refresh fails, return silently
 *   (the refresh routine has already notified the user).
 * - anything else: the body is read as an SSE stream.
 *
 * The stream is buffered so that only complete events (terminated by a blank
 * line) are parsed; whatever remains when the stream ends is parsed last.
 *
 * @param url - API endpoint
 * @param data - Request payload
 * @param onMessage - Callback for each parsed message
 * @param config - Additional request configuration
 * @throws Rethrows request, parsing and callback errors after logging them.
 */
export const handleSSE = async (
  url: string,
  data: unknown,
  onMessage?: (data: SSEMessage[]) => void,
  config = { headers: {} },
) => {
  try {
    const token = cookieUtils.get('authToken');
    let response = await makeSSERequest(url, data, token || '', config);

    switch (response.status) {
      case 500:
      case 502: {
        const errorData = await readErrorBody(response);
        message.warning(errorData.error || i18n.t('common.serviceUpgrading'));
        return;
      }
      case 400: {
        const error = await response.json();
        message.warning(error?.error);
        throw error || new Error('Bad Request');
      }
      case 504: {
        const errorJson = await readErrorBody(response);
        message.warning(errorJson.error || i18n.t('common.serverError'));
        return;
      }
      case 401: {
        if (url?.includes('/public')) {
          return message.warning(i18n.t('common.publicApiCannotRefreshToken'));
        }
        let newToken: string;
        try {
          newToken = await refreshTokenForSSE();
        } catch {
          // The refresh routine already cleared auth data and warned the user.
          return;
        }
        // A failure of the retried request propagates like the first request's.
        // NOTE(review): the status of the retried response is not inspected;
        // its body is streamed as-is.
        response = await makeSSERequest(url, data, newToken, config);
        break;
      }
    }

    if (!response.body) throw new Error('No response body');

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = ''; // Buffer for handling incomplete messages

    // Parse one raw event block and hand it to the consumer.
    const emit = (rawEvent: string): void => {
      if (rawEvent.trim() && onMessage) {
        onMessage(parseSSEToJSON(rawEvent));
      }
    };

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // Process complete events; keep the last, potentially incomplete one.
        const completeEvents = buffer.split(EVENT_SEPARATOR);
        buffer = completeEvents.pop() ?? '';
        completeEvents.forEach(emit);
      }

      // Flush bytes the decoder is still holding, then the remaining buffer.
      buffer += decoder.decode();
      emit(buffer);
    } catch (streamError) {
      // Stop the download if reading, parsing or the callback failed mid-stream.
      await reader.cancel().catch(() => undefined);
      throw streamError;
    }
  } catch (error) {
    console.error('Request failed:', error);
    throw error;
  }
};