
Model Selection and Routing Strategies

This document collects best practices for model selection and intelligent routing, to help you pick the most suitable model for each scenario.

Core Decision Framework

txt
Model Selection Decision Tree

Task complexity?
    ├── Simple (classification, extraction)  → small model
    ├── Medium (dialogue, Q&A)               → medium model
    └── Complex (reasoning, creation)        → large model

Latency requirement?
    ├── Real-time (<1 s)  → small model + cache
    ├── Normal (<5 s)     → medium model
    └── Relaxed (>5 s)    → large model

Cost budget?
    ├── Tight   → small model + routing optimization
    ├── Medium  → hybrid routing
    └── Loose   → best-quality model
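As a quick illustration, the tree above can be collapsed into a minimal rule-based router. This is a sketch only; the tier names and thresholds are placeholders, not real products:

```python
def pick_model(complexity: str, max_latency_s: float, budget: str) -> str:
    """Toy router following the decision tree above.

    complexity: "simple" | "medium" | "complex"
    budget:     "tight" | "medium" | "loose"
    All model names here are placeholders.
    """
    # A hard real-time requirement dominates: it forces the small tier plus caching.
    if max_latency_s < 1:
        return "small-model+cache"
    # A tight budget also forces the small tier, regardless of complexity.
    if budget == "tight":
        return "small-model"
    # Otherwise route by task complexity.
    return {"simple": "small-model",
            "medium": "medium-model",
            "complex": "large-model"}[complexity]

print(pick_model("complex", 10, "loose"))   # large-model
print(pick_model("simple", 0.5, "loose"))   # small-model+cache
```

In practice the three questions interact (a complex task under a tight budget still needs the cheap tier), which is why the later strategies combine them into a single score instead of applying them in sequence.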

Model Capability Matrix

| Model tier | Representative models | Context | Strengths | Weaknesses | Typical use cases |
|------------|----------------------|---------|-----------|------------|-------------------|
| Small | GPT-3.5, Claude Haiku | 16K–200K | Fast, cheap | Limited reasoning | Classification, extraction, simple Q&A |
| Medium | GPT-4 Turbo, Claude Sonnet | 128K–200K | Well balanced | Moderate cost | Dialogue, Q&A, analysis |
| Large | GPT-4, Claude Opus | 128K–200K | Strong reasoning | Slow, expensive | Complex reasoning, creation, code |
| Ultra-long context | Gemini 1.5 Pro | 1M+ | Very long documents | High latency | Document analysis, long conversations |
| Open source | Llama 3, Mistral | Varies | Self-hostable | Limited capability | On-premises, low cost |

Model Routing Strategies

Strategy 1: Routing by Task Complexity

python
from dataclasses import dataclass
from typing import Literal
import re

@dataclass
class TaskAnalysis:
    complexity: Literal["simple", "medium", "complex"]
    task_type: str
    estimated_tokens: int
    requires_reasoning: bool
    requires_creativity: bool

class ComplexityRouter:
    def __init__(self):
        self.complexity_indicators = {
            "simple": [
                r"classify",
                r"extract",
                r"summarize",
                r"translate",
                r"format",
            ],
            "medium": [
                r"explain",
                r"compare",
                r"analyze",
                r"answer",
                r"describe",
            ],
            "complex": [
                r"reason",
                r"design",
                r"create",
                r"debug",
                r"evaluate",
                r"critique",
            ]
        }

    def analyze(self, prompt: str) -> TaskAnalysis:
        """Analyze task complexity."""
        prompt_lower = prompt.lower()

        # Detect the task type
        task_type = self._detect_task_type(prompt_lower)

        # Evaluate complexity
        complexity = self._evaluate_complexity(prompt_lower, task_type)

        # Rough token estimate (~2 tokens per word)
        estimated_tokens = len(prompt.split()) * 2

        # Does the task require reasoning?
        requires_reasoning = self._requires_reasoning(prompt_lower)

        # Does the task require creativity?
        requires_creativity = self._requires_creativity(prompt_lower)

        return TaskAnalysis(
            complexity=complexity,
            task_type=task_type,
            estimated_tokens=estimated_tokens,
            requires_reasoning=requires_reasoning,
            requires_creativity=requires_creativity
        )

    def route(self, analysis: TaskAnalysis) -> str:
        """Route to an appropriate model."""
        if analysis.complexity == "simple":
            return "gpt-3.5-turbo"
        elif analysis.complexity == "medium":
            if analysis.requires_reasoning:
                return "gpt-4-turbo"
            return "gpt-3.5-turbo"
        else:  # complex
            if analysis.requires_creativity:
                return "gpt-4"
            return "gpt-4-turbo"

    def _detect_task_type(self, prompt: str) -> str:
        """Detect the task type."""
        if any(word in prompt for word in ["classify", "categorize", "label"]):
            return "classification"
        elif any(word in prompt for word in ["extract", "get", "find"]):
            return "extraction"
        elif any(word in prompt for word in ["summarize", "summary"]):
            return "summarization"
        elif any(word in prompt for word in ["write", "create", "generate"]):
            return "generation"
        elif any(word in prompt for word in ["answer", "what", "how", "why"]):
            return "qa"
        else:
            return "general"

    def _evaluate_complexity(self, prompt: str, task_type: str) -> str:
        """Evaluate complexity."""
        for complexity, patterns in self.complexity_indicators.items():
            for pattern in patterns:
                if re.search(pattern, prompt):
                    return complexity
        return "medium"

    def _requires_reasoning(self, prompt: str) -> bool:
        """Check whether the task requires reasoning."""
        reasoning_indicators = [
            "why", "how", "reason", "explain", "analyze",
            "compare", "evaluate", "decide", "conclude"
        ]
        return any(indicator in prompt for indicator in reasoning_indicators)

    def _requires_creativity(self, prompt: str) -> bool:
        """Check whether the task requires creativity."""
        creativity_indicators = [
            "create", "write", "generate", "compose", "design",
            "innovative", "creative", "original"
        ]
        return any(indicator in prompt for indicator in creativity_indicators)

Strategy 2: Cost-Based Routing

python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ModelCost:
    input_price: float  # per 1M tokens
    output_price: float  # per 1M tokens
    avg_latency_ms: int
    quality_score: float  # 0-1

class CostOptimizer:
    def __init__(self):
        self.models = {
            "gpt-3.5-turbo": ModelCost(0.5, 1.5, 500, 0.7),
            "gpt-4-turbo": ModelCost(10, 30, 2000, 0.9),
            "gpt-4": ModelCost(30, 60, 5000, 0.95),
            "claude-haiku": ModelCost(0.25, 1.25, 300, 0.65),
            "claude-sonnet": ModelCost(3, 15, 1500, 0.85),
            "claude-opus": ModelCost(15, 75, 4000, 0.92),
        }

    def select_model(
        self,
        task_analysis: TaskAnalysis,
        budget: Optional[float] = None,
        min_quality: float = 0.7,
        max_latency_ms: Optional[int] = None
    ) -> str:
        """Select the best model under the constraints."""
        candidates = []

        for name, cost in self.models.items():
            # Check the quality requirement
            if cost.quality_score < min_quality:
                continue

            # Check the latency requirement
            if max_latency_ms and cost.avg_latency_ms > max_latency_ms:
                continue

            # Estimate the cost
            estimated_cost = self._estimate_cost(
                name, task_analysis.estimated_tokens
            )

            # Check the budget
            if budget and estimated_cost > budget:
                continue

            candidates.append({
                "name": name,
                "cost": cost,
                "estimated_cost": estimated_cost,
                "quality": cost.quality_score,
                "latency": cost.avg_latency_ms
            })

        if not candidates:
            # No model meets the constraints; fall back to the cheapest
            return self._get_cheapest_model()

        # Sort by quality per dollar (cost-effectiveness)
        candidates.sort(key=lambda x: x["quality"] / x["estimated_cost"], reverse=True)

        return candidates[0]["name"]

    def _estimate_cost(self, model: str, tokens: int) -> float:
        """Estimate the request cost in dollars."""
        cost = self.models[model]
        # Assume a 60/40 split between input and output tokens
        input_tokens = tokens * 0.6
        output_tokens = tokens * 0.4

        return (
            input_tokens * cost.input_price / 1_000_000 +
            output_tokens * cost.output_price / 1_000_000
        )

    def _get_cheapest_model(self) -> str:
        """Return the cheapest model."""
        return min(
            self.models.keys(),
            key=lambda m: self.models[m].input_price + self.models[m].output_price
        )
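As a sanity check of `_estimate_cost`, here is the same 60/40 arithmetic as a standalone function, using the GPT-3.5-class placeholder prices from the table above:

```python
def estimate_cost(tokens: int, input_price: float, output_price: float) -> float:
    """Mirror of _estimate_cost above: 60% of tokens as input, 40% as output.

    Prices are per 1M tokens; the result is in dollars.
    """
    input_tokens = tokens * 0.6
    output_tokens = tokens * 0.4
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000

# 1,000 tokens at $0.5 in / $1.5 out per 1M tokens:
# 600 * 0.5/1e6 + 400 * 1.5/1e6 = 0.0003 + 0.0006 = 0.0009
print(round(estimate_cost(1000, 0.5, 1.5), 6))  # 0.0009
```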

Strategy 3: Latency-Based Routing

python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LatencyRequirement:
    max_latency_ms: int
    p95_latency_ms: Optional[int] = None
    streaming: bool = False

class LatencyRouter:
    def __init__(self):
        self.model_latencies = {
            "gpt-3.5-turbo": {"p50": 400, "p95": 800, "p99": 1500},
            "gpt-4-turbo": {"p50": 1500, "p95": 3000, "p99": 5000},
            "gpt-4": {"p50": 4000, "p95": 8000, "p99": 15000},
            "claude-haiku": {"p50": 250, "p95": 500, "p99": 1000},
            "claude-sonnet": {"p50": 1200, "p95": 2500, "p99": 4000},
            "claude-opus": {"p50": 3500, "p95": 7000, "p99": 12000},
        }
        self.streaming_bonus = 0.3  # streaming reduces perceived latency

    def select_model(
        self,
        requirement: LatencyRequirement,
        task_analysis: TaskAnalysis
    ) -> str:
        """Select a model that meets the latency requirement."""
        candidates = []

        for name, latencies in self.model_latencies.items():
            # Compute the effective latency
            effective_latency = latencies["p95"]
            if requirement.streaming:
                effective_latency *= (1 - self.streaming_bonus)

            # Check whether the latency requirement is met
            if effective_latency <= requirement.max_latency_ms:
                candidates.append({
                    "name": name,
                    "latency": effective_latency,
                    "quality": self._get_quality_score(name, task_analysis)
                })

        if not candidates:
            # No model meets the latency requirement; fall back to the fastest
            return self._get_fastest_model()

        # Among models that meet the latency target, pick the highest quality
        candidates.sort(key=lambda x: x["quality"], reverse=True)

        return candidates[0]["name"]

    def _get_quality_score(self, model: str, analysis: TaskAnalysis) -> float:
        """Get a model quality score, adjusted for the task type."""
        base_scores = {
            "gpt-3.5-turbo": 0.7,
            "gpt-4-turbo": 0.9,
            "gpt-4": 0.95,
            "claude-haiku": 0.65,
            "claude-sonnet": 0.85,
            "claude-opus": 0.92,
        }

        score = base_scores.get(model, 0.5)

        # Adjust for the task type
        if analysis.requires_reasoning and model in ["gpt-4", "claude-opus"]:
            score += 0.05
        if analysis.requires_creativity and model in ["gpt-4", "claude-opus"]:
            score += 0.05

        return min(score, 1.0)

    def _get_fastest_model(self) -> str:
        """Return the fastest model (by p50)."""
        return min(
            self.model_latencies.keys(),
            key=lambda m: self.model_latencies[m]["p50"]
        )
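The effective-latency adjustment in `select_model` is a simple multiplicative discount on p95. As a standalone sketch of the same formula:

```python
def effective_latency(p95_ms: int, streaming: bool, streaming_bonus: float = 0.3) -> float:
    """Same formula as LatencyRouter above: streaming discounts *perceived* latency."""
    return p95_ms * (1 - streaming_bonus) if streaming else float(p95_ms)

# With streaming, gpt-3.5-turbo's 800 ms p95 counts as roughly 560 ms,
# so it fits a 600 ms requirement it would otherwise miss.
print(round(effective_latency(800, streaming=True)))   # 560
print(round(effective_latency(800, streaming=False)))  # 800
```

Note that the discount is a heuristic for user-perceived latency (time to first token), not a change in actual completion time.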

Strategy 4: Adaptive Routing

python
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class RoutingDecision:
    model: str
    reason: str
    estimated_cost: float
    estimated_latency_ms: int

class AdaptiveRouter:
    def __init__(self):
        self.complexity_router = ComplexityRouter()
        self.cost_optimizer = CostOptimizer()
        self.latency_router = LatencyRouter()

        # Historical call data
        self.history = []
        self.model_performance = {}

    def route(
        self,
        prompt: str,
        budget: Optional[float] = None,
        max_latency_ms: Optional[int] = None,
        min_quality: float = 0.7
    ) -> RoutingDecision:
        """Adaptive routing."""
        # Analyze the task
        analysis = self.complexity_router.analyze(prompt)

        # Get candidate models
        candidates = self._get_candidates(
            analysis, budget, max_latency_ms, min_quality
        )

        if not candidates:
            # No candidates: degrade to the fallback decision
            return self._fallback_decision(analysis)

        # Pick the best model
        best = self._select_best(candidates, analysis)

        return RoutingDecision(
            model=best["name"],
            reason=best["reason"],
            estimated_cost=best["estimated_cost"],
            estimated_latency_ms=best["latency"]
        )

    def record_result(
        self,
        model: str,
        prompt: str,
        latency_ms: int,
        quality_score: float,
        cost: float,
        success: bool
    ):
        """Record the outcome for learning."""
        self.history.append({
            "model": model,
            "prompt": prompt,
            "latency_ms": latency_ms,
            "quality_score": quality_score,
            "cost": cost,
            "success": success,
            "timestamp": time.time()
        })

        # Update per-model performance statistics
        if model not in self.model_performance:
            self.model_performance[model] = {
                "total_calls": 0,
                "total_latency": 0,
                "total_quality": 0,
                "total_cost": 0,
                "success_rate": 0
            }

        perf = self.model_performance[model]
        perf["total_calls"] += 1
        perf["total_latency"] += latency_ms
        perf["total_quality"] += quality_score
        perf["total_cost"] += cost
        perf["success_rate"] = (
            (perf["success_rate"] * (perf["total_calls"] - 1) + (1 if success else 0))
            / perf["total_calls"]
        )

    def _get_candidates(
        self,
        analysis: TaskAnalysis,
        budget: Optional[float],
        max_latency_ms: Optional[int],
        min_quality: float
    ) -> list[dict]:
        """Get candidate models."""
        candidates = []

        for model in self.cost_optimizer.models.keys():
            # Check quality
            quality = self.cost_optimizer.models[model].quality_score
            if quality < min_quality:
                continue

            # Check latency
            latency = self.cost_optimizer.models[model].avg_latency_ms
            if max_latency_ms and latency > max_latency_ms:
                continue

            # Estimate the cost
            cost = self.cost_optimizer._estimate_cost(model, analysis.estimated_tokens)
            if budget and cost > budget:
                continue

            candidates.append({
                "name": model,
                "quality": quality,
                "latency": latency,
                "estimated_cost": cost
            })

        return candidates

    def _select_best(self, candidates: list[dict], analysis: TaskAnalysis) -> dict:
        """Select the best model."""
        # Compute a composite score for each candidate
        for candidate in candidates:
            score = self._calculate_score(candidate, analysis)
            candidate["score"] = score
            candidate["reason"] = self._generate_reason(candidate, analysis)

        # Sort by score
        candidates.sort(key=lambda x: x["score"], reverse=True)

        return candidates[0]

    def _calculate_score(self, candidate: dict, analysis: TaskAnalysis) -> float:
        """Compute a composite score."""
        # Default weights
        quality_weight = 0.4
        cost_weight = 0.3
        latency_weight = 0.3

        # Normalize (the thresholds are rough heuristics)
        quality_score = candidate["quality"]
        cost_score = 1 - min(candidate["estimated_cost"] / 0.1, 1)  # treat $0.10 as expensive
        latency_score = 1 - min(candidate["latency"] / 5000, 1)  # treat 5 s as slow

        # Adjust weights by task
        if analysis.requires_reasoning:
            quality_weight = 0.5
            cost_weight = 0.25
            latency_weight = 0.25
        elif analysis.complexity == "simple":
            quality_weight = 0.3
            cost_weight = 0.4
            latency_weight = 0.3

        return (
            quality_score * quality_weight +
            cost_score * cost_weight +
            latency_score * latency_weight
        )

    def _generate_reason(self, candidate: dict, analysis: TaskAnalysis) -> str:
        """Generate a human-readable reason for the choice."""
        reasons = []

        if candidate["quality"] > 0.9:
            reasons.append("high-quality model")
        if candidate["estimated_cost"] < 0.01:
            reasons.append("low cost")
        if candidate["latency"] < 1000:
            reasons.append("fast response")

        if analysis.requires_reasoning:
            reasons.append("suited to reasoning tasks")
        elif analysis.complexity == "simple":
            reasons.append("suited to simple tasks")

        return ", ".join(reasons)

    def _fallback_decision(self, analysis: TaskAnalysis) -> RoutingDecision:
        """Fallback decision when no model meets the constraints."""
        return RoutingDecision(
            model="gpt-3.5-turbo",
            reason="Fallback: no model met the constraints",
            estimated_cost=0.001,
            estimated_latency_ms=500
        )
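The running success-rate update in `record_result` is the standard incremental mean. A standalone check that it matches the plain batch average over a hypothetical outcome sequence:

```python
def update_rate(rate: float, n_prev: int, success: bool) -> float:
    """Incremental mean, same formula as record_result above."""
    return (rate * n_prev + (1 if success else 0)) / (n_prev + 1)

# Hypothetical sequence of call outcomes
outcomes = [True, False, True, True]

rate, n = 0.0, 0
for ok in outcomes:
    rate = update_rate(rate, n, ok)
    n += 1

print(round(rate, 2))                          # 0.75
print(sum(outcomes) / len(outcomes))           # 0.75 (batch average agrees)
```

The incremental form avoids re-scanning `self.history` on every call, which matters once the history grows.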

Cascading and Retry Strategies

Cascade Strategy

python
class CascadeStrategy:
    def __init__(self, model_sequence: list[str]):
        self.model_sequence = model_sequence

    async def execute(self, prompt: str, max_attempts: int = 3) -> str:
        """Execute models in cascade order."""
        for i, model in enumerate(self.model_sequence[:max_attempts]):
            try:
                result = await self._call_model(model, prompt)

                # Validate the result
                if self._is_valid_result(result):
                    return result

                # Invalid result; try the next model
                print(f"Model {model} returned invalid result, trying next...")

            except Exception as e:
                print(f"Model {model} failed: {e}")
                continue

        raise Exception("All models failed")

    async def _call_model(self, model: str, prompt: str) -> str:
        """Call the model."""
        # Model-call implementation goes here (provider SDK, HTTP client, etc.)
        pass

    def _is_valid_result(self, result: str) -> bool:
        """Validate the result."""
        if not result or len(result) < 10:
            return False
        if "error" in result.lower():
            return False
        return True

# Usage example
cascade = CascadeStrategy([
    "claude-opus",      # highest quality
    "gpt-4-turbo",      # second choice
    "gpt-3.5-turbo",    # last resort
])
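With `_call_model` stubbed out, the cascade behavior can be exercised end to end. The stub responses below are invented for illustration; the validity check mirrors `_is_valid_result` above:

```python
import asyncio

async def demo_cascade():
    # Invented stub responses: the first model "fails", the second succeeds.
    responses = {
        "claude-opus": "error: rate limited",
        "gpt-4-turbo": "Here is a sufficiently long, valid answer.",
    }

    for model in ["claude-opus", "gpt-4-turbo", "gpt-3.5-turbo"]:
        result = responses.get(model, "")
        # Same validity check as CascadeStrategy._is_valid_result above
        if result and len(result) >= 10 and "error" not in result.lower():
            return model, result
    raise RuntimeError("All models failed")

model, result = asyncio.run(demo_cascade())
print(model)  # gpt-4-turbo
```

Ordering the sequence from highest to lowest quality, as in the usage example, means a failure degrades quality gracefully rather than failing the request outright.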

Retry Strategy

python
import asyncio
from typing import Optional
import random

class RetryStrategy:
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    async def execute_with_retry(
        self,
        model: str,
        prompt: str,
        validate_func: Optional[callable] = None
    ) -> str:
        """Execute with retries."""
        last_error = None

        for attempt in range(self.max_retries):
            try:
                result = await self._call_model(model, prompt)

                # Validate the result
                if validate_func and not validate_func(result):
                    raise ValueError("Validation failed")

                return result

            except Exception as e:
                last_error = e

                # Compute the backoff delay
                delay = self._calculate_delay(attempt)

                # Add random jitter (up to 10%)
                delay = delay * (1 + random.random() * 0.1)

                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")

                await asyncio.sleep(delay)

        raise Exception(f"All retries failed: {last_error}")

    def _calculate_delay(self, attempt: int) -> float:
        """Compute the delay (exponential backoff)."""
        delay = self.base_delay * (self.exponential_base ** attempt)
        return min(delay, self.max_delay)

    async def _call_model(self, model: str, prompt: str) -> str:
        """Call the model."""
        # Model-call implementation goes here
        pass
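Without the jitter term, `_calculate_delay` yields the familiar capped geometric sequence. A standalone sketch of the same formula with the defaults above:

```python
def backoff_delay(attempt: int, base: float = 1.0, factor: float = 2.0, cap: float = 60.0) -> float:
    """Same exponential-backoff formula as RetryStrategy._calculate_delay."""
    return min(base * factor ** attempt, cap)

print([backoff_delay(a) for a in range(8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap keeps a long outage from producing multi-minute sleeps, and the jitter added in `execute_with_retry` prevents many clients from retrying in lockstep.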

Caching Strategies

Semantic Cache

python
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class CacheEntry:
    query: str
    response: str
    model: str
    embedding: list[float]
    timestamp: float

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.cache: list[CacheEntry] = []
        self.similarity_threshold = similarity_threshold
        self.embedder = None  # embedding model (must be configured before use)

    async def get(self, query: str) -> Optional[CacheEntry]:
        """Look up a cached entry for a semantically similar query."""
        query_embedding = await self._get_embedding(query)

        for entry in self.cache:
            similarity = self._cosine_similarity(query_embedding, entry.embedding)
            if similarity >= self.similarity_threshold:
                return entry

        return None

    async def set(self, query: str, response: str, model: str):
        """Add an entry to the cache."""
        embedding = await self._get_embedding(query)

        entry = CacheEntry(
            query=query,
            response=response,
            model=model,
            embedding=embedding,
            timestamp=time.time()
        )

        self.cache.append(entry)

        # Bound the cache size
        if len(self.cache) > 1000:
            self.cache = self.cache[-1000:]

    async def _get_embedding(self, text: str) -> list[float]:
        """Get the embedding vector."""
        if self.embedder is None:
            # An embedding model must be configured before the cache is used
            raise RuntimeError("No embedder configured")
        return await self.embedder.embed(text)

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Compute cosine similarity."""
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Usage example
cache = SemanticCache()

async def get_response_with_cache(query: str, model: str) -> str:
    # Try the cache first
    cached = await cache.get(query)
    if cached:
        return cached.response

    # Call the model (call_model is assumed to be defined elsewhere)
    response = await call_model(model, query)

    # Store the response in the cache
    await cache.set(query, response, model)

    return response
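`_cosine_similarity` above pulls in NumPy; for completeness, here is a dependency-free version of the same computation:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Dot product divided by the product of L2 norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 (identical direction)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal)
```

For caches beyond a few thousand entries, the linear scan in `get` becomes the bottleneck; a vector index (e.g. FAISS or a vector database) is the usual replacement.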

Best Practices Summary

Model Selection Decision Table

| Scenario | Recommended models | Reason |
|----------|--------------------|--------|
| Simple classification | GPT-3.5 / Claude Haiku | Fast, cheap, good enough |
| Information extraction | GPT-3.5 / Claude Haiku | Structured output |
| Simple Q&A | GPT-3.5 / Claude Haiku | Fast responses |
| Complex Q&A | GPT-4 Turbo / Claude Sonnet | Reasoning required |
| Code generation | GPT-4 / Claude Opus | Quality first |
| Creative writing | GPT-4 / Claude Opus | Creativity |
| Long-document analysis | Gemini 1.5 Pro | Ultra-long context |
| Real-time chat | GPT-3.5 / Claude Haiku | Latency first |
| High-concurrency workloads | GPT-3.5 / Claude Haiku | Cost control |

Routing Configuration Example

yaml
# routing-config.yaml
default_model: gpt-4-turbo

routes:
  # Route for simple tasks
  - name: simple_tasks
    condition:
      task_types: [classification, extraction, summarization]
      max_tokens: 500
    models:
      - gpt-3.5-turbo
    fallback: gpt-4-turbo

  # Route for reasoning tasks
  - name: reasoning_tasks
    condition:
      requires_reasoning: true
    models:
      - gpt-4
      - gpt-4-turbo
    fallback: claude-opus

  # Route for creative tasks
  - name: creative_tasks
    condition:
      requires_creativity: true
    models:
      - gpt-4
      - claude-opus
    fallback: gpt-4-turbo

  # Route for latency-tolerant tasks
  - name: high_latency_allowed
    condition:
      max_latency_ms: 10000
    models:
      - gpt-4
      - claude-opus
    fallback: gpt-4-turbo

# Cache configuration
cache:
  enabled: true
  type: semantic
  similarity_threshold: 0.95
  max_entries: 1000
  ttl_seconds: 3600

# Retry configuration
retry:
  max_retries: 3
  base_delay_ms: 1000
  max_delay_ms: 60000
  exponential_base: 2

# Cost control
cost_control:
  daily_budget: 100 # USD
  per_request_budget: 1 # USD
  alert_threshold: 0.8 # alert at 80% of budget
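Once this YAML is parsed (e.g. with PyYAML's `yaml.safe_load`), route selection reduces to checking each route's conditions in order. Below is a minimal sketch over an already-parsed dict; the exact matching semantics are an assumption here, since the config format itself does not define them:

```python
def match_route(routes: list[dict], task: dict, default: str) -> str:
    """Return the first model whose route conditions all hold for the task."""
    for route in routes:
        cond = route["condition"]
        # Every condition present on the route must be satisfied.
        if "task_types" in cond and task.get("task_type") not in cond["task_types"]:
            continue
        if "max_tokens" in cond and task.get("tokens", 0) > cond["max_tokens"]:
            continue
        if cond.get("requires_reasoning") and not task.get("requires_reasoning"):
            continue
        return route["models"][0]
    return default

# Parsed equivalent of the first two routes above
routes = [
    {"name": "simple_tasks",
     "condition": {"task_types": ["classification", "extraction"], "max_tokens": 500},
     "models": ["gpt-3.5-turbo"]},
    {"name": "reasoning_tasks",
     "condition": {"requires_reasoning": True},
     "models": ["gpt-4"]},
]

print(match_route(routes, {"task_type": "classification", "tokens": 100}, "gpt-4-turbo"))
# gpt-3.5-turbo
print(match_route(routes, {"task_type": "qa", "requires_reasoning": True}, "gpt-4-turbo"))
# gpt-4
```

First-match ordering means routes should be listed from most specific to most general, with `default_model` acting as the final catch-all.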