Files
MemoryBear/web/src/utils/stream.ts
2026-04-22 10:17:21 +08:00

247 lines
7.0 KiB
TypeScript

/*
* @Author: ZhaoYing
* @Date: 2026-02-02 16:35:43
* @Last Modified by: ZhaoYing
* @Last Modified time: 2026-04-22 10:16:43
*/
/**
* Server-Sent Events (SSE) Stream Utility Module
*
* Provides SSE handling with:
* - Automatic token refresh on 401 errors
* - SSE message parsing and JSON decoding
* - HTML entity decoding
* - Stream buffering for incomplete messages
*
* @module stream
*/
import { message } from 'antd';
import i18n from '@/i18n'
import { cookieUtils } from './request'
import { refreshToken } from '@/api/user'
import { clearAuthData } from './auth'
const API_PREFIX = '/api'
// Token refresh state
let isRefreshing = false;
let refreshPromise: Promise<string> | null = null;
/**
* Refresh authentication token for SSE requests
* @returns New access token
*/
const refreshTokenForSSE = async (): Promise<string> => {
if (isRefreshing && refreshPromise) {
return refreshPromise;
}
isRefreshing = true;
refreshPromise = (async () => {
try {
const refresh_token = cookieUtils.get('refreshToken');
if (!refresh_token) {
throw new Error(i18n.t('common.refreshTokenNotExist'));
}
const response: any = await refreshToken();
const newToken = response.access_token;
cookieUtils.set('authToken', newToken);
return newToken;
} catch (error) {
clearAuthData();
message.warning(i18n.t('common.loginExpired'));
if (!window.location.hash.includes('#/login')) {
window.location.href = `/#/login`;
}
throw error;
} finally {
isRefreshing = false;
refreshPromise = null;
}
})();
return refreshPromise;
};
/**
* SSE message structure
*/
export interface SSEMessage {
event?: string
data?: string | object
}
/**
* Parse SSE string format to JSON objects
* @param sseString - Raw SSE string data
* @returns Array of parsed SSE messages
*/
export function parseSSEToJSON(sseString: string) {
const events: SSEMessage[] = []
const lines = sseString.trim().split('\n')
let currentEvent: SSEMessage = {}
let dataContent = ''
for (const line of lines) {
if (line.startsWith('event:')) {
if (currentEvent.event && dataContent) {
currentEvent.data = parseDataContent(dataContent)
events.push(currentEvent)
}
currentEvent = { event: line.substring(6).trim() }
dataContent = ''
} else if (line.startsWith('data:')) {
if (dataContent) dataContent += '\n'
dataContent += line.substring(5).trim()
}
}
if (currentEvent.event && dataContent) {
currentEvent.data = parseDataContent(dataContent)
console.log('currentEvent', currentEvent)
events.push(currentEvent)
}
return events
}
/**
* Parse SSE data content with HTML entity decoding
* @param dataContent - Raw data content string
* @returns Parsed object or original string
*/
function parseDataContent(dataContent: string): string | object {
try {
// First layer: HTML entity decoding
let unescaped = dataContent
.replace(/&quot;/g, '"')
.replace(/&amp;/g, '&')
.replace(/&lt;/g, '<')
.replace(/&gt;/g, '>')
.replace(/&#39;/g, "'")
// Parse first layer JSON
const firstParse = JSON.parse(unescaped)
// If data field is a string containing JSON, parse data layer but keep chunk as string
if (firstParse.data && typeof firstParse.data === 'string' && firstParse.data.includes("{")) {
try {
firstParse.data = JSON.parse(firstParse.data)
} catch {
// Keep original string
}
}
return firstParse
} catch {
return dataContent
}
}
/**
* Make SSE request with authentication
* @param url - API endpoint
* @param data - Request payload
* @param token - Authentication token
* @param config - Additional request configuration
* @returns Fetch response
*/
const makeSSERequest = async (url: string, data: any, token: string, config = { headers: {} }) => {
return fetch(`${API_PREFIX}${url}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
...config.headers,
},
body: JSON.stringify(data)
});
};
/**
* Handle SSE stream with automatic token refresh and message parsing
* @param url - API endpoint
* @param data - Request payload
* @param onMessage - Callback for each parsed message
* @param config - Additional request configuration
*/
export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMessage[]) => void, config = { headers: {} }) => {
try {
let token = cookieUtils.get('authToken');
let response = await makeSSERequest(url, data, token || '', config);
switch (response.status) {
case 500:
case 502:
const errorData = await response.json();
const errorInfo = errorData.error || errorData.msg || i18n.t('common.serviceUpgrading');
message.warning(errorInfo);
throw new Error(errorData);
case 400:
const error = await response.json();
const error400 = error.error || error.msg || 'Bad Request';
message.warning(error400);
throw new Error(error);
case 403:
const errors = await response.json();
message.warning(i18n.t('common.permissionDenied'));
throw new Error(errors);
case 504:
const errorJson = await response.json();
const errorMsg = errorJson.error || errorJson.msg || i18n.t('common.serverError');
message.warning(errorMsg);
throw new Error(errorJson);
case 401:
if (url?.includes('/public')) {
return message.warning(i18n.t('common.publicApiCannotRefreshToken'));
}
try {
const newToken = await refreshTokenForSSE();
response = await makeSSERequest(url, data, newToken, config);
} catch (refreshError) {
return;
}
break;
default:
if (!response.ok) {
const defaultData = await response.json().catch(() => ({}));
const defaultMsg = defaultData.error || defaultData.msg;
if (defaultMsg) message.warning(defaultMsg);
throw new Error(defaultMsg || `HTTP ${response.status}`);
}
}
if (!response.body) throw new Error('No response body');
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = ''; // Buffer for handling incomplete messages
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
// Process complete events
const events = buffer.split('\n\n');
buffer = events.pop() || ''; // Keep last potentially incomplete event
for (const event of events) {
if (event.trim() && onMessage) {
onMessage(parseSSEToJSON(event) ?? {});
}
}
}
// Process remaining buffer content
if (buffer.trim() && onMessage) {
onMessage(parseSSEToJSON(buffer) ?? {});
}
} catch (error) {
console.error('Request failed:', error);
throw error;
}
};