流式响应处理
本文档整理流式响应(Streaming)的最佳实践,帮助实现更好的用户体验和更低的时延。
为什么使用流式响应
txt
┌─────────────────────────────────────────────────────┐
│ 传统响应 vs 流式响应 │
├─────────────────────────────────────────────────────┤
│ │
│ 传统响应: │
│ ┌─────────────────────────────────────────────┐ │
│ │ 请求 → 等待 → 等待 → 等待 → 完整响应 │ │
│ │ [=============================] │ │
│ │ 用户等待整个响应 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 流式响应: │
│ ┌─────────────────────────────────────────────┐ │
│ │ 请求 → 响应片段1 → 响应片段2 → 响应片段3 → ... │ │
│ │ [=] [=] [=] [=] [=] [=] [=] [=] [=] [=] │ │
│ │ 用户逐步看到内容 │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 优势: │
│ - 首字节时延(TTFT)更低 │
│ - 用户感知响应更快 │
│ - 可提前中断长响应 │
│ - 减少超时风险 │
│ │
└─────────────────────────────────────────────────────┘

基础流式实现
OpenAI 流式调用
python
import asyncio
from typing import AsyncIterator, Optional
from dataclasses import dataclass
@dataclass
class StreamChunk:
    """One piece of a streamed model response."""

    # Incremental text carried by this chunk (may be empty).
    content: str
    # Why generation stopped; None while the stream is still running.
    finish_reason: Optional[str] = None
    # Token accounting, typically present only on the final chunk.
    usage: Optional[dict] = None
class OpenAIStreamClient:
    """Streams chat completions from the OpenAI API.

    NOTE(review): this uses the legacy pre-1.0 ``openai.ChatCompletion.acreate``
    interface; openai>=1.0 replaced it with
    ``AsyncOpenAI().chat.completions.create(..., stream=True)`` — confirm which
    SDK version the project pins.
    """

    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.api_key = api_key
        self.model = model
        self.client = None  # placeholder; the SDK is imported lazily in stream()

    async def stream(
        self,
        messages: list[dict],
        temperature: float = 0.7,
        max_tokens: int = 4096
    ) -> "AsyncIterator[StreamChunk]":
        """Yield StreamChunk objects as the model produces them.

        Args:
            messages: Chat messages in OpenAI format.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.
        """
        import openai
        stream = await openai.ChatCompletion.acreate(
            model=self.model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True
        )
        async for chunk in stream:
            if chunk.choices:
                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason
                # ``delta.content`` can be missing OR present-but-None (e.g.
                # on role/final deltas).  The original only guarded the
                # missing case, so None leaked into the str-typed field;
                # normalize both cases to "".
                content = getattr(delta, "content", None) or ""
                yield StreamChunk(
                    content=content,
                    finish_reason=finish_reason
                )
            # The final chunk may carry aggregate usage information.
            if hasattr(chunk, 'usage'):
                yield StreamChunk(
                    content="",
                    usage=chunk.usage
                )
# 使用示例
async def stream_example():
client = OpenAIStreamClient(api_key="your-api-key")
messages = [
{"role": "user", "content": "写一篇关于AI的文章"}
]
async for chunk in client.stream(messages):
if chunk.content:
print(chunk.content, end="", flush=True)
if chunk.finish_reason:
print(f"\n\n完成原因: {chunk.finish_reason}")
if chunk.usage:
print(f"Token 使用: {chunk.usage}")Claude 流式调用
python
from anthropic import AsyncAnthropic
class ClaudeStreamClient:
    """Streams messages from the Anthropic Claude API."""

    def __init__(self, api_key: str, model: str = "claude-3-opus-20240229"):
        self.client = AsyncAnthropic(api_key=api_key)
        self.model = model

    async def stream(
        self,
        messages: list[dict],
        system: Optional[str] = None,
        max_tokens: int = 4096
    ) -> "AsyncIterator[StreamChunk]":
        """Yield text chunks, then one final chunk carrying usage stats.

        Args:
            messages: Chat messages in Anthropic format.
            system: Optional system prompt; omitted from the request when None.
            max_tokens: Upper bound on generated tokens.
        """
        # Only forward ``system`` when the caller supplied one.
        # NOTE(review): the anthropic SDK models optional params with a
        # NOT_GIVEN sentinel; passing system=None (as the original did) may
        # be rejected — omitting the key is safe either way.
        extra = {}
        if system is not None:
            extra["system"] = system
        async with self.client.messages.stream(
            model=self.model,
            max_tokens=max_tokens,
            messages=messages,
            **extra
        ) as stream:
            async for text in stream.text_stream:
                yield StreamChunk(content=text)
            # After the text stream drains, fetch the assembled message to
            # report token usage.
            final_message = await stream.get_final_message()
            yield StreamChunk(
                content="",
                finish_reason="end_turn",
                usage={
                    "input_tokens": final_message.usage.input_tokens,
                    "output_tokens": final_message.usage.output_tokens
                }
            )
# Usage example
async def claude_stream_example():
    """Stream a Claude answer guided by a custom system prompt."""
    client = ClaudeStreamClient(api_key="your-api-key")
    messages = [
        {"role": "user", "content": "解释量子计算的基本原理"}
    ]
    stream = client.stream(
        messages,
        system="你是一个科学教育者,用简单易懂的语言解释复杂概念。"
    )
    async for chunk in stream:
        print(chunk.content, end="", flush=True)

流式响应处理
增量渲染
python
from dataclasses import dataclass
from typing import Optional, List, Callable
import re
@dataclass
class RenderState:
    """Mutable rendering state accumulated while a response streams in."""

    text: str = ""                # full text received so far
    in_code_block: bool = False   # True while inside an unclosed ``` fence
    code_language: str = ""       # language tag of the currently open fence
    in_table: bool = False        # True once a markdown table is detected
    # default_factory gives each instance its own list — the original used
    # the None-default + __post_init__ workaround for the same effect.
    table_rows: List[str] = field(default_factory=list)


class IncrementalRenderer:
    """增量渲染器: incrementally derives render state from streamed markdown."""

    def __init__(self):
        self.state = RenderState()
        self.callbacks: List[Callable] = []

    def add_callback(self, callback: Callable):
        """Register an async callback invoked after every processed chunk."""
        self.callbacks.append(callback)

    async def process_chunk(self, chunk: str):
        """Append a streamed chunk, refresh derived state, fire callbacks."""
        self.state.text += chunk
        self._detect_code_block()
        self._detect_table()
        for callback in self.callbacks:
            await callback(self.state)

    def _detect_code_block(self):
        """Track whether the accumulated text currently ends inside a fence.

        An odd number of ``` fences means a code block is open.  The
        original close-check (``'```' in text[text.rfind('```'):]``) was
        always true once a block was open — the slice it searched always
        contained the very fence it started from — so blocks closed on the
        next chunk regardless of content.  Counting fences fixes that.
        """
        fences = re.findall(r'```(\w*)', self.state.text)
        if len(fences) % 2 == 1:
            self.state.in_code_block = True
            # Language tag comes from the most recent opening fence.
            self.state.code_language = fences[-1]
        else:
            self.state.in_code_block = False
            self.state.code_language = ""

    def _detect_table(self):
        """Mark table state once at least two pipe-prefixed lines exist."""
        lines = self.state.text.split('\n')
        table_lines = [l for l in lines if l.strip().startswith('|')]
        if len(table_lines) >= 2:
            self.state.in_table = True
            self.state.table_rows = table_lines

    def get_current_state(self) -> RenderState:
        """Return the live state object (not a copy)."""
        return self.state
# Usage example
async def render_stream():
    """Stream a code-generation request through the incremental renderer."""
    client = OpenAIStreamClient(api_key="your-api-key")
    renderer = IncrementalRenderer()

    # Callback: refresh the UI after every processed chunk.
    async def update_ui(state: RenderState):
        if state.in_code_block:
            print(f"\r[代码块: {state.code_language}] {len(state.text)} 字符", end="")
        else:
            # Echo only the newly appended text.  Relies on the enclosing
            # loop's ``chunk`` being the piece that was just processed.
            print(state.text[-len(chunk.content):], end="", flush=True)

    renderer.add_callback(update_ui)

    messages = [{"role": "user", "content": "写一个 Python 快速排序实现"}]
    async for chunk in client.stream(messages):
        if not chunk.content:
            continue
        await renderer.process_chunk(chunk.content)

内容缓冲与批量处理
python
from dataclasses import dataclass
from typing import AsyncIterator, List
import asyncio
@dataclass
class BufferedChunk:
    """A batched piece of streamed text."""

    content: str
    is_complete: bool = False  # True on the final chunk of the stream


class StreamBuffer:
    """流式缓冲器: batches streamed text to reduce downstream render churn.

    A flush happens when the buffer exceeds ``buffer_size`` characters,
    ends on a sentence terminator, ``buffer_time_ms`` has elapsed since
    the last flush, or the upstream stream finishes.
    """

    def __init__(
        self,
        buffer_size: int = 100,
        buffer_time_ms: int = 100,
        sentence_end_chars: Optional[List[str]] = None
    ):
        self.buffer_size = buffer_size
        self.buffer_time_ms = buffer_time_ms
        self.sentence_end_chars = sentence_end_chars or ['.', '!', '?', '。', '!', '?']
        self.buffer = ""
        self.last_flush = 0.0

    async def process_stream(
        self,
        stream: "AsyncIterator[StreamChunk]"
    ) -> "AsyncIterator[BufferedChunk]":
        """Re-chunk ``stream`` into BufferedChunk batches."""
        import time

        # monotonic() is immune to wall-clock adjustments.
        self.last_flush = time.monotonic()
        async for chunk in stream:
            if not chunk.content:
                continue
            self.buffer += chunk.content
            # Time-based flushing: the original stored buffer_time_ms and
            # last_flush but never consulted them, so a slow upstream could
            # sit on a partial buffer indefinitely.
            elapsed_ms = (time.monotonic() - self.last_flush) * 1000
            should_flush = (
                len(self.buffer) >= self.buffer_size or
                self._ends_with_sentence() or
                elapsed_ms >= self.buffer_time_ms or
                chunk.finish_reason
            )
            if should_flush:
                yield BufferedChunk(
                    content=self.buffer,
                    is_complete=bool(chunk.finish_reason)
                )
                self.buffer = ""
                self.last_flush = time.monotonic()
        # Flush whatever remains when the upstream ends.
        if self.buffer:
            yield BufferedChunk(content=self.buffer, is_complete=True)

    def _ends_with_sentence(self) -> bool:
        """Return True if the buffer ends with a sentence terminator."""
        if not self.buffer:
            return False
        return self.buffer[-1] in self.sentence_end_chars
# Usage example
async def buffered_stream_example():
    """Print a story in sentence-sized batches instead of raw tokens."""
    client = OpenAIStreamClient(api_key="your-api-key")
    stream_buffer = StreamBuffer(buffer_size=200, buffer_time_ms=100)

    request = [{"role": "user", "content": "讲述一个简短的故事"}]
    raw_stream = client.stream(request)
    async for batch in stream_buffer.process_stream(raw_stream):
        print(batch.content, end="", flush=True)
        if batch.is_complete:
            print("\n[完成]")

中断与取消
用户中断处理
python
from dataclasses import dataclass
from typing import Optional, Callable
import asyncio
@dataclass
class StreamState:
    """Snapshot of an in-flight (or finished) streaming generation."""

    is_running: bool = False
    is_cancelled: bool = False
    content: str = ""     # text accumulated so far
    token_count: int = 0  # number of content chunks received (token proxy)


class InterruptibleStream:
    """可中断的流式响应: a stream whose consumer can cancel mid-generation."""

    def __init__(self):
        self.state = StreamState()
        self.cancel_event = asyncio.Event()
        # Optional async callbacks wired up by the consumer.
        self.on_content: Optional[Callable] = None
        self.on_cancel: Optional[Callable] = None
        self.on_complete: Optional[Callable] = None

    async def stream(
        self,
        client,
        messages: list[dict],
        **kwargs
    ) -> str:
        """Run ``client.stream`` until completion or cancellation.

        Returns the text accumulated up to the point generation stopped.
        Raises whatever the underlying client raises.
        """
        self.state = StreamState(is_running=True)
        self.cancel_event.clear()
        try:
            stream = client.stream(messages, **kwargs)
            async for chunk in stream:
                # Stop promptly if cancel() was called.
                if self.cancel_event.is_set():
                    self.state.is_cancelled = True
                    if self.on_cancel:
                        await self.on_cancel(self.state)
                    break
                if chunk.content:
                    self.state.content += chunk.content
                    self.state.token_count += 1
                    if self.on_content:
                        await self.on_content(chunk.content)
            # Clear the flag before on_complete so the callback observes a
            # settled state.
            self.state.is_running = False
            if not self.state.is_cancelled and self.on_complete:
                await self.on_complete(self.state)
            return self.state.content
        finally:
            # Also covers the error path; the original reset the flag by
            # hand on each path and re-raised with an unused ``as e``.
            self.state.is_running = False

    def cancel(self):
        """Request cancellation; takes effect before the next chunk."""
        self.cancel_event.set()

    def get_state(self) -> StreamState:
        """Return the live state object."""
        return self.state
# Usage example
async def interruptible_example():
    """Start a long generation and cancel it after two seconds."""
    client = OpenAIStreamClient(api_key="your-api-key")
    streamer = InterruptibleStream()

    # Wire up the callbacks.
    async def on_content(content: str):
        print(content, end="", flush=True)

    async def on_cancel(state: StreamState):
        print(f"\n\n[用户中断] 已生成 {state.token_count} 个 token")

    streamer.on_content = on_content
    streamer.on_cancel = on_cancel

    messages = [{"role": "user", "content": "写一篇长文章"}]

    # Simulated user pressing "stop" two seconds in.
    async def simulate_user_interrupt():
        await asyncio.sleep(2)
        print("\n\n[用户按下停止按钮]")
        streamer.cancel()

    # Run generation and the simulated interrupt concurrently;
    # return_exceptions keeps gather itself from raising.
    tasks = [
        asyncio.create_task(streamer.stream(client, messages)),
        asyncio.create_task(simulate_user_interrupt()),
    ]
    await asyncio.gather(*tasks, return_exceptions=True)

超时控制
python
from typing import AsyncIterator, Optional
import asyncio
class TimeoutStream:
    """带超时控制的流式响应: stream wrapper enforcing three timeouts.

    - ``first_token_timeout``: max wait for the first chunk
    - ``total_timeout``: max wall-clock time for the whole stream
    - ``idle_timeout``: max gap between consecutive chunks
    """

    def __init__(
        self,
        first_token_timeout: float = 10.0,
        total_timeout: float = 60.0,
        idle_timeout: float = 5.0
    ):
        self.first_token_timeout = first_token_timeout
        self.total_timeout = total_timeout
        self.idle_timeout = idle_timeout

    async def stream_with_timeout(
        self,
        stream: "AsyncIterator[StreamChunk]",
        on_first_token: Optional[Callable] = None,
        on_timeout: Optional[Callable] = None
    ) -> "AsyncIterator[StreamChunk]":
        """Yield chunks from ``stream``, raising TimeoutError on stalls.

        The original only *inspected* timestamps when a chunk arrived, so a
        stream that stalled forever never timed out at all.  Wrapping each
        chunk fetch in ``asyncio.wait_for`` makes every limit fire while
        waiting, not after the fact.
        """
        import time

        start_time = time.monotonic()
        first_token_received = False
        iterator = stream.__aiter__()

        try:
            while True:
                # Per-chunk budget: first-token or idle limit, further
                # capped by what remains of the total budget.
                per_chunk = (
                    self.idle_timeout if first_token_received
                    else self.first_token_timeout
                )
                remaining = self.total_timeout - (time.monotonic() - start_time)
                if remaining <= 0:
                    raise TimeoutError(f"总超时 ({self.total_timeout}s)")
                try:
                    chunk = await asyncio.wait_for(
                        iterator.__anext__(),
                        timeout=min(per_chunk, remaining)
                    )
                except StopAsyncIteration:
                    break
                except asyncio.TimeoutError:
                    # Attribute the expiry to the tightest limit.
                    if time.monotonic() - start_time >= self.total_timeout:
                        raise TimeoutError(f"总超时 ({self.total_timeout}s)") from None
                    if not first_token_received:
                        raise TimeoutError(
                            f"首 token 超时 ({self.first_token_timeout}s)"
                        ) from None
                    raise TimeoutError(f"空闲超时 ({self.idle_timeout}s)") from None

                if not first_token_received:
                    first_token_received = True
                    if on_first_token:
                        await on_first_token()
                yield chunk
        except TimeoutError as e:
            if on_timeout:
                await on_timeout(str(e))
            raise
# Usage example
async def timeout_example():
    """Consume a stream under aggressive timeout limits."""
    client = OpenAIStreamClient(api_key="your-api-key")
    guard = TimeoutStream(
        first_token_timeout=5.0,
        total_timeout=30.0,
        idle_timeout=3.0
    )

    messages = [{"role": "user", "content": "生成一个长回答"}]

    async def on_first_token():
        print("[首 token 已接收]")

    async def on_timeout(error: str):
        print(f"[超时错误: {error}]")

    try:
        guarded = guard.stream_with_timeout(
            client.stream(messages),
            on_first_token=on_first_token,
            on_timeout=on_timeout
        )
        async for chunk in guarded:
            print(chunk.content, end="", flush=True)
    except TimeoutError as e:
        print(f"\n[处理超时: {e}]")

多模态流式处理
流式图像描述
python
from typing import AsyncIterator, Tuple
from dataclasses import dataclass
@dataclass
class ImageStreamChunk:
    """A typed chunk from a multimodal (image) analysis stream."""

    type: str  # "text" | "image_url" | "bounding_box"
    content: str
    bbox: Optional[Tuple[int, int, int, int]] = None


class MultiModalStreamClient:
    """多模态流式客户端: wraps a text stream client for image prompts."""

    def __init__(self, client):
        self.client = client

    async def stream_image_analysis(
        self,
        image_url: str,
        prompt: str
    ) -> AsyncIterator[ImageStreamChunk]:
        """Stream the model's analysis of ``image_url`` guided by ``prompt``."""
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": image_url}}
                ]
            }
        ]
        async for chunk in self.client.stream(messages):
            text = chunk.content
            # NOTE(review): boxes are matched per chunk, so a bounding box
            # split across two chunks will not be recognized.
            box = self._parse_bounding_box(text)
            if box:
                yield ImageStreamChunk(
                    type="bounding_box",
                    content=text,
                    bbox=box
                )
            else:
                yield ImageStreamChunk(type="text", content=text)

    def _parse_bounding_box(self, text: str) -> Optional[Tuple[int, int, int, int]]:
        """Extract the first ``[x1, y1, x2, y2]`` box found in ``text``."""
        import re
        hit = re.search(r'\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]', text)
        return tuple(int(v) for v in hit.groups()) if hit else None
# Usage example
async def image_stream_example():
    """Stream an image description, reporting bounding boxes separately."""
    base_client = OpenAIStreamClient(api_key="your-api-key")
    multimodal = MultiModalStreamClient(base_client)

    image_url = "https://example.com/image.jpg"
    prompt = "描述这张图片,并标注出图片中的主要物体位置。"

    async for chunk in multimodal.stream_image_analysis(image_url, prompt):
        if chunk.type == "bounding_box":
            print(f"\n[边界框: {chunk.bbox}]")
        elif chunk.type == "text":
            print(chunk.content, end="", flush=True)

性能优化
并行流式请求
python
from typing import List, AsyncIterator, Tuple
import asyncio
class ParallelStreamClient:
    """并行流式客户端: fans out several prompts with bounded concurrency."""

    def __init__(self, client):
        self.client = client

    async def parallel_stream(
        self,
        messages_list: List[list],
        max_concurrent: int = 3
    ) -> AsyncIterator[Tuple[int, str]]:
        """Yield ``(stream_index, content)`` pairs as any stream produces text.

        At most ``max_concurrent`` upstream requests run concurrently.
        """
        semaphore = asyncio.Semaphore(max_concurrent)

        async def stream_with_semaphore(index: int, messages: list):
            # Yield bare content; _merge_streams attaches the stream index.
            # (The original yielded (index, content) here AND re-wrapped it
            # in _merge_streams, so consumers received (i, (index, content))
            # and ``contents[index] += content`` would raise TypeError.)
            async with semaphore:
                async for chunk in self.client.stream(messages):
                    yield chunk.content

        streams = [
            stream_with_semaphore(i, messages)
            for i, messages in enumerate(messages_list)
        ]
        async for index, content in self._merge_streams(streams):
            yield (index, content)

    async def _merge_streams(
        self,
        streams: List[AsyncIterator]
    ) -> AsyncIterator[Tuple[int, str]]:
        """Interleave items from several streams as ``(stream_index, item)``."""
        # Unique sentinel: unlike the original's None, it cannot be
        # confused with a stream legitimately yielding None.
        done = object()
        queue = asyncio.Queue()

        async def stream_to_queue(stream, queue, index):
            try:
                async for item in stream:
                    await queue.put((index, item))
            finally:
                await queue.put((index, done))  # end-of-stream marker

        tasks = [
            asyncio.create_task(stream_to_queue(stream, queue, i))
            for i, stream in enumerate(streams)
        ]
        try:
            completed = 0
            while completed < len(streams):
                index, item = await queue.get()
                if item is done:
                    completed += 1
                else:
                    yield (index, item)
        finally:
            # Cancel any producers still running (e.g. consumer broke early).
            for task in tasks:
                task.cancel()
# Usage example
async def parallel_stream_example():
    """Run three prompts concurrently and show interleaved progress."""
    client = OpenAIStreamClient(api_key="your-api-key")
    parallel_client = ParallelStreamClient(client)

    messages_list = [
        [{"role": "user", "content": "主题1:AI的未来"}],
        [{"role": "user", "content": "主题2:量子计算"}],
        [{"role": "user", "content": "主题3:生物技术"}],
    ]

    # Accumulate per-stream text as pieces arrive.
    contents = [""] * len(messages_list)
    async for index, content in parallel_client.parallel_stream(messages_list):
        contents[index] += content
        print(f"\r[流 {index}] {contents[index][-50:]}", end="")

    print("\n\n=== 结果 ===")
    for i, content in enumerate(contents):
        print(f"\n流 {i}: {content[:100]}...")

最佳实践总结
流式响应检查清单
markdown
## 流式响应检查清单
### 基础实现
- [ ] 支持流式和非流式模式切换
- [ ] 实现增量渲染
- [ ] 处理连接中断
- [ ] 实现超时控制
### 用户体验
- [ ] 显示加载状态
- [ ] 显示生成进度
- [ ] 支持中断操作
- [ ] 支持重新生成
### 性能优化
- [ ] 使用缓冲减少渲染次数
- [ ] 实现并行流式请求
- [ ] 优化首 token 时延
- [ ] 控制内存使用
### 错误处理
- [ ] 处理网络错误
- [ ] 处理超时
- [ ] 处理中断后的恢复
- [ ] 记录错误日志
### 安全考虑
- [ ] 验证流式内容
- [ ] 过滤敏感内容
- [ ] 限制内容长度
- [ ] 实现内容审核

流式响应指标
txt
流式响应性能指标:
首字节时延 (TTFT):
目标: < 500ms
警告: 500ms - 1s
错误: > 1s
Token 生成速率:
目标: > 50 tokens/s
警告: 20-50 tokens/s
错误: < 20 tokens/s
总响应时间:
目标: < 10s (短文本)
警告: 10-30s
错误: > 30s
中断响应时间:
目标: < 100ms
警告: 100-500ms
错误: > 500ms