Skip to content

流式响应处理

本文档整理流式响应(Streaming)的最佳实践,帮助实现更好的用户体验和更低的时延。

为什么使用流式响应

txt
┌─────────────────────────────────────────────────────┐
│               传统响应 vs 流式响应                    │
├─────────────────────────────────────────────────────┤
│                                                     │
│  传统响应:                                          │
│  ┌─────────────────────────────────────────────┐   │
│  │ 请求 → 等待 → 等待 → 等待 → 完整响应          │   │
│  │  [=============================]             │   │
│  │              用户等待整个响应                  │   │
│  └─────────────────────────────────────────────┘   │
│                                                     │
│  流式响应:                                          │
│  ┌─────────────────────────────────────────────┐   │
│  │ 请求 → 响应片段1 → 响应片段2 → 响应片段3 → ... │   │
│  │  [=] [=] [=] [=] [=] [=] [=] [=] [=] [=]     │   │
│  │      用户逐步看到内容                          │   │
│  └─────────────────────────────────────────────┘   │
│                                                     │
│  优势:                                             │
│  - 首字节时延(TTFT)更低                           │
│  - 用户感知响应更快                                 │
│  - 可提前中断长响应                                 │
│  - 减少超时风险                                     │
│                                                     │
└─────────────────────────────────────────────────────┘

基础流式实现

OpenAI 流式调用

python
import asyncio
from typing import AsyncIterator, Optional
from dataclasses import dataclass

@dataclass
class StreamChunk:
    """One increment of a streamed LLM response."""
    # Text delta carried by this chunk; may be "" for control/usage-only chunks.
    content: str
    # Provider finish reason (e.g. "stop"); set only on the final content chunk.
    finish_reason: Optional[str] = None
    # Token-accounting dict; presumably present only on the last chunk — verify per provider.
    usage: Optional[dict] = None

class OpenAIStreamClient:
    """Minimal async streaming client for the OpenAI chat-completion API."""

    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.api_key = api_key
        self.model = model
        self.client = None  # placeholder for a real OpenAI client instance

    async def stream(
        self,
        messages: list[dict],
        temperature: float = 0.7,
        max_tokens: int = 4096
    ) -> AsyncIterator[StreamChunk]:
        """Yield StreamChunk items as response deltas arrive.

        NOTE(review): ``openai.ChatCompletion.acreate`` is the pre-1.0 SDK
        surface; newer SDKs use ``AsyncOpenAI().chat.completions.create`` —
        confirm the pinned openai version before migrating.
        """
        import openai

        stream = await openai.ChatCompletion.acreate(
            model=self.model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True
        )

        last_chunk = None
        async for chunk in stream:
            last_chunk = chunk
            if chunk.choices:
                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason

                # Fix: delta.content can be missing OR explicitly None (the
                # role-only first delta); normalize both to "" so consumers
                # never receive a None content.
                content = getattr(delta, 'content', None) or ""

                yield StreamChunk(
                    content=content,
                    finish_reason=finish_reason
                )

        # The final chunk may carry usage stats. Fix: guard against an empty
        # stream, where the original referenced an unbound `chunk` (NameError).
        if last_chunk is not None and getattr(last_chunk, 'usage', None):
            yield StreamChunk(
                content="",
                usage=last_chunk.usage
            )

# 使用示例
async def stream_example():
    """Demo: print a streamed completion to stdout as it arrives."""
    client = OpenAIStreamClient(api_key="your-api-key")

    prompt = [{"role": "user", "content": "写一篇关于AI的文章"}]

    async for piece in client.stream(prompt):
        # Text deltas are echoed immediately for a live-typing effect.
        if piece.content:
            print(piece.content, end="", flush=True)
        # The finish reason arrives on the last content chunk.
        if piece.finish_reason:
            print(f"\n\n完成原因: {piece.finish_reason}")
        # Usage accounting, when the provider supplies it.
        if piece.usage:
            print(f"Token 使用: {piece.usage}")

Claude 流式调用

python
from anthropic import AsyncAnthropic

class ClaudeStreamClient:
    """Async streaming client for the Anthropic Messages API."""

    def __init__(self, api_key: str, model: str = "claude-3-opus-20240229"):
        self.client = AsyncAnthropic(api_key=api_key)
        self.model = model

    async def stream(
        self,
        messages: list[dict],
        system: str = None,
        max_tokens: int = 4096
    ) -> AsyncIterator[StreamChunk]:
        """Yield text deltas, then a final chunk carrying usage stats.

        Fix: the original always forwarded ``system=None`` to the SDK; the
        kwarg is now omitted when no system prompt is given, matching the
        SDK's "not given" convention.
        """
        request_kwargs = dict(
            model=self.model,
            max_tokens=max_tokens,
            messages=messages,
        )
        if system is not None:
            request_kwargs["system"] = system

        async with self.client.messages.stream(**request_kwargs) as stream:
            async for text in stream.text_stream:
                yield StreamChunk(content=text)

            # The accumulated final message carries token accounting.
            final_message = await stream.get_final_message()
            yield StreamChunk(
                content="",
                finish_reason="end_turn",
                usage={
                    "input_tokens": final_message.usage.input_tokens,
                    "output_tokens": final_message.usage.output_tokens
                }
            )

# 使用示例
async def claude_stream_example():
    """Demo: stream a Claude answer with a system prompt."""
    client = ClaudeStreamClient(api_key="your-api-key")

    question = [{"role": "user", "content": "解释量子计算的基本原理"}]
    system_prompt = "你是一个科学教育者,用简单易懂的语言解释复杂概念。"

    async for piece in client.stream(question, system=system_prompt):
        print(piece.content, end="", flush=True)

流式响应处理

增量渲染

python
from dataclasses import dataclass, field
from typing import Optional, List, Callable
import re

@dataclass
class RenderState:
    """Mutable snapshot of incremental-rendering progress."""
    # Full text received so far.
    text: str = ""
    # True while inside an unclosed ``` fence.
    in_code_block: bool = False
    # Language tag of the currently open code fence ("" when none).
    code_language: str = ""
    # True once a markdown table has been detected.
    in_table: bool = False
    # Lines belonging to the detected table. field(default_factory=list)
    # replaces the original None-default + __post_init__ workaround for
    # the mutable-default-argument pitfall.
    table_rows: List[str] = field(default_factory=list)

class IncrementalRenderer:
    """Incremental renderer: tracks markdown structure (code fences,
    tables) in streamed text and notifies registered async callbacks."""

    def __init__(self):
        self.state = RenderState()
        self.callbacks: List[Callable] = []

    def add_callback(self, callback: Callable):
        """Register an async callback invoked with the state after each chunk."""
        self.callbacks.append(callback)

    async def process_chunk(self, chunk: str):
        """Append a streamed chunk, refresh derived state, fire callbacks."""
        self.state.text += chunk

        self._detect_code_block()
        self._detect_table()

        for callback in self.callbacks:
            await callback(self.state)

    def _detect_code_block(self):
        """Track whether the text currently sits inside a ``` fence.

        Bug fix: the original close-detection tested
        ``'```' in text[text.rfind('```'):]`` — the slice starts at the
        last fence, so it always contains '```' and every open block was
        "closed" on the very next chunk. The open-detection also only
        fired when the text *ended* with a fence. Fence parity is used
        instead: an odd number of fences means a block is open.
        """
        fences = re.findall(r'```(\w*)', self.state.text)
        inside = len(fences) % 2 == 1
        self.state.in_code_block = inside
        # The language tag trails the opening fence of the current block.
        self.state.code_language = fences[-1] if inside else ""

    def _detect_table(self):
        """Mark table mode once two or more '|'-prefixed lines exist."""
        lines = self.state.text.split('\n')
        table_lines = [l for l in lines if l.strip().startswith('|')]

        if len(table_lines) >= 2:
            self.state.in_table = True
            self.state.table_rows = table_lines

    def get_current_state(self) -> RenderState:
        """Return the live render state (not a copy)."""
        return self.state

# 使用示例
async def render_stream():
    """Demo: stream a completion through IncrementalRenderer.

    Bug fix: the original UI callback read ``chunk`` from the enclosing
    loop via closure — coupling it to a variable it does not own, and
    printing a tail sized by the raw chunk rather than by what is on
    screen. A ``printed`` offset is tracked instead, so the callback
    depends only on the state it is handed.
    """
    client = OpenAIStreamClient(api_key="your-api-key")
    renderer = IncrementalRenderer()

    printed = 0  # characters of state.text already rendered to the terminal

    # Callback: incrementally update the terminal "UI".
    async def update_ui(state: RenderState):
        nonlocal printed
        if state.in_code_block:
            print(f"\r[代码块: {state.code_language}] {len(state.text)} 字符", end="")
        else:
            # Print only the not-yet-rendered tail of the text.
            print(state.text[printed:], end="", flush=True)
        printed = len(state.text)

    renderer.add_callback(update_ui)

    messages = [{"role": "user", "content": "写一个 Python 快速排序实现"}]

    async for chunk in client.stream(messages):
        if chunk.content:
            await renderer.process_chunk(chunk.content)

内容缓冲与批量处理

python
from dataclasses import dataclass
from typing import AsyncIterator, List
import asyncio

@dataclass
class BufferedChunk:
    """A batched piece of streamed text produced by StreamBuffer."""
    # Concatenated text accumulated since the previous flush.
    content: str
    # True when this is the final piece of the stream.
    is_complete: bool = False

class StreamBuffer:
    """Stream buffer: coalesces small chunks into larger batches.

    A batch is flushed when any trigger fires:
      * the buffer reaches ``buffer_size`` characters,
      * the buffer ends at a sentence boundary,
      * ``buffer_time_ms`` elapsed since the last flush (fix: the original
        accepted this parameter but never used it),
      * the upstream reports a finish_reason (fix: the original skipped
        empty-content chunks before checking, so an empty final chunk
        never flushed mid-loop).
    """

    def __init__(
        self,
        buffer_size: int = 100,
        buffer_time_ms: int = 100,
        sentence_end_chars: List[str] = None
    ):
        self.buffer_size = buffer_size
        self.buffer_time_ms = buffer_time_ms
        # Defaults cover both ASCII and CJK full-width sentence enders.
        self.sentence_end_chars = sentence_end_chars or ['.', '!', '?', '。', '!', '?']

        self.buffer = ""
        self.last_flush = 0.0  # monotonic timestamp of the latest flush

    async def process_stream(
        self,
        stream: AsyncIterator[StreamChunk]
    ) -> AsyncIterator[BufferedChunk]:
        """Re-chunk *stream* into BufferedChunk items per the flush rules."""
        import time
        self.last_flush = time.monotonic()

        async for chunk in stream:
            finished = bool(chunk.finish_reason)
            if chunk.content:
                self.buffer += chunk.content

            now = time.monotonic()
            should_flush = self.buffer and (
                len(self.buffer) >= self.buffer_size
                or self._ends_with_sentence()
                or (now - self.last_flush) * 1000 >= self.buffer_time_ms
                or finished
            )

            if should_flush:
                yield BufferedChunk(content=self.buffer, is_complete=finished)
                self.buffer = ""
                self.last_flush = now

        # Flush whatever remains when the stream ends.
        if self.buffer:
            yield BufferedChunk(content=self.buffer, is_complete=True)

    def _ends_with_sentence(self) -> bool:
        """True when the buffered text ends with a sentence terminator."""
        return bool(self.buffer) and self.buffer[-1] in self.sentence_end_chars

# 使用示例
async def buffered_stream_example():
    """Demo: re-chunk a stream into sentence-ish batches before printing."""
    client = OpenAIStreamClient(api_key="your-api-key")
    rechunker = StreamBuffer(buffer_size=200, buffer_time_ms=100)

    prompt = [{"role": "user", "content": "讲述一个简短的故事"}]

    async for batch in rechunker.process_stream(client.stream(prompt)):
        print(batch.content, end="", flush=True)
        if batch.is_complete:
            print("\n[完成]")

中断与取消

用户中断处理

python
from dataclasses import dataclass
from typing import Optional, Callable
import asyncio

@dataclass
class StreamState:
    """Mutable progress record for an interruptible stream."""
    # True while generation is in progress.
    is_running: bool = False
    # Set when the user cancelled before completion.
    is_cancelled: bool = False
    # Text accumulated so far (partial when cancelled).
    content: str = ""
    # NOTE(review): incremented once per received chunk, not per model token.
    token_count: int = 0

class InterruptibleStream:
    """Interruptible stream: wraps a streaming client with cancel support."""

    def __init__(self):
        self.state = StreamState()
        self.cancel_event = asyncio.Event()
        # Optional async callbacks.
        self.on_content: Optional[Callable] = None   # awaited with each text delta
        self.on_cancel: Optional[Callable] = None    # awaited with state on cancel
        self.on_complete: Optional[Callable] = None  # awaited with state on success

    async def stream(
        self,
        client,
        messages: list[dict],
        **kwargs
    ) -> str:
        """Stream a completion, stopping early when cancel() is called.

        Returns the accumulated text (possibly partial when cancelled).
        """
        self.state = StreamState(is_running=True)
        self.cancel_event.clear()

        try:
            stream = client.stream(messages, **kwargs)

            async for chunk in stream:
                # Cancellation is observed between chunks.
                if self.cancel_event.is_set():
                    self.state.is_cancelled = True
                    if self.on_cancel:
                        await self.on_cancel(self.state)
                    # Fix: close the upstream async generator promptly
                    # instead of leaking it to the GC on `break`.
                    if hasattr(stream, "aclose"):
                        await stream.aclose()
                    break

                if chunk.content:
                    self.state.content += chunk.content
                    # NOTE(review): counts received chunks, not model tokens.
                    self.state.token_count += 1

                    if self.on_content:
                        await self.on_content(chunk.content)

            self.state.is_running = False

            if not self.state.is_cancelled and self.on_complete:
                await self.on_complete(self.state)

            return self.state.content

        except Exception:
            # Keep state consistent, then propagate (unused `as e` removed).
            self.state.is_running = False
            raise

    def cancel(self):
        """Request cancellation; takes effect before the next chunk."""
        self.cancel_event.set()

    def get_state(self) -> StreamState:
        """Return the live StreamState."""
        return self.state

# 使用示例
async def interruptible_example():
    """Demo: cancel a running stream from a second task."""
    client = OpenAIStreamClient(api_key="your-api-key")
    streamer = InterruptibleStream()

    # Callbacks wired into the streamer.
    async def echo_content(content: str):
        print(content, end="", flush=True)

    async def report_cancel(state: StreamState):
        print(f"\n\n[用户中断] 已生成 {state.token_count} 个 token")

    streamer.on_content = echo_content
    streamer.on_cancel = report_cancel

    prompt = [{"role": "user", "content": "写一篇长文章"}]

    # Simulated user pressing "stop" after two seconds.
    async def press_stop_later():
        await asyncio.sleep(2)
        print("\n\n[用户按下停止按钮]")
        streamer.cancel()

    tasks = [
        asyncio.create_task(streamer.stream(client, prompt)),
        asyncio.create_task(press_stop_later()),
    ]
    # Wait until the stream finishes or is interrupted.
    await asyncio.gather(*tasks, return_exceptions=True)

超时控制

python
from typing import AsyncIterator, Callable, Optional
import asyncio
import time

class TimeoutStream:
    """Streaming wrapper enforcing first-token, total, and idle timeouts.

    Bug fix: the original only compared timestamps *after* a chunk
    arrived, so a stream that never produced a token hung forever. Every
    wait is now bounded with asyncio.wait_for so the timeouts fire even
    when the upstream stays silent. (Also fixes ``Callable`` missing from
    this snippet's import line.)
    """

    def __init__(
        self,
        first_token_timeout: float = 10.0,
        total_timeout: float = 60.0,
        idle_timeout: float = 5.0
    ):
        # first_token_timeout: max seconds until the first chunk (TTFT bound)
        # total_timeout: max seconds for the whole stream
        # idle_timeout: max seconds between consecutive chunks
        self.first_token_timeout = first_token_timeout
        self.total_timeout = total_timeout
        self.idle_timeout = idle_timeout

    async def stream_with_timeout(
        self,
        stream: "AsyncIterator[StreamChunk]",
        on_first_token: Optional[Callable] = None,
        on_timeout: Optional[Callable] = None
    ) -> "AsyncIterator[StreamChunk]":
        """Yield chunks from *stream*, raising TimeoutError on any limit.

        on_first_token: async callback fired once when the first chunk arrives.
        on_timeout: async callback awaited with the error message before re-raise.
        """
        start_time = time.monotonic()
        first_token_received = False

        try:
            iterator = stream.__aiter__()
            while True:
                elapsed = time.monotonic() - start_time
                remaining_total = self.total_timeout - elapsed
                if remaining_total <= 0:
                    raise TimeoutError(f"总超时 ({self.total_timeout}s)")

                # Bound this single wait by whichever limit is tightest.
                if first_token_received:
                    wait_limit = min(self.idle_timeout, remaining_total)
                    label = f"空闲超时 ({self.idle_timeout}s)"
                else:
                    wait_limit = min(self.first_token_timeout - elapsed, remaining_total)
                    label = f"首 token 超时 ({self.first_token_timeout}s)"
                    if wait_limit <= 0:
                        raise TimeoutError(label)

                try:
                    chunk = await asyncio.wait_for(iterator.__anext__(), timeout=wait_limit)
                except StopAsyncIteration:
                    break
                except asyncio.TimeoutError:
                    # Disambiguate: the total limit may have been the binding one.
                    if time.monotonic() - start_time >= self.total_timeout:
                        raise TimeoutError(f"总超时 ({self.total_timeout}s)") from None
                    raise TimeoutError(label) from None

                if not first_token_received:
                    first_token_received = True
                    if on_first_token:
                        await on_first_token()

                yield chunk
        except TimeoutError as e:
            if on_timeout:
                await on_timeout(str(e))
            raise

# 使用示例
async def timeout_example():
    """Demo: wrap a stream with TTFT / total / idle timeouts."""
    client = OpenAIStreamClient(api_key="your-api-key")
    guard = TimeoutStream(
        first_token_timeout=5.0,
        total_timeout=30.0,
        idle_timeout=3.0
    )

    prompt = [{"role": "user", "content": "生成一个长回答"}]

    async def announce_first_token():
        print("[首 token 已接收]")

    async def announce_timeout(error: str):
        print(f"[超时错误: {error}]")

    try:
        guarded = guard.stream_with_timeout(
            client.stream(prompt),
            on_first_token=announce_first_token,
            on_timeout=announce_timeout
        )
        async for piece in guarded:
            print(piece.content, end="", flush=True)
    except TimeoutError as e:
        print(f"\n[处理超时: {e}]")

多模态流式处理

流式图像描述

python
from typing import AsyncIterator, Optional, Tuple
from dataclasses import dataclass

@dataclass
class ImageStreamChunk:
    """One increment of a streamed multimodal (image-analysis) response."""
    # Chunk kind: "text" | "image_url" | "bounding_box"
    type: str
    # Raw text content of the chunk.
    content: str
    # Parsed (x1, y1, x2, y2) box when type == "bounding_box".
    # Fix: Optional was used here but missing from this snippet's import line.
    bbox: Optional[Tuple[int, int, int, int]] = None

class MultiModalStreamClient:
    """Multimodal streaming client: classifies streamed image-analysis
    chunks as plain text or bounding-box annotations."""

    def __init__(self, client):
        self.client = client

    async def stream_image_analysis(
        self,
        image_url: str,
        prompt: str
    ) -> AsyncIterator[ImageStreamChunk]:
        """Stream an analysis of *image_url*, tagging each chunk by kind."""
        request_messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": image_url}}
                ]
            }
        ]

        async for piece in self.client.stream(request_messages):
            text = piece.content

            # A chunk containing a parseable box is tagged accordingly.
            box = self._parse_bounding_box(text)
            kind = "bounding_box" if box else "text"
            yield ImageStreamChunk(type=kind, content=text, bbox=box)

    def _parse_bounding_box(self, text: str) -> Optional[Tuple[int, int, int, int]]:
        """Extract an [x1, y1, x2, y2] box from *text*, or return None."""
        import re
        found = re.search(r'\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]', text)
        return tuple(int(v) for v in found.groups()) if found else None

# 使用示例
async def image_stream_example():
    """Demo: stream an image description, surfacing bounding boxes."""
    base_client = OpenAIStreamClient(api_key="your-api-key")
    analyzer = MultiModalStreamClient(base_client)

    url = "https://example.com/image.jpg"
    question = "描述这张图片,并标注出图片中的主要物体位置。"

    async for piece in analyzer.stream_image_analysis(url, question):
        if piece.type == "text":
            print(piece.content, end="", flush=True)
        elif piece.type == "bounding_box":
            print(f"\n[边界框: {piece.bbox}]")

性能优化

并行流式请求

python
from typing import List, AsyncIterator, Tuple
import asyncio

class ParallelStreamClient:
    """Parallel streaming client: fans out several chat requests and
    merges their streams into one interleaved stream."""

    def __init__(self, client):
        self.client = client

    async def parallel_stream(
        self,
        messages_list: List[list],
        max_concurrent: int = 3
    ) -> AsyncIterator[Tuple[int, str]]:
        """Stream all conversations concurrently, yielding (index, text).

        At most max_concurrent upstream streams run at once; output order
        interleaves as chunks arrive.
        """
        semaphore = asyncio.Semaphore(max_concurrent)

        async def bounded_stream(index: int, messages: list):
            # The semaphore is held for the full life of one upstream stream.
            async with semaphore:
                async for chunk in self.client.stream(messages):
                    yield (index, chunk.content)

        streams = [
            bounded_stream(i, messages)
            for i, messages in enumerate(messages_list)
        ]

        async for index, content in self._merge_streams(streams):
            yield (index, content)

    async def _merge_streams(
        self,
        streams: List[AsyncIterator]
    ) -> AsyncIterator[Tuple[int, str]]:
        """Merge streams of (index, text) pairs into one stream.

        Bug fix: the original re-wrapped each item as (stream_index, item)
        while the consumer unpacked (index, content) — so callers received
        tuples where text was expected (the usage example's
        ``contents[index] += content`` raised TypeError). Items already
        carry their index, so they are forwarded unchanged; a bare None
        marks a finished stream.
        """
        queue: asyncio.Queue = asyncio.Queue()

        async def pump(stream):
            try:
                async for item in stream:
                    await queue.put(item)
            finally:
                await queue.put(None)  # end-of-stream marker

        tasks = [asyncio.create_task(pump(stream)) for stream in streams]

        try:
            remaining = len(streams)
            while remaining:
                item = await queue.get()
                if item is None:
                    remaining -= 1
                else:
                    yield item
        finally:
            # Cancel pumps even when the consumer abandons iteration early.
            for task in tasks:
                task.cancel()

# 使用示例
async def parallel_stream_example():
    """Demo: run three topics concurrently, showing interleaved progress."""
    client = OpenAIStreamClient(api_key="your-api-key")
    parallel_client = ParallelStreamClient(client)

    topics = [
        [{"role": "user", "content": "主题1:AI的未来"}],
        [{"role": "user", "content": "主题2:量子计算"}],
        [{"role": "user", "content": "主题3:生物技术"}],
    ]

    # Accumulated text, one slot per stream index.
    contents = [""] * len(topics)

    async for index, text in parallel_client.parallel_stream(topics):
        contents[index] += text
        print(f"\r[流 {index}] {contents[index][-50:]}", end="")

    print("\n\n=== 结果 ===")
    for i, body in enumerate(contents):
        print(f"\n{i}: {body[:100]}...")

最佳实践总结

流式响应检查清单

markdown
## 流式响应检查清单

### 基础实现

- [ ] 支持流式和非流式模式切换
- [ ] 实现增量渲染
- [ ] 处理连接中断
- [ ] 实现超时控制

### 用户体验

- [ ] 显示加载状态
- [ ] 显示生成进度
- [ ] 支持中断操作
- [ ] 支持重新生成

### 性能优化

- [ ] 使用缓冲减少渲染次数
- [ ] 实现并行流式请求
- [ ] 优化首 token 时延
- [ ] 控制内存使用

### 错误处理

- [ ] 处理网络错误
- [ ] 处理超时
- [ ] 处理中断后的恢复
- [ ] 记录错误日志

### 安全考虑

- [ ] 验证流式内容
- [ ] 过滤敏感内容
- [ ] 限制内容长度
- [ ] 实现内容审核

流式响应指标

txt
流式响应性能指标:
  首字节时延 (TTFT):
    目标: < 500ms
    警告: 500ms - 1s
    错误: > 1s

  Token 生成速率:
    目标: > 50 tokens/s
    警告: 20-50 tokens/s
    错误: < 20 tokens/s

  总响应时间:
    目标: < 10s (短文本)
    警告: 10-30s
    错误: > 30s

  中断响应时间:
    目标: < 100ms
    警告: 100-500ms
    错误: > 500ms