主题
SSE与流式响应实践避坑
1、介绍SSE
SSE(Server-Sent Events) 是一种基于HTTP协议的服务器推送技术,允许服务器实时向客户端发送事件。与WebSocket不同,SSE是单向的,服务器可以向客户端发送数据,但客户端不能向服务器发送数据。
SSE的主要特点包括:
- 实时更新:服务器可以主动推送信息到客户端,适用于需要实时数据更新的应用场景。
- 简单易用:使用HTTP协议,易于实现和集成。
- 轻量级:相较于WebSocket,SSE的实现更为简单,适合于不需要双向通信的场景。
2、使用
既然 SSE本质是一种 基于HTTP 协议的技术,那么使用也很简单,分为客户端和服务器端
- 服务器端
以nodejs 最为服务器端举例
服务器端只需要简单几步就能开启响应
js
var http = require("http");
http.createServer(function (req, res) {
var fileName = "." + req.url;
if (fileName === "./stream") {
// 1. 设置响应头
res.writeHead(200, {
"Content-Type":"text/event-stream", // 必须
"Cache-Control":"no-cache", // 避免消息被类似nginx或网关之类的中间层以及浏览器缓存,导致在前端表现为最终才一次性响应
"Connection":"keep-alive", // 开启长连接
"Access-Control-Allow-Origin": '*', // 跨域
});
// 2. 数据响应
// 服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。
// 两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。
res.write("retry: 10000\n");
// event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。
res.write("event: connecttime\n");
// 数据内容用data字段表示。如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。
res.write("data: " + (new Date()) + "\n\n");
res.write("data: " + (new Date()) + "\n\n");
interval = setInterval(function () {
res.write("data: " + (new Date()) + "\n\n");
}, 1000);
// 当客户端断开连接后,服务器停止推送
req.connection.addListener("close", function () {
clearInterval(interval);
}, false);
}
}).listen(9527, "127.0.0.1");
- 客户端
正常情况下,客户端用 EventSource 对象就可以
js
// 下面的代码可以检测浏览器是否支持 SSE。
if ('EventSource' in window) {
// 使用 SSE 时,浏览器首先生成一个EventSource实例,向服务器发起连接。
var source = new EventSource(url);
// 上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。
// var source = new EventSource(url, { withCredentials: true });
}
EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。
- 0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。
- 1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。
- 2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。
- 连接一旦建立,就会触发open事件,可以在onopen属性定义回调函数。
js
source.onopen = function (event) {
// ...
};
// 另一种写法
source.addEventListener('open', function (event) {
// ...
}, false);
- 客户端收到服务器发来的数据,就会触发message事件,可以在onmessage属性的回调函数。
js
source.onmessage = function (event) {
var data = event.data;
// handle message
};
// 另一种写法
source.addEventListener('message', function (event) {
var data = event.data; // 事件对象的data属性就是服务器端传回的数据(文本格式)。
// handle message
}, false);
- 如果发生通信错误(比如连接中断),就会触发error事件,可以在onerror属性定义回调函数。
js
source.onerror = function (event) {
// handle error event
};
// 另一种写法
source.addEventListener('error', function (event) {
// handle error event
}, false);
- close方法用于关闭 SSE 连接。
js
source.close();
- 自定义事件 默认情况下,服务器发来的数据,总是触发浏览器EventSource实例的message事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发message事件。
js
// 浏览器对 SSE 的foo事件进行监听,需要服务器发送 foo 事件才行,(见上文服务器代码的第2点:数据响应)
source.addEventListener('foo', function (event) {
var data = event.data;
// handle message
}, false);
3、EventSource局限性
EventSource API 是用于服务器发送事件(SSE)的客户端接口,有一些局限性:
它是单向通信的:只能从服务器端向客户端发送数据,若需要客户端发送数据给服务器,只有在创建连接时候,通过URL传递少量参数给服务器
它不支持发起 POST 请求,默认为GET
这对于像需要与AI进行交互的场景,用户需要发送数据给服务端,服务端拿到数据后请求大模型接口后响应数据的场景不太适合,可能下面的【流式传输】方案更适合
4、流式传输 (Stream)
首先要明确一点:SSE 是实现流式响应的一种非常流行和高效的方式。
SSE 是一种具体的、标准化的 Web 技术协议,用于实现从服务器到客户端的单向流式通信。
流式响应是一种更通用的概念和模式,指的是服务器逐步生成并发送数据,而不是一次性发送全部数据。
- 服务器端
node为例
流式传输的服务器端与SSE的服务器端没没什么太大的区别,只不过SSE服务器端是在普通流式传输的基础上规定了一些特定的格式,
- 主要区别在于 HTTP 响应头 和 发送数据的格式
- 普通流式传输响应头通常只需
Transfer-Encoding: chunked
。Content-Type
根据流的内容而定(如application/octet-stream
,text/plain
)。 - 消息格式无格式要求。直接发送原始的数据块(chunks)但需要客户端配合解析,更加灵活,而 SSE 格式固定,专为“事件消息”设计,与浏览器 EventSource API 无缝集成,开箱即用。
- 普通流式传输可以为任意的请求方式 GET、POST、PUT、....
普通流式传输代码示例:
js
const http = require('http');
const server = http.createServer((req, res) => {
if (req.url === '/stream') {
// 设置头部:表明是分块传输的普通文本
res.writeHead(200, {
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked'
});
let count = 0;
const intervalId = setInterval(() => {
count++;
// 关键区别点:直接写入原始数据
res.write(`Chunk ${count}: Current time is ${new Date().toISOString()}\n`);
if (count === 5) {
clearInterval(intervalId);
res.end(); // 结束流
}
}, 1000);
// 处理连接中断
req.on('close', () => {
clearInterval(intervalId);
res.end();
});
} else {
res.statusCode = 404;
res.end('Not found');
}
});
server.listen(3000, () => {
console.log('Server listening on port 3000');
});
- 客户端
流式传输客户端就类似于常规的http请求,只不过有一些需要主要特别注意的点:
HTTP 分块传输编码 (Chunked Transfer Encoding)
:这是流式传输的基础。服务器必须支持并启用此功能,才能将响应分成多个“块”陆续发送。客户端库通常会自动处理这块逻辑。正确读取响应体
:不要使用那些期望一次性返回完整响应体的方法(如 .json() 或 .text()),它们会等待流结束,从而失去流式的效果。必须通过响应体的 getReader() 方法获取 ReadableStreamDefaultReader 来逐块读取。错误处理与中断
:流式请求可能需要较长时间,因此要妥善处理超时、网络错误以及用户主动取消的情况。使用 AbortController 可以中断请求和流的读取。内存管理
:流式传输的一大优势是能降低内存消耗,避免大响应体阻塞客户端。确保在读取每个数据块后及时处理,并不再需要时释放资源。
客户端方案对比
方案 | EventSource | Fetch API + ReadableStream | fetch-event-source |
---|---|---|---|
特点 | 简单粗暴 | 功能强大但复杂 | 开箱即用 |
优点 | • 浏览器自带,无需安装 • 断网自动重连 | • 支持所有请求方式 • 可自定义请求头 • 支持二进制数据 | • 功能全面 • 自动处理重连 • 企业级稳定 |
缺点 | • 只能GET请求 • 不能传自定义头 • 只支持UTF-8 • IE不支持 | • 需要手写错误处理 • 代码量大 | • 需要安装依赖 • 包体积大 |
适用场景 | 简单推送 | 复杂需求 | 企业项目 |
选型建议 | 新手首选 | 高手专用 | 推荐使用 |
实际案例 | - | dify、chatbot-ui | 京东 joyagent-jdgenie |
1)Fetch API + ReadableStream 代码示例
js
// 发起流式请求
async function startStream() {
try {
// 发起POST请求,支持自定义请求头
// 此处await是为了等待响应头,并非响应体
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your-token' // 支持认证
},
body: JSON.stringify({
message: '你好,请帮我写代码'
})
});
// 检查响应状态
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
// 获取可读流
const reader = response.body.getReader();
const decoder = new TextDecoder(); // 用于解码二进制数据
// 循环读取数据块
while (true) {
const { done, value } = await reader.read();
// 流结束
if (done) {
console.log('流式传输完成');
break;
}
// 解码数据并处理
const chunk = decoder.decode(value, { stream: true });
console.log('收到数据:', chunk);
// 这里可以更新UI,比如逐字显示(需要注意处理上一个chunk的完整性,例如一个中文字符被两个chunk分成了两部分,会导致乱码)
updateUI(chunk);
}
} catch (error) {
console.error('流式请求失败:', error);
// 需要手动处理错误和重连
handleError(error);
}
}
// 手动错误处理和重连
function handleError(error) {
console.log('处理错误,准备重连...');
// 实现重连逻辑
setTimeout(() => {
startStream();
}, 1000);
}
// 更新UI的函数
function updateUI(chunk) {
const output = document.getElementById('output');
output.textContent += chunk;
}
2)fetch-event-source 代码示例
js
import { fetchEventSource } from '@microsoft/fetch-event-source';
// 使用微软的fetch-event-source库
async function startStreamWithLibrary() {
try {
await fetchEventSource('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer your-token' // 支持认证
},
body: JSON.stringify({
message: '你好,请帮我写代码'
}),
// 连接建立时的回调
async onopen(response) {
if (response.ok) {
console.log('连接已建立');
return; // 一切正常,继续
} else {
throw new Error(`Server error: ${response.status}`);
}
},
// 收到消息时的回调
onmessage(msg) {
try {
// 自动解析JSON数据
const data = msg.data ? JSON.parse(msg.data) : msg.data;
console.log('收到消息:', data);
// 更新UI
updateUI(data);
} catch (err) {
console.error('解析消息失败:', err);
}
},
// 连接关闭时的回调
onclose() {
console.log('连接已关闭');
// 库会自动处理重连
},
// 错误处理回调
onerror(err) {
console.error('发生错误:', err);
// 库会自动重试,抛出错误会停止重试
throw err;
}
});
} catch (error) {
console.error('流式请求失败:', error);
}
}
// 更新UI的函数
function updateUI(data) {
const output = document.getElementById('output');
if (typeof data === 'string') {
output.textContent += data;
} else {
output.textContent += data.content || JSON.stringify(data);
}
}
3)封装fetch-event-source 以及使用
- 封装
js
// src/utils/sseUtils.js
import { fetchEventSource } from '@microsoft/fetch-event-source';
/**
* 流式请求封装
* @param {string} url - 请求地址
* @param {Object} options - 配置选项
* @param {Object} options.body - 请求体
* @param {Object} options.headers - 请求头
* @param {function} options.onMessage - 消息处理回调
* @param {function} options.onOpen - 连接打开回调
* @param {function} options.onClose - 连接关闭回调
* @param {function} options.onError - 错误处理回调
* @returns {Promise} 返回一个可取消的Promise
*/
export const createSSEConnection = (url, {
body,
headers = {},
onMessage,
onOpen,
onClose,
onError
}) => {
// 创建一个AbortController用于取消请求
const ctrl = new AbortController();
// 返回一个包含取消方法的Promise
return {
promise: fetchEventSource(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...headers
},
body: JSON.stringify(body),
signal: ctrl.signal,
async onopen(response) {
if (response.ok) {
onOpen?.();
return; // 一切正常,继续
}
throw new Error(`Server error: ${response.status}`);
},
onmessage(msg) {
try {
// 如果数据是JSON格式则解析,否则直接使用
const data = msg.data ? JSON.parse(msg.data) : msg.data;
onMessage?.(data);
} catch (err) {
console.error('Failed to parse message data', err);
}
},
onclose() {
onClose?.();
},
onerror(err) {
onError?.(err);
throw err; // 重新抛出以停止重试
}
}),
cancel: () => ctrl.abort()
};
};
- 在vue3中使用
html
<script setup>
import { ref, onUnmounted } from 'vue';
import { createSSEConnection } from '@/utils/sseUtils';
const streamData = ref(''); // 存储流式数据
const isLoading = ref(false); // 加载状态
const error = ref(null); // 错误信息
let sseConnection = null; // 存储SSE连接
// 发起流式请求
const startStream = async () => {
try {
// 重置状态
streamData.value = '';
isLoading.value = true;
error.value = null;
// 创建SSE连接
sseConnection = createSSEConnection('https://api.example.com/stream', {
body: { query: '获取流式数据' },
onMessage: (data) => {
// 企业技巧1: 使用函数式更新避免重复触发响应式
streamData.value += data;
// 企业技巧2: 自动滚动到底部 (适用于聊天场景)
nextTick(() => {
const container = document.getElementById('stream-container');
if (container) {
container.scrollTop = container.scrollHeight;
}
});
},
onOpen: () => {
console.log('连接已建立');
},
onClose: () => {
isLoading.value = false;
console.log('连接已关闭');
},
onError: (err) => {
isLoading.value = false;
error.value = err.message;
console.error('发生错误:', err);
}
});
await sseConnection.promise;
} catch (err) {
if (err.name !== 'AbortError') {
error.value = err.message;
}
} finally {
isLoading.value = false;
}
};
// 停止流式请求
const stopStream = () => {
if (sseConnection) {
sseConnection.cancel();
sseConnection = null;
}
};
// 组件卸载时自动取消请求
onUnmounted(() => {
stopStream();
});
// 企业技巧3: 使用计算属性处理流式数据
const processedStreamData = computed(() => {
return streamData.value
.replace(/\n/g, '<br>') // 换行转换
.replace(/\t/g, ' '); // 制表符转换
});
</script>
<template>
<div class="stream-container">
<h2>流式数据演示</h2>
<!-- 控制按钮 -->
<div class="controls">
<button @click="startStream" :disabled="isLoading">开始流式请求</button>
<button @click="stopStream" :disabled="!isLoading">停止</button>
</div>
<!-- 加载状态 -->
<div v-if="isLoading" class="loading">加载中...</div>
<!-- 错误显示 -->
<div v-if="error" class="error">{{ error }}</div>
<!-- 流式数据展示 -->
<div
id="stream-container"
class="stream-content"
v-html="processedStreamData"
></div>
<!-- 企业技巧4: 显示数据统计 -->
<div class="stats">
已接收: {{ streamData.length }} 字符 |
行数: {{ (streamData.match(/\n/g) || []).length + 1 }}
</div>
</div>
</template>
5、EGG 服务器流式响应
- ctx.res.write
EggJS 中使用 ctx.res.write
的主要场景:
- 实时数据推送:SSE、WebSocket 降级
- 大文件处理:文件下载、视频流传输
- AI 应用:流式对话、文本生成
- 系统监控:实时日志、进度更新
- 数据库操作:大数据量查询结果
- 错误处理:重连机制、异常恢复
ctx.res.write 不能实现效果原因
有时候在使用 ctx.res.write
可能不能正确的实现效果,原因可能有这些
- http响应头问题
EggJS 会自动设置 Content-Length 头,但流式响应时长度未知,导致客户端等待不准确。
js
// ❌ 问题代码
async badControl() {
const { ctx } = this;
// 设置响应头
ctx.set('Content-Type', 'text/plain');
ctx.set('Cache-Control', 'no-cache');
// 直接写入数据
ctx.res.write('Hello');
ctx.res.write('World');
ctx.res.end();
}
- 中间件干扰
js
// EggJS 中间件会干扰流式响应
app.use(async (ctx, next) => {
// 这个中间件可能会缓存响应
await next();
// 问题:中间件可能修改响应头或内容
if (ctx.body) {
ctx.body = 'modified: ' + ctx.body;
}
});
- 响应体处理机制冲突
js
// ❌ 错误方式
async badStream() {
const { ctx } = this;
// EggJS 期望通过 ctx.body 设置响应
ctx.body = 'some data'; // 这会与 ctx.res.write 冲突
// 直接写入会绕过 EggJS 的响应处理机制
ctx.res.write('stream data');
}
解决方案
- 正确设置响应头与状态码 在正常情况下,需要当单独设置 200 状态码, 否则流式响应 egg会默认 404,
ctx.status=200
- sse响应头
js
// 设置 SSE 响应头
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*', // 设置跨域
'Access-Control-Allow-Headers': 'Cache-Control'
});
- 设置分块传输响应头
js
// 设置分块传输响应头
ctx.set({
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache'
});
- 禁用 EggJS 的响应体处理
js
ctx.respond = false;
- 自定义中间件处理
若有自定义中间件,则在中间件中需要区分流式响应的场景
js
// ✅ 自定义中间件处理流式响应
module.exports = () => {
return async (ctx, next) => {
await next();
// 检查是否是流式响应
if (ctx.streaming) {
// 跳过默认的响应处理
return;
}
};
};
代码示例
js
async stream() {
const { ctx } = this;
try {
ctx.set({
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
ctx.status = 200;
// 禁用框架默认的响应处理,直接使用原生 Node.js 响应
ctx.respond = false;
// 流式数据处理
const stream = fs.createReadStream("./zhihu.txt", {
encoding: "utf-8",
highWaterMark: 100, // chunk 块的大小
});
stream.on("data", (chunk) => {
ctx.res.write(`data: ${chunk}\n\n`);
});
stream.on("end", () => {
ctx.res.write("data: [DONE]\n\n");
ctx.res.end();
});
stream.on("error", (err) => {
ctx.res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
ctx.res.end();
});
ctx.req.on("close", () => {
stream.destroy();
});
} catch (error) {
ctx.logger.error("Stream error:", error);
ctx.res.write(`data: ${JSON.stringify({ error: "Internal error" })}\n\n`);
ctx.res.end();
}
}
- PassThrough 中转流式响应
PassThrough流 是Node.js中的一种双工流(duplex stream),它在数据流传递过程中起到无操作的中间层,将数据从可读流传递到可写流,同时不做任何修改或处理。
相对于 ctx.res.write
,在Eggjs中使用 PassThrough 流式响应更简单也更符合eggjs的设计,具体对比如下:
特性 | PassThrough | ctx.res.write |
---|---|---|
响应处理 | 通过 EggJS 正常流程 | 绕过 EggJS 响应机制 |
中间件兼容 | 完全兼容 | 需要特殊处理 |
错误处理 | 统一错误处理 | 需要手动处理 |
资源管理 | 自动管理 | 手动管理 |
代码复杂度 | 简单 | 复杂 |
代码示例
js
const { PassThrough } = require('stream');
// ✅ PassThrough - 更好的性能
async streamWithPassThrough() {
const { ctx } = this;
// 可以设置缓冲区大小,配置可选
const passThrough = new PassThrough({
highWaterMark: 16 * 1024, // 16KB 缓冲区
objectMode: false
});
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 直接响应stream,egg会自行处理
ctx.body = passThrough;
// 异步处理数据
this.processDataAsync(passThrough);
}
// 数据处理,将其他数据源write进入 PassThrough
async processDataAsync(passThrough) {
const dataSource = await this.getDataSource(); // 伪代码。获取数据源数据流,可以是三方接口,也可以是本地数据流
// 支持背压控制
dataSource.on('data', (chunk) => {
// chhunk格式需要根据getDataSource响应的格式来处理,不一定是JSON.stringify
if (!passThrough.write(`data: ${JSON.stringify(chunk)}\n\n`)) {
// 缓冲区满了,暂停数据源
dataSource.pause();
}
});
// 缓冲区有空间时恢复数据源
passThrough.on('drain', () => {
dataSource.resume();
});
dataSource.on('end', () => {
passThrough.write('data: [DONE]\n\n');
passThrough.end();
});
}
6、Eggjs HttpClient中转发其他流式响应接口
- 基础示例
js
const { PassThrough } = require('stream');
// 代理 SSE 流式响应
async proxySSE() {
const { ctx } = this;
const passThrough = new PassThrough();
// 设置 SSE 响应头
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
});
ctx.body = passThrough;
try {
// 获取目标 URL
const { url } = ctx.query;
if (!url) {
passThrough.write('data: {"error": "URL parameter is required"}\n\n');
passThrough.end();
return;
}
// 使用 HttpClient 请求目标接口
const response = await ctx.curl(url, {
method: 'GET',
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache'
},
// 启用流式响应
streaming: true
});
// 检查响应状态
if (response.status !== 200) {
passThrough.write(`data: {"error": "Target service error: ${response.status}"}\n\n`);
passThrough.end();
return;
}
// 转发流式数据
response.res.on('data', (chunk) => {
passThrough.write(chunk);
});
response.res.on('end', () => {
passThrough.end();
});
response.res.on('error', (err) => {
ctx.logger.error('Proxy stream error:', err);
passThrough.write(`data: {"error": "${err.message}"}\n\n`);
passThrough.end();
});
} catch (error) {
ctx.logger.error('Proxy error:', error);
passThrough.write(`data: {"error": "Proxy failed: ${error.message}"}\n\n`);
passThrough.end();
}
}
- EGG 通用流式代理 class 封装
js
const { PassThrough } = require('stream');
class StreamProxyService {
constructor(ctx) {
this.ctx = ctx;
this.passThrough = new PassThrough();
}
// 设置响应头
setHeaders(contentType = 'text/event-stream') {
this.ctx.set({
'Content-Type': contentType,
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*', // 视情况调整
'Access-Control-Allow-Headers': 'Cache-Control' // 视情况调整
});
this.ctx.body = this.passThrough;
}
// 代理请求
async proxy(options) {
const {
url,
method = 'GET',
headers = {},
data = null,
timeout = 30000,
retries = 0,
transform = null
} = options;
if (!url) {
this.passThrough.write('data: {"error": "URL is required"}\n\n');
this.passThrough.end();
return;
}
try {
// 构建请求配置
const requestConfig = {
method,
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
...headers
},
streaming: true,
timeout
};
if (data) {
requestConfig.data = data;
}
// 发送请求
const response = await this.ctx.curl(url, requestConfig);
if (response.status !== 200) {
this.passThrough.write(`data: {"error": "Target service error: ${response.status}"}\n\n`);
this.passThrough.end();
return;
}
// 处理流式数据
this.handleStream(response.res, transform);
} catch (error) {
this.ctx.logger.error('Proxy error:', error);
this.passThrough.write(`data: {"error": "Proxy failed: ${error.message}"}\n\n`);
this.passThrough.end();
}
}
// 处理流式数据
handleStream(stream, transform) {
if (transform) {
// 应用转换
const transformStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
try {
const data = JSON.parse(chunk.toString());
const transformedData = transform(data);
callback(null, transformedData);
} catch (parseError) {
callback(null, { content: chunk.toString() });
}
}
});
stream
.pipe(transformStream)
.on('data', (chunk) => {
this.passThrough.write(`data: ${JSON.stringify(chunk)}\n\n`);
})
.on('end', () => {
this.passThrough.write('data: [DONE]\n\n');
this.passThrough.end();
})
.on('error', (err) => {
this.ctx.logger.error('Transform stream error:', err);
this.passThrough.write(`data: {"error": "${err.message}"}\n\n`);
this.passThrough.end();
});
} else {
// 直接转发
stream.on('data', (chunk) => {
this.passThrough.write(chunk);
});
stream.on('end', () => {
this.passThrough.end();
});
stream.on('error', (err) => {
this.ctx.logger.error('Stream error:', err);
this.passThrough.write(`data: {"error": "${err.message}"}\n\n`);
this.passThrough.end();
});
}
}
}
// 在控制器中使用
async proxy() {
const { ctx } = this;
const proxyService = new StreamProxyService(ctx);
proxyService.setHeaders();
await proxyService.proxy({
url: ctx.request.body.url,
method: ctx.request.body.method || 'GET',
headers: ctx.request.body.headers || {},
data: ctx.request.body.data || null,
timeout: ctx.request.body.timeout || 30000,
transform: ctx.request.body.transform || null
});
}