Skip to content
登录/注册

流式响应 (Streaming)

流式响应是一种实时输出 AI 生成内容的方式,就像在聊天界面中看到的"打字效果"一样。海鲸AI 支持所有模型的流式输出。

什么是流式响应?

与传统方式等待模型生成完整内容后一次性返回不同,流式响应会将生成的内容分成小块(chunks)实时推送给客户端。这样可以:

  • 提升用户体验 - 用户无需等待,可以立即看到生成过程
  • 适用聊天场景 - 模拟真实对话的打字效果
  • 降低感知延迟 - 让应用感觉更快速、更流畅

如何启用流式响应?

只需在请求中设置 stream: true 参数即可。

以下是完整的代码示例:

python
import requests
import json

question = "如何建造世界上最高的建筑?"

# 海鲸AI API 配置
url = "https://api.atalk-ai.com/v2/chat/completions"
headers = {
  "Authorization": "<API_KEY>",  # 替换为您的 API 密钥
  "Content-Type": "application/json"
}

# 请求参数,注意设置 stream=True
payload = {
  "model": "gpt-4o",
  "messages": [{"role": "user", "content": question}],
  "stream": True  # 启用流式输出
}

# 使用缓冲区处理流式数据
buffer = ""
with requests.post(url, headers=headers, json=payload, stream=True) as r:
  for chunk in r.iter_content(chunk_size=1024, decode_unicode=True):
    buffer += chunk
    while True:
      try:
        # 查找完整的 SSE(Server-Sent Events)行
        line_end = buffer.find('\n')
        if line_end == -1:
          break

        line = buffer[:line_end].strip()
        buffer = buffer[line_end + 1:]

        # 解析以 "data: " 开头的数据行
        if line.startswith('data: '):
          data = line[6:]
          if data == '[DONE]':  # 流结束标志
            break

          try:
            data_obj = json.loads(data)
            content = data_obj["choices"][0]["delta"].get("content")
            if content:
              print(content, end="", flush=True)  # 实时输出内容
          except json.JSONDecodeError:
            pass
      except Exception:
        break
js
const question = '如何建造世界上最高的建筑?'

// 发送流式请求
const response = await fetch('https://api.atalk-ai.com/v2/chat/completions', {
  method: 'POST',
  headers: {
    Authorization: '<API_KEY>', // 替换为您的 API 密钥
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    model: 'gpt-4o',
    messages: [{role: 'user', content: question}],
    stream: true, // 启用流式输出
  }),
})

// 获取响应流读取器
const reader = response.body?.getReader()
if (!reader) {
  throw new Error('无法读取响应体')
}

const decoder = new TextDecoder()
let buffer = ''

try {
  while (true) {
    const {done, value} = await reader.read()
    if (done) break

    // 将新数据块添加到缓冲区
    buffer += decoder.decode(value, {stream: true})

    // 处理缓冲区中的完整行
    while (true) {
      const lineEnd = buffer.indexOf('\n')
      if (lineEnd === -1) break

      const line = buffer.slice(0, lineEnd).trim()
      buffer = buffer.slice(lineEnd + 1)

      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') break // 流结束标志

        try {
          const parsed = JSON.parse(data)
          const content = parsed.choices[0].delta.content
          if (content) {
            console.log(content) // 实时输出内容
          }
        } catch (e) {
          // 忽略无效的 JSON
        }
      }
    }
  }
} finally {
  reader.cancel() // 清理资源
}

流式响应的技术细节

数据格式:流式响应使用 SSE (Server-Sent Events) 协议,每行数据以 data: 开头。

结束标志:当收到 data: [DONE] 时,表示流式输出结束。

增量内容:每个数据块包含 delta 对象,其中的 content 字段是本次新增的内容片段。

取消流式请求

您可以随时中止连接来取消流式请求。对于支持的服务商,这会立即停止模型生成并停止计费。

以下是如何实现流式取消:

python
import requests
from threading import Event, Thread

def stream_with_cancellation(prompt: str, cancel_event: Event):
    """支持取消的流式请求函数"""
    with requests.Session() as session:
        response = session.post(
            "https://api.atalk-ai.com/v2/chat/completions",
            headers={"Authorization": "<API_KEY>"},
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "stream": True
            },
            stream=True
        )

        try:
            for line in response.iter_lines():
                # 检查是否需要取消
                if cancel_event.is_set():
                    response.close()
                    print("\n流已取消")
                    return
                if line:
                    print(line.decode(), end="", flush=True)
        finally:
            response.close()

# 使用示例
cancel_event = Event()
stream_thread = Thread(
    target=lambda: stream_with_cancellation("写一个故事", cancel_event)
)
stream_thread.start()

# 在需要时取消流式请求
# cancel_event.set()
js
// 创建 AbortController 用于取消请求
const controller = new AbortController()

try {
  const response = await fetch('https://api.atalk-ai.com/v2/chat/completions', {
    method: 'POST',
    headers: {
      Authorization: '<API_KEY>',
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: 'gpt-4o',
      messages: [{role: 'user', content: '写一个故事'}],
      stream: true,
    }),
    signal: controller.signal, // 传入取消信号
  })

  // 处理流式响应...
  const reader = response.body?.getReader()
  // ... 省略流处理代码
} catch (error) {
  if (error.name === 'AbortError') {
    console.log('流已取消')
  } else {
    throw error
  }
}

// 在需要时取消流式请求
// controller.abort();

TIP

重要提示

  • 取消功能仅适用于流式请求(stream: true
  • 部分服务商可能不支持即时取消,模型会继续处理直到完成
  • 及时取消可以节省 token 费用

流式响应的错误处理

在流式传输过程中可能会遇到两种类型的错误,海鲸AI 会根据错误发生的时机采用不同的处理方式。

情况一:流开始前的错误

如果在任何内容流式传输到客户端之前发生错误,您会收到标准的 HTTP 错误响应(带有相应的状态码)。

错误格式

js
{
  "error": {
    "code": 400,
    "message": "指定的模型无效"
  }
}

常见 HTTP 状态码

  • 400 - 错误请求(无效参数)
  • 401 - 未授权(API 密钥无效)
  • 402 - 需要付款(积分不足)
  • 429 - 请求过多(触发速率限制)
  • 502 - 网关错误(服务商错误)
  • 503 - 服务不可用(无可用服务商)

情况二:流传输中途的错误

如果在已经开始流式传输内容后发生错误,由于 HTTP 状态码已经发送(200 OK),海鲸AI 无法修改状态码。此时错误会以 SSE 事件的形式发送。

错误格式

js
data: {
  "id": "cmpl-abc123",
  "object": "chat.completion.chunk",
  "created": 1234567890,
  "model": "gpt-3.5-turbo",
  "error": {
    "code": "server_error",
    "message": "服务商意外断开连接"
  },
  "choices": [{
    "index": 0,
    "delta": {"content": ""},
    "finish_reason": "error"
  }]
}

中途错误的特征

  • 错误信息在响应的顶层 error 字段中
  • choices 数组中的 finish_reason"error",用于正确终止流
  • HTTP 状态码仍然是 200 OK(因为响应头已经发送)
  • 收到错误事件后,流会立即终止

完整的错误处理示例

以下代码展示了如何正确处理这两种类型的错误:

python
import requests
import json

def stream_with_error_handling(prompt):
    response = requests.post(
        'https://api.atalk-ai.com/v2/chat/completions',
        headers={'Authorization': '<API_KEY>'},
        json={
            'model': 'gpt-4o',
            'messages': [{'role': 'user', 'content': prompt}],
            'stream': True
        },
        stream=True
    )

    # 第一步:检查初始 HTTP 状态(流开始前的错误)
    if response.status_code != 200:
        error_data = response.json()
        print(f"错误:{error_data['error']['message']}")
        return

    # 第二步:处理流式响应并捕获中途错误
    for line in response.iter_lines():
        if line:
            line_text = line.decode('utf-8')
            if line_text.startswith('data: '):
                data = line_text[6:]
                if data == '[DONE]':
                    break

                try:
                    parsed = json.loads(data)

                    # 检查是否有中途错误
                    if 'error' in parsed:
                        print(f"\n流式错误:{parsed['error']['message']}")
                        # 检查 finish_reason 确认错误终止
                        if parsed.get('choices', [{}])[0].get('finish_reason') == 'error':
                            print("流因错误而终止")
                        break

                    # 正常处理内容
                    content = parsed['choices'][0]['delta'].get('content')
                    if content:
                        print(content, end='', flush=True)

                except json.JSONDecodeError:
                    pass  # 忽略 JSON 解析错误
js
async function streamWithErrorHandling(prompt: string) {
  const response = await fetch(
    'https://api.atalk-ai.com/v2/chat/completions',
    {
      method: 'POST',
      headers: {
        'Authorization': '<API_KEY>',
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: prompt }],
        stream: true,
      }),
    }
  );

  // 第一步:检查初始 HTTP 状态(流开始前的错误)
  if (!response.ok) {
    const error = await response.json();
    console.error(`错误:${error.error.message}`);
    return;
  }

  const reader = response.body?.getReader();
  if (!reader) throw new Error('无响应体');

  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      while (true) {
        const lineEnd = buffer.indexOf('\n');
        if (lineEnd === -1) break;

        const line = buffer.slice(0, lineEnd).trim();
        buffer = buffer.slice(lineEnd + 1);

        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') return;

          try {
            const parsed = JSON.parse(data);

            // 检查是否有中途错误
            if (parsed.error) {
              console.error(`流式错误:${parsed.error.message}`);
              // 检查 finish_reason 确认错误终止
              if (parsed.choices?.[0]?.finish_reason === 'error') {
                console.log('流因错误而终止');
              }
              return;
            }

            // 正常处理内容
            const content = parsed.choices[0].delta.content;
            if (content) {
              console.log(content);
            }
          } catch (e) {
            // 忽略 JSON 解析错误
          }
        }
      }
    }
  } finally {
    reader.cancel();
  }
}