登录/注册
流式响应 (Streaming)
流式响应是一种实时输出 AI 生成内容的方式,就像在聊天界面中看到的"打字效果"一样。海鲸AI 支持所有模型的流式输出。
什么是流式响应?
与传统方式等待模型生成完整内容后一次性返回不同,流式响应会将生成的内容分成小块(chunks)实时推送给客户端。这样可以:
- 提升用户体验 - 用户无需等待,可以立即看到生成过程
- 适用聊天场景 - 模拟真实对话的打字效果
- 降低感知延迟 - 让应用感觉更快速、更流畅
如何启用流式响应?
只需在请求中设置 stream: true 参数即可。
以下是完整的代码示例:
python
import requests
import json
question = "如何建造世界上最高的建筑?"
# 海鲸AI API 配置
url = "https://api.atalk-ai.com/v2/chat/completions"
headers = {
"Authorization": "<API_KEY>", # 替换为您的 API 密钥
"Content-Type": "application/json"
}
# 请求参数,注意设置 stream=True
payload = {
"model": "gpt-4o",
"messages": [{"role": "user", "content": question}],
"stream": True # 启用流式输出
}
# 使用缓冲区处理流式数据
buffer = ""
with requests.post(url, headers=headers, json=payload, stream=True) as r:
for chunk in r.iter_content(chunk_size=1024, decode_unicode=True):
buffer += chunk
while True:
try:
# 查找完整的 SSE(Server-Sent Events)行
line_end = buffer.find('\n')
if line_end == -1:
break
line = buffer[:line_end].strip()
buffer = buffer[line_end + 1:]
# 解析以 "data: " 开头的数据行
if line.startswith('data: '):
data = line[6:]
if data == '[DONE]': # 流结束标志
break
try:
data_obj = json.loads(data)
content = data_obj["choices"][0]["delta"].get("content")
if content:
print(content, end="", flush=True) # 实时输出内容
except json.JSONDecodeError:
pass
except Exception:
breakjs
const question = '如何建造世界上最高的建筑?'
// 发送流式请求
const response = await fetch('https://api.atalk-ai.com/v2/chat/completions', {
method: 'POST',
headers: {
Authorization: '<API_KEY>', // 替换为您的 API 密钥
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'gpt-4o',
messages: [{role: 'user', content: question}],
stream: true, // 启用流式输出
}),
})
// 获取响应流读取器
const reader = response.body?.getReader()
if (!reader) {
throw new Error('无法读取响应体')
}
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const {done, value} = await reader.read()
if (done) break
// 将新数据块添加到缓冲区
buffer += decoder.decode(value, {stream: true})
// 处理缓冲区中的完整行
while (true) {
const lineEnd = buffer.indexOf('\n')
if (lineEnd === -1) break
const line = buffer.slice(0, lineEnd).trim()
buffer = buffer.slice(lineEnd + 1)
if (line.startsWith('data: ')) {
const data = line.slice(6)
if (data === '[DONE]') break // 流结束标志
try {
const parsed = JSON.parse(data)
const content = parsed.choices[0].delta.content
if (content) {
console.log(content) // 实时输出内容
}
} catch (e) {
// 忽略无效的 JSON
}
}
}
}
} finally {
reader.cancel() // 清理资源
}流式响应的技术细节
数据格式:流式响应使用 SSE (Server-Sent Events) 协议,每行数据以 data: 开头。
结束标志:当收到 data: [DONE] 时,表示流式输出结束。
增量内容:每个数据块包含 delta 对象,其中的 content 字段是本次新增的内容片段。
取消流式请求
您可以随时中止连接来取消流式请求。对于支持的服务商,这会立即停止模型生成并停止计费。
以下是如何实现流式取消:
python
import requests
from threading import Event, Thread
def stream_with_cancellation(prompt: str, cancel_event: Event):
"""支持取消的流式请求函数"""
with requests.Session() as session:
response = session.post(
"https://api.atalk-ai.com/v2/chat/completions",
headers={"Authorization": "<API_KEY>"},
json={
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
"stream": True
},
stream=True
)
try:
for line in response.iter_lines():
# 检查是否需要取消
if cancel_event.is_set():
response.close()
print("\n流已取消")
return
if line:
print(line.decode(), end="", flush=True)
finally:
response.close()
# 使用示例
cancel_event = Event()
stream_thread = Thread(
target=lambda: stream_with_cancellation("写一个故事", cancel_event)
)
stream_thread.start()
# 在需要时取消流式请求
# cancel_event.set()js
// 创建 AbortController 用于取消请求
const controller = new AbortController()
try {
const response = await fetch('https://api.atalk-ai.com/v2/chat/completions', {
method: 'POST',
headers: {
Authorization: '<API_KEY>',
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'gpt-4o',
messages: [{role: 'user', content: '写一个故事'}],
stream: true,
}),
signal: controller.signal, // 传入取消信号
})
// 处理流式响应...
const reader = response.body?.getReader()
// ... 省略流处理代码
} catch (error) {
if (error.name === 'AbortError') {
console.log('流已取消')
} else {
throw error
}
}
// 在需要时取消流式请求
// controller.abort();TIP
重要提示:
- 取消功能仅适用于流式请求(
stream: true) - 部分服务商可能不支持即时取消,模型会继续处理直到完成
- 及时取消可以节省 token 费用
流式响应的错误处理
在流式传输过程中可能会遇到两种类型的错误,海鲸AI 会根据错误发生的时机采用不同的处理方式。
情况一:流开始前的错误
如果在任何内容流式传输到客户端之前发生错误,您会收到标准的 HTTP 错误响应(带有相应的状态码)。
错误格式:
js
{
"error": {
"code": 400,
"message": "指定的模型无效"
}
}常见 HTTP 状态码:
400- 错误请求(无效参数)401- 未授权(API 密钥无效)402- 需要付款(积分不足)429- 请求过多(触发速率限制)502- 网关错误(服务商错误)503- 服务不可用(无可用服务商)
情况二:流传输中途的错误
如果在已经开始流式传输内容后发生错误,由于 HTTP 状态码已经发送(200 OK),海鲸AI 无法修改状态码。此时错误会以 SSE 事件的形式发送。
错误格式:
js
data: {
"id": "cmpl-abc123",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-3.5-turbo",
"error": {
"code": "server_error",
"message": "服务商意外断开连接"
},
"choices": [{
"index": 0,
"delta": {"content": ""},
"finish_reason": "error"
}]
}中途错误的特征:
- 错误信息在响应的顶层
error字段中 choices数组中的finish_reason为"error",用于正确终止流- HTTP 状态码仍然是 200 OK(因为响应头已经发送)
- 收到错误事件后,流会立即终止
完整的错误处理示例
以下代码展示了如何正确处理这两种类型的错误:
python
import requests
import json
def stream_with_error_handling(prompt):
response = requests.post(
'https://api.atalk-ai.com/v2/chat/completions',
headers={'Authorization': '<API_KEY>'},
json={
'model': 'gpt-4o',
'messages': [{'role': 'user', 'content': prompt}],
'stream': True
},
stream=True
)
# 第一步:检查初始 HTTP 状态(流开始前的错误)
if response.status_code != 200:
error_data = response.json()
print(f"错误:{error_data['error']['message']}")
return
# 第二步:处理流式响应并捕获中途错误
for line in response.iter_lines():
if line:
line_text = line.decode('utf-8')
if line_text.startswith('data: '):
data = line_text[6:]
if data == '[DONE]':
break
try:
parsed = json.loads(data)
# 检查是否有中途错误
if 'error' in parsed:
print(f"\n流式错误:{parsed['error']['message']}")
# 检查 finish_reason 确认错误终止
if parsed.get('choices', [{}])[0].get('finish_reason') == 'error':
print("流因错误而终止")
break
# 正常处理内容
content = parsed['choices'][0]['delta'].get('content')
if content:
print(content, end='', flush=True)
except json.JSONDecodeError:
pass # 忽略 JSON 解析错误js
async function streamWithErrorHandling(prompt: string) {
const response = await fetch(
'https://api.atalk-ai.com/v2/chat/completions',
{
method: 'POST',
headers: {
'Authorization': '<API_KEY>',
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'gpt-4o',
messages: [{ role: 'user', content: prompt }],
stream: true,
}),
}
);
// 第一步:检查初始 HTTP 状态(流开始前的错误)
if (!response.ok) {
const error = await response.json();
console.error(`错误:${error.error.message}`);
return;
}
const reader = response.body?.getReader();
if (!reader) throw new Error('无响应体');
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
while (true) {
const lineEnd = buffer.indexOf('\n');
if (lineEnd === -1) break;
const line = buffer.slice(0, lineEnd).trim();
buffer = buffer.slice(lineEnd + 1);
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
try {
const parsed = JSON.parse(data);
// 检查是否有中途错误
if (parsed.error) {
console.error(`流式错误:${parsed.error.message}`);
// 检查 finish_reason 确认错误终止
if (parsed.choices?.[0]?.finish_reason === 'error') {
console.log('流因错误而终止');
}
return;
}
// 正常处理内容
const content = parsed.choices[0].delta.content;
if (content) {
console.log(content);
}
} catch (e) {
// 忽略 JSON 解析错误
}
}
}
}
} finally {
reader.cancel();
}
}