From ba30161559ea2c15164934826689c38521d0ae49 Mon Sep 17 00:00:00 2001 From: zhaoying Date: Thu, 15 Jan 2026 14:58:54 +0800 Subject: [PATCH] fix(web): stream api support refresh token --- web/src/utils/stream.ts | 131 ++++++++++++++++++++++++++-------------- 1 file changed, 87 insertions(+), 44 deletions(-) diff --git a/web/src/utils/stream.ts b/web/src/utils/stream.ts index 7688cdd5..e4179e25 100644 --- a/web/src/utils/stream.ts +++ b/web/src/utils/stream.ts @@ -1,8 +1,47 @@ import { message } from 'antd'; import i18n from '@/i18n' import { cookieUtils } from './request' +import { refreshToken } from '@/api/user' +import { clearAuthData } from './auth' const API_PREFIX = '/api' +// Token refresh state +let isRefreshing = false; +let refreshPromise: Promise | null = null; + +// Refresh token function for SSE +const refreshTokenForSSE = async (): Promise => { + if (isRefreshing && refreshPromise) { + return refreshPromise; + } + + isRefreshing = true; + refreshPromise = (async () => { + try { + const refresh_token = cookieUtils.get('refreshToken'); + if (!refresh_token) { + throw new Error(i18n.t('common.refreshTokenNotExist')); + } + const response: any = await refreshToken(); + const newToken = response.access_token; + cookieUtils.set('authToken', newToken); + return newToken; + } catch (error) { + clearAuthData(); + message.warning(i18n.t('common.loginExpired')); + if (!window.location.hash.includes('#/login')) { + window.location.href = `/#/login`; + } + throw error; + } finally { + isRefreshing = false; + refreshPromise = null; + } + })(); + + return refreshPromise; +}; + export interface SSEMessage { event?: string data?: string | object @@ -66,62 +105,66 @@ function parseDataContent(dataContent: string): string | object { } } +const makeSSERequest = async (url: string, data: any, token: string, config = { headers: {} }) => { + return fetch(`${API_PREFIX}${url}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${token}`, + ...config.headers, + }, + body: JSON.stringify(data) + }); +}; export const handleSSE = async (url: string, data: any, onMessage?: (data: SSEMessage[]) => void, config = { headers: {} }) => { try { - const token = cookieUtils.get('authToken'); - const response = await fetch(`${API_PREFIX}${url}`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${token}`, - ...config.headers, - }, - body: JSON.stringify(data) - }); + let token = cookieUtils.get('authToken'); + let response = await makeSSERequest(url, data, token || '', config); - const { status } = response - - switch(status) { + switch (response.status) { case 401: if (url?.includes('/public')) { return message.warning(i18n.t('common.publicApiCannotRefreshToken')); } - window.location.href = `/#/login`; - break; - default: - if (!response.body) throw new Error('No response body'); - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; // 添加缓冲区来处理不完整的消息 - - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - const chunk = decoder.decode(value, { stream: true }); - buffer += chunk; - - // 处理完整的事件 - const events = buffer.split('\n\n'); - buffer = events.pop() || ''; // 保留最后一个可能不完整的事件 - - for (const event of events) { - if (event.trim() && onMessage) { - onMessage(parseSSEToJSON(event) ?? {}); - } - } - } - - // 处理剩余的缓冲区内容 - if (buffer.trim() && onMessage) { - onMessage(parseSSEToJSON(buffer) ?? {}); + try { + const newToken = await refreshTokenForSSE(); + response = await makeSSERequest(url, data, newToken, config); + } catch (refreshError) { + return; } break; } + if (!response.body) throw new Error('No response body'); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; // 添加缓冲区来处理不完整的消息 + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + buffer += chunk; + + // 处理完整的事件 + const events = buffer.split('\n\n'); + buffer = events.pop() || ''; // 保留最后一个可能不完整的事件 + + for (const event of events) { + if (event.trim() && onMessage) { + onMessage(parseSSEToJSON(event) ?? {}); + } + } + } + + // 处理剩余的缓冲区内容 + if (buffer.trim() && onMessage) { + onMessage(parseSSEToJSON(buffer) ?? {}); + } } catch (error) { console.error('Request failed:', error); throw error; } -} \ No newline at end of file +}; \ No newline at end of file