Skip to content

SSE与流式响应实践避坑

1、介绍SSE

SSE(Server-Sent Events) 是一种基于HTTP协议的服务器推送技术,允许服务器实时向客户端发送事件。与WebSocket不同,SSE是单向的,服务器可以向客户端发送数据,但客户端不能向服务器发送数据。

SSE的主要特点包括:

  • 实时更新:服务器可以主动推送信息到客户端,适用于需要实时数据更新的应用场景。
  • 简单易用:使用HTTP协议,易于实现和集成。
  • 轻量级:相较于WebSocket,SSE的实现更为简单,适合于不需要双向通信的场景。

2、使用

既然 SSE本质是一种 基于HTTP 协议的技术,那么使用也很简单,分为客户端和服务器端

- 服务器端

以nodejs 最为服务器端举例

服务器端只需要简单几步就能开启响应

js
var http = require("http");

http.createServer(function (req, res) {
  var fileName = "." + req.url;

  if (fileName === "./stream") {
    // 1. 设置响应头
    res.writeHead(200, {
      "Content-Type":"text/event-stream", // 必须
      "Cache-Control":"no-cache",         // 避免消息被类似nginx或网关之类的中间层以及浏览器缓存,导致在前端表现为最终才一次性响应
      "Connection":"keep-alive",          // 开启长连接
      "Access-Control-Allow-Origin": '*', // 跨域
    });

    // 2. 数据响应

    // 服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。
    // 两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。
    res.write("retry: 10000\n");

    // event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。
    res.write("event: connecttime\n");

    // 数据内容用data字段表示。如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。
    res.write("data: " + (new Date()) + "\n\n");
    res.write("data: " + (new Date()) + "\n\n");

    interval = setInterval(function () {
      res.write("data: " + (new Date()) + "\n\n");
    }, 1000);

    // 当客户端断开连接后,服务器停止推送
    req.connection.addListener("close", function () {
      clearInterval(interval);
    }, false);
  }
}).listen(9527, "127.0.0.1");

- 客户端

正常情况下,客户端用 EventSource 对象就可以

js
// 下面的代码可以检测浏览器是否支持 SSE。
if ('EventSource' in window) {
  // 使用 SSE 时,浏览器首先生成一个EventSource实例,向服务器发起连接。
  var source = new EventSource(url);
  // 上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。
  // var source = new EventSource(url, { withCredentials: true });
}

EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。

  • 0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。
  • 1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。
  • 2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。
  1. 连接一旦建立,就会触发open事件,可以在onopen属性定义回调函数。
js
source.onopen = function (event) {
  // ...
};

// 另一种写法
source.addEventListener('open', function (event) {
  // ...
}, false);
  1. 客户端收到服务器发来的数据,就会触发message事件,可以在onmessage属性的回调函数。
js
source.onmessage = function (event) {
  var data = event.data;
  // handle message
};

// 另一种写法
source.addEventListener('message', function (event) {
  var data = event.data; // 事件对象的data属性就是服务器端传回的数据(文本格式)。
  // handle message
}, false);
  1. 如果发生通信错误(比如连接中断),就会触发error事件,可以在onerror属性定义回调函数。
js
source.onerror = function (event) {
  // handle error event
};

// 另一种写法
source.addEventListener('error', function (event) {
  // handle error event
}, false);
  1. close方法用于关闭 SSE 连接。
js
source.close();
  1. 自定义事件 默认情况下,服务器发来的数据,总是触发浏览器EventSource实例的message事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发message事件。
js
// 浏览器对 SSE 的foo事件进行监听,需要服务器发送 foo 事件才行,(见上文服务器代码的第2点:数据响应)
source.addEventListener('foo', function (event) {
  var data = event.data;
  // handle message
}, false);

3、EventSource局限性

EventSource API 是用于服务器发送事件(SSE)的客户端接口,有一些局限性:

  1. 它是单向通信的:只能从服务器端向客户端发送数据,若需要客户端发送数据给服务器,只有在创建连接时候,通过URL传递少量参数给服务器

  2. 它不支持发起 POST 请求,默认为GET

这对于像需要与AI进行交互的场景,用户需要发送数据给服务端,服务端拿到数据后请求大模型接口后响应数据的场景不太适合,可能下面的【流式传输】方案更适合

4、流式传输 (Stream)

首先要明确一点:SSE 是实现流式响应的一种非常流行和高效的方式。

  • SSE 是一种具体的、标准化的 Web 技术协议,用于实现从服务器到客户端的单向流式通信。

  • 流式响应是一种更通用的概念和模式,指的是服务器逐步生成并发送数据,而不是一次性发送全部数据。

- 服务器端

node为例

流式传输的服务器端与SSE的服务器端没没什么太大的区别,只不过SSE服务器端是在普通流式传输的基础上规定了一些特定的格式,

  1. 主要区别在于 HTTP 响应头 和 发送数据的格式
  • 普通流式传输响应头通常只需 Transfer-Encoding: chunkedContent-Type 根据流的内容而定(如 application/octet-stream, text/plain)。
  • 消息格式无格式要求。直接发送原始的数据块(chunks)但需要客户端配合解析,更加灵活,而 SSE 格式固定,专为“事件消息”设计,与浏览器 EventSource API 无缝集成,开箱即用。
  1. 普通流式传输可以为任意的请求方式 GET、POST、PUT、....

普通流式传输代码示例:

js
const http = require('http');

const server = http.createServer((req, res) => {
  if (req.url === '/stream') {
    // 设置头部:表明是分块传输的普通文本
    res.writeHead(200, {
      'Content-Type': 'text/plain',
      'Transfer-Encoding': 'chunked'
    });

    let count = 0;
    const intervalId = setInterval(() => {
      count++;
      // 关键区别点:直接写入原始数据
      res.write(`Chunk ${count}: Current time is ${new Date().toISOString()}\n`);

      if (count === 5) {
        clearInterval(intervalId);
        res.end(); // 结束流
      }
    }, 1000);

    // 处理连接中断
    req.on('close', () => {
      clearInterval(intervalId);
      res.end();
    });
  } else {
    res.statusCode = 404;
    res.end('Not found');
  }
});

server.listen(3000, () => {
  console.log('Server listening on port 3000');
});

- 客户端

流式传输客户端就类似于常规的http请求,只不过有一些需要主要特别注意的点:

  1. HTTP 分块传输编码 (Chunked Transfer Encoding):这是流式传输的基础。服务器必须支持并启用此功能,才能将响应分成多个“块”陆续发送。客户端库通常会自动处理这块逻辑。

  2. 正确读取响应体:不要使用那些期望一次性返回完整响应体的方法(如 .json() 或 .text()),它们会等待流结束,从而失去流式的效果。必须通过响应体的 getReader() 方法获取 ReadableStreamDefaultReader 来逐块读取。

  3. 错误处理与中断:流式请求可能需要较长时间,因此要妥善处理超时、网络错误以及用户主动取消的情况。使用 AbortController 可以中断请求和流的读取。

  4. 内存管理:流式传输的一大优势是能降低内存消耗,避免大响应体阻塞客户端。确保在读取每个数据块后及时处理,并不再需要时释放资源。

客户端方案对比

方案EventSourceFetch API + ReadableStreamfetch-event-source
特点简单粗暴功能强大但复杂开箱即用
优点• 浏览器自带,无需安装
• 断网自动重连
• 支持所有请求方式
• 可自定义请求头
• 支持二进制数据
• 功能全面
• 自动处理重连
• 企业级稳定
缺点• 只能GET请求
• 不能传自定义头
• 只支持UTF-8
• IE不支持
• 需要手写错误处理
• 代码量大
• 需要安装依赖
• 包体积大
适用场景简单推送复杂需求企业项目
选型建议新手首选高手专用推荐使用
实际案例-dify、chatbot-ui京东 joyagent-jdgenie

1)Fetch API + ReadableStream 代码示例

js
// 发起流式请求
async function startStream() {
  try {
    // 发起POST请求,支持自定义请求头
    // 此处await是为了等待响应头,并非响应体
    const response = await fetch('/api/chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer your-token' // 支持认证
      },
      body: JSON.stringify({
        message: '你好,请帮我写代码'
      })
    });

    // 检查响应状态
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    // 获取可读流
    const reader = response.body.getReader();
    const decoder = new TextDecoder(); // 用于解码二进制数据

    // 循环读取数据块
    while (true) {
      const { done, value } = await reader.read();
      
      // 流结束
      if (done) {
        console.log('流式传输完成');
        break;
      }

      // 解码数据并处理
      const chunk = decoder.decode(value, { stream: true });
      console.log('收到数据:', chunk);
      
      // 这里可以更新UI,比如逐字显示(需要注意处理上一个chunk的完整性,例如一个中文字符被两个chunk分成了两部分,会导致乱码)
      updateUI(chunk);
    }
  } catch (error) {
    console.error('流式请求失败:', error);
    // 需要手动处理错误和重连
    handleError(error);
  }
}

// 手动错误处理和重连
function handleError(error) {
  console.log('处理错误,准备重连...');
  // 实现重连逻辑
  setTimeout(() => {
    startStream();
  }, 1000);
}

// 更新UI的函数
function updateUI(chunk) {
  const output = document.getElementById('output');
  output.textContent += chunk;
}

2)fetch-event-source 代码示例

js
import { fetchEventSource } from '@microsoft/fetch-event-source';

// 使用微软的fetch-event-source库
async function startStreamWithLibrary() {
  try {
    await fetchEventSource('/api/chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer your-token' // 支持认证
      },
      body: JSON.stringify({
        message: '你好,请帮我写代码'
      }),
      
      // 连接建立时的回调
      async onopen(response) {
        if (response.ok) {
          console.log('连接已建立');
          return; // 一切正常,继续
        } else {
          throw new Error(`Server error: ${response.status}`);
        }
      },
      
      // 收到消息时的回调
      onmessage(msg) {
        try {
          // 自动解析JSON数据
          const data = msg.data ? JSON.parse(msg.data) : msg.data;
          console.log('收到消息:', data);
          
          // 更新UI
          updateUI(data);
        } catch (err) {
          console.error('解析消息失败:', err);
        }
      },
      
      // 连接关闭时的回调
      onclose() {
        console.log('连接已关闭');
        // 库会自动处理重连
      },
      
      // 错误处理回调
      onerror(err) {
        console.error('发生错误:', err);
        // 库会自动重试,抛出错误会停止重试
        throw err;
      }
    });
  } catch (error) {
    console.error('流式请求失败:', error);
  }
}

// 更新UI的函数
function updateUI(data) {
  const output = document.getElementById('output');
  if (typeof data === 'string') {
    output.textContent += data;
  } else {
    output.textContent += data.content || JSON.stringify(data);
  }
}

3)封装fetch-event-source 以及使用

  • 封装
js
// src/utils/sseUtils.js
import { fetchEventSource } from '@microsoft/fetch-event-source';

/**
 * 流式请求封装
 * @param {string} url - 请求地址
 * @param {Object} options - 配置选项
 * @param {Object} options.body - 请求体
 * @param {Object} options.headers - 请求头
 * @param {function} options.onMessage - 消息处理回调
 * @param {function} options.onOpen - 连接打开回调
 * @param {function} options.onClose - 连接关闭回调
 * @param {function} options.onError - 错误处理回调
 * @returns {Promise} 返回一个可取消的Promise
 */
export const createSSEConnection = (url, {
  body,
  headers = {},
  onMessage,
  onOpen,
  onClose,
  onError
}) => {
  // 创建一个AbortController用于取消请求
  const ctrl = new AbortController();
  
  // 返回一个包含取消方法的Promise
  return {
    promise: fetchEventSource(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...headers
      },
      body: JSON.stringify(body),
      signal: ctrl.signal,
      async onopen(response) {
        if (response.ok) {
          onOpen?.();
          return; // 一切正常,继续
        }
        throw new Error(`Server error: ${response.status}`);
      },
      onmessage(msg) {
        try {
          // 如果数据是JSON格式则解析,否则直接使用
          const data = msg.data ? JSON.parse(msg.data) : msg.data;
          onMessage?.(data);
        } catch (err) {
          console.error('Failed to parse message data', err);
        }
      },
      onclose() {
        onClose?.();
      },
      onerror(err) {
        onError?.(err);
        throw err; // 重新抛出以停止重试
      }
    }),
    cancel: () => ctrl.abort()
  };
};
  • 在vue3中使用
html
<script setup>
import { ref, onUnmounted } from 'vue';
import { createSSEConnection } from '@/utils/sseUtils';

const streamData = ref(''); // 存储流式数据
const isLoading = ref(false); // 加载状态
const error = ref(null); // 错误信息
let sseConnection = null; // 存储SSE连接

// 发起流式请求
const startStream = async () => {
  try {
    // 重置状态
    streamData.value = '';
    isLoading.value = true;
    error.value = null;
    
    // 创建SSE连接
    sseConnection = createSSEConnection('https://api.example.com/stream', {
      body: { query: '获取流式数据' },
      onMessage: (data) => {
        // 企业技巧1: 使用函数式更新避免重复触发响应式
        streamData.value += data;
        
        // 企业技巧2: 自动滚动到底部 (适用于聊天场景)
        nextTick(() => {
          const container = document.getElementById('stream-container');
          if (container) {
            container.scrollTop = container.scrollHeight;
          }
        });
      },
      onOpen: () => {
        console.log('连接已建立');
      },
      onClose: () => {
        isLoading.value = false;
        console.log('连接已关闭');
      },
      onError: (err) => {
        isLoading.value = false;
        error.value = err.message;
        console.error('发生错误:', err);
      }
    });
    
    await sseConnection.promise;
  } catch (err) {
    if (err.name !== 'AbortError') {
      error.value = err.message;
    }
  } finally {
    isLoading.value = false;
  }
};

// 停止流式请求
const stopStream = () => {
  if (sseConnection) {
    sseConnection.cancel();
    sseConnection = null;
  }
};

// 组件卸载时自动取消请求
onUnmounted(() => {
  stopStream();
});

// 企业技巧3: 使用计算属性处理流式数据
const processedStreamData = computed(() => {
  return streamData.value
    .replace(/\n/g, '<br>') // 换行转换
    .replace(/\t/g, '&nbsp;&nbsp;&nbsp;&nbsp;'); // 制表符转换
});
</script>

<template>
  <div class="stream-container">
    <h2>流式数据演示</h2>
    
    <!-- 控制按钮 -->
    <div class="controls">
      <button @click="startStream" :disabled="isLoading">开始流式请求</button>
      <button @click="stopStream" :disabled="!isLoading">停止</button>
    </div>
    
    <!-- 加载状态 -->
    <div v-if="isLoading" class="loading">加载中...</div>
    
    <!-- 错误显示 -->
    <div v-if="error" class="error">{{ error }}</div>
    
    <!-- 流式数据展示 -->
    <div 
      id="stream-container"
      class="stream-content"
      v-html="processedStreamData"
    ></div>
    
    <!-- 企业技巧4: 显示数据统计 -->
    <div class="stats">
      已接收: {{ streamData.length }} 字符 | 
      行数: {{ (streamData.match(/\n/g) || []).length + 1 }}
    </div>
  </div>
</template>

5、EGG 服务器流式响应

- ctx.res.write

EggJS 中使用 ctx.res.write 的主要场景:

  • 实时数据推送:SSE、WebSocket 降级
  • 大文件处理:文件下载、视频流传输
  • AI 应用:流式对话、文本生成
  • 系统监控:实时日志、进度更新
  • 数据库操作:大数据量查询结果
  • 错误处理:重连机制、异常恢复

ctx.res.write 不能实现效果原因

有时候在使用 ctx.res.write可能不能正确的实现效果,原因可能有这些

  1. http响应头问题

EggJS 会自动设置 Content-Length 头,但流式响应时长度未知,导致客户端等待不准确。

js
// ❌ 问题代码
async badControl() {
  const { ctx } = this;
  
  // 设置响应头
  ctx.set('Content-Type', 'text/plain');
  ctx.set('Cache-Control', 'no-cache');
  
  // 直接写入数据
  ctx.res.write('Hello');
  ctx.res.write('World');
  ctx.res.end();
}
  1. 中间件干扰
js
// EggJS 中间件会干扰流式响应
app.use(async (ctx, next) => {
  // 这个中间件可能会缓存响应
  await next();
  
  // 问题:中间件可能修改响应头或内容
  if (ctx.body) {
    ctx.body = 'modified: ' + ctx.body;
  }
});
  1. 响应体处理机制冲突
js
// ❌ 错误方式
async badStream() {
  const { ctx } = this;
  
  // EggJS 期望通过 ctx.body 设置响应
  ctx.body = 'some data'; // 这会与 ctx.res.write 冲突
  
  // 直接写入会绕过 EggJS 的响应处理机制
  ctx.res.write('stream data');
}

解决方案

  1. 正确设置响应头与状态码 在正常情况下,需要当单独设置 200 状态码, 否则流式响应 egg会默认 404,ctx.status=200
  • sse响应头
js
  // 设置 SSE 响应头
  ctx.set({
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*', // 设置跨域
    'Access-Control-Allow-Headers': 'Cache-Control'
  });
  • 设置分块传输响应头
js
  // 设置分块传输响应头
  ctx.set({
    'Content-Type': 'text/plain',
    'Transfer-Encoding': 'chunked',
    'Cache-Control': 'no-cache'
  });
  1. 禁用 EggJS 的响应体处理
js
ctx.respond = false;
  1. 自定义中间件处理

若有自定义中间件,则在中间件中需要区分流式响应的场景

js
// ✅ 自定义中间件处理流式响应
module.exports = () => {
  return async (ctx, next) => {
    await next();
    
    // 检查是否是流式响应
    if (ctx.streaming) {
      // 跳过默认的响应处理
      return;
    }
  };
};

代码示例

js
async stream() {
  const { ctx } = this;

  try {
    ctx.set({
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    });
    ctx.status = 200;
    // 禁用框架默认的响应处理,直接使用原生 Node.js 响应
    ctx.respond = false;

    // 流式数据处理
    const stream = fs.createReadStream("./zhihu.txt", {
      encoding: "utf-8",
      highWaterMark: 100, // chunk 块的大小
    });

    stream.on("data", (chunk) => {
      ctx.res.write(`data: ${chunk}\n\n`);
    });

    stream.on("end", () => {
      ctx.res.write("data: [DONE]\n\n");
      ctx.res.end();
    });

    stream.on("error", (err) => {
      ctx.res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
      ctx.res.end();
    });

    ctx.req.on("close", () => {
      stream.destroy();
    });
  } catch (error) {
    ctx.logger.error("Stream error:", error);
    ctx.res.write(`data: ${JSON.stringify({ error: "Internal error" })}\n\n`);
    ctx.res.end();
  }
}

- PassThrough 中转流式响应

PassThrough流 是Node.js中的一种双工流(duplex stream),它在数据流传递过程中起到无操作的中间层,将数据从可读流传递到可写流,同时不做任何修改或处理。

相对于 ctx.res.write,在Eggjs中使用 PassThrough 流式响应更简单也更符合eggjs的设计,具体对比如下:

特性PassThroughctx.res.write
响应处理通过 EggJS 正常流程绕过 EggJS 响应机制
中间件兼容完全兼容需要特殊处理
错误处理统一错误处理需要手动处理
资源管理自动管理手动管理
代码复杂度简单复杂

代码示例

js
const { PassThrough } = require('stream');
// ✅ PassThrough - 更好的性能
async streamWithPassThrough() {
  const { ctx } = this;
  
  // 可以设置缓冲区大小,配置可选
  const passThrough = new PassThrough({
    highWaterMark: 16 * 1024, // 16KB 缓冲区 
    objectMode: false
  });
  
  ctx.set({
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });
  // 直接响应stream,egg会自行处理
  ctx.body = passThrough;
  
  // 异步处理数据
  this.processDataAsync(passThrough);
}

// 数据处理,将其他数据源write进入  PassThrough
async processDataAsync(passThrough) {
  const dataSource = await this.getDataSource(); // 伪代码。获取数据源数据流,可以是三方接口,也可以是本地数据流
  
  // 支持背压控制
  dataSource.on('data', (chunk) => {
    // chhunk格式需要根据getDataSource响应的格式来处理,不一定是JSON.stringify
    if (!passThrough.write(`data: ${JSON.stringify(chunk)}\n\n`)) {
      // 缓冲区满了,暂停数据源
      dataSource.pause();
    }
  });
  
  // 缓冲区有空间时恢复数据源
  passThrough.on('drain', () => {
    dataSource.resume();
  });
  
  dataSource.on('end', () => {
    passThrough.write('data: [DONE]\n\n');
    passThrough.end();
  });

}

6、Eggjs HttpClient中转发其他流式响应接口

- 基础示例

js
  const { PassThrough } = require('stream');
// 代理 SSE 流式响应
async proxySSE() {
  const { ctx } = this;
  
  const passThrough = new PassThrough();
  
  // 设置 SSE 响应头
  ctx.set({
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Cache-Control'
  });
  
  ctx.body = passThrough;
  
  try {
    // 获取目标 URL
    const { url } = ctx.query;
    
    if (!url) {
      passThrough.write('data: {"error": "URL parameter is required"}\n\n');
      passThrough.end();
      return;
    }
    
    // 使用 HttpClient 请求目标接口
    const response = await ctx.curl(url, {
      method: 'GET',
      headers: {
        'Accept': 'text/event-stream',
        'Cache-Control': 'no-cache'
      },
      // 启用流式响应
      streaming: true
    });
    
    // 检查响应状态
    if (response.status !== 200) {
      passThrough.write(`data: {"error": "Target service error: ${response.status}"}\n\n`);
      passThrough.end();
      return;
    }
    
    // 转发流式数据
    response.res.on('data', (chunk) => {
      passThrough.write(chunk);
    });
    
    response.res.on('end', () => {
      passThrough.end();
    });
    
    response.res.on('error', (err) => {
      ctx.logger.error('Proxy stream error:', err);
      passThrough.write(`data: {"error": "${err.message}"}\n\n`);
      passThrough.end();
    });
    
  } catch (error) {
    ctx.logger.error('Proxy error:', error);
    passThrough.write(`data: {"error": "Proxy failed: ${error.message}"}\n\n`);
    passThrough.end();
  }
}

- EGG 通用流式代理 class 封装

js
const { PassThrough } = require('stream');
class StreamProxyService {
  constructor(ctx) {
    this.ctx = ctx;
    this.passThrough = new PassThrough();
  }
  
  // 设置响应头
  setHeaders(contentType = 'text/event-stream') {
    this.ctx.set({
      'Content-Type': contentType,
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'Access-Control-Allow-Origin': '*', // 视情况调整
      'Access-Control-Allow-Headers': 'Cache-Control' // 视情况调整
    });
    
    this.ctx.body = this.passThrough;
  }
  
  // 代理请求
  async proxy(options) {
    const {
      url,
      method = 'GET',
      headers = {},
      data = null,
      timeout = 30000,
      retries = 0,
      transform = null
    } = options;
    
    if (!url) {
      this.passThrough.write('data: {"error": "URL is required"}\n\n');
      this.passThrough.end();
      return;
    }
    
    try {
      // 构建请求配置
      const requestConfig = {
        method,
        headers: {
          'Accept': 'text/event-stream',
          'Cache-Control': 'no-cache',
          ...headers
        },
        streaming: true,
        timeout
      };
      
      if (data) {
        requestConfig.data = data;
      }
      
      // 发送请求
      const response = await this.ctx.curl(url, requestConfig);
      
      if (response.status !== 200) {
        this.passThrough.write(`data: {"error": "Target service error: ${response.status}"}\n\n`);
        this.passThrough.end();
        return;
      }
      
      // 处理流式数据
      this.handleStream(response.res, transform);
      
    } catch (error) {
      this.ctx.logger.error('Proxy error:', error);
      this.passThrough.write(`data: {"error": "Proxy failed: ${error.message}"}\n\n`);
      this.passThrough.end();
    }
  }
  
  // 处理流式数据
  handleStream(stream, transform) {
    if (transform) {
      // 应用转换
      const transformStream = new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
          try {
            const data = JSON.parse(chunk.toString());
            const transformedData = transform(data);
            callback(null, transformedData);
          } catch (parseError) {
            callback(null, { content: chunk.toString() });
          }
        }
      });
      
      stream
        .pipe(transformStream)
        .on('data', (chunk) => {
          this.passThrough.write(`data: ${JSON.stringify(chunk)}\n\n`);
        })
        .on('end', () => {
          this.passThrough.write('data: [DONE]\n\n');
          this.passThrough.end();
        })
        .on('error', (err) => {
          this.ctx.logger.error('Transform stream error:', err);
          this.passThrough.write(`data: {"error": "${err.message}"}\n\n`);
          this.passThrough.end();
        });
    } else {
      // 直接转发
      stream.on('data', (chunk) => {
        this.passThrough.write(chunk);
      });
      
      stream.on('end', () => {
        this.passThrough.end();
      });
      
      stream.on('error', (err) => {
        this.ctx.logger.error('Stream error:', err);
        this.passThrough.write(`data: {"error": "${err.message}"}\n\n`);
        this.passThrough.end();
      });
    }
  }
}

// 在控制器中使用
async proxy() {
  const { ctx } = this;
  
  const proxyService = new StreamProxyService(ctx);
  proxyService.setHeaders();
  
  await proxyService.proxy({
    url: ctx.request.body.url,
    method: ctx.request.body.method || 'GET',
    headers: ctx.request.body.headers || {},
    data: ctx.request.body.data || null,
    timeout: ctx.request.body.timeout || 30000,
    transform: ctx.request.body.transform || null
  });
}