模型选择与路由策略
本文档整理模型选择和智能路由的最佳实践,帮助在不同场景下选择最合适的模型。
核心决策框架
txt
┌─────────────────────────────────────────────────────┐
│ 模型选择决策树 │
├─────────────────────────────────────────────────────┤
│ │
│ 任务复杂度? │
│ │ │
│ ├── 简单(分类、抽取)→ 小模型 │
│ │ │
│ ├── 中等(对话、问答)→ 中等模型 │
│ │ │
│ └── 复杂(推理、创作)→ 大模型 │
│ │
│ 时延要求? │
│ │ │
│ ├── 实时(<1s)→ 小模型 + 缓存 │
│ │ │
│ ├── 正常(<5s)→ 中等模型 │
│ │ │
│ └── 宽松(>5s)→ 大模型 │
│ │
│ 成本预算? │
│ │ │
│ ├── 紧张 → 小模型 + 路由优化 │
│ │ │
│ ├── 中等 → 混合路由 │
│ │ │
│ └── 宽松 → 最佳效果模型 │
│ │
└─────────────────────────────────────────────────────┘

模型能力矩阵
| 模型类型 | 代表模型 | 上下文 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|---|---|
| 小模型 | GPT-3.5, Claude Haiku | 16K-200K | 快速、便宜 | 推理能力有限 | 分类、抽取、简单问答 |
| 中等模型 | GPT-4 Turbo, Claude Sonnet | 128K-200K | 平衡性好 | 成本中等 | 对话、问答、分析 |
| 大模型 | GPT-4, Claude Opus | 128K-200K | 推理能力强 | 慢、贵 | 复杂推理、创作、代码 |
| 超长上下文 | Gemini 1.5 Pro | 1M+ | 超长文档 | 时延高 | 文档分析、长对话 |
| 开源模型 | Llama 3, Mistral | 可变 | 可自托管 | 能力有限 | 私有化、低成本 |
模型路由策略
策略 1:基于任务复杂度路由
python
from dataclasses import dataclass
from typing import Literal
import re
@dataclass
class TaskAnalysis:
    """Result of analyzing a prompt before routing it to a model."""

    # Overall difficulty bucket used by the router.
    complexity: Literal["simple", "medium", "complex"]
    # Coarse task category, e.g. "classification" or "qa".
    task_type: str
    # Rough token estimate (word-count * 2 heuristic).
    estimated_tokens: int
    # Whether the prompt appears to need multi-step reasoning.
    requires_reasoning: bool
    # Whether the prompt appears to need creative generation.
    requires_creativity: bool


class ComplexityRouter:
    """Routes prompts to models using keyword-driven complexity analysis."""

    def __init__(self):
        # Regex patterns whose presence signals each complexity level.
        # Checked in insertion order: simple -> medium -> complex.
        self.complexity_indicators = {
            "simple": [
                r"classify",
                r"extract",
                r"summarize",
                r"translate",
                r"format",
            ],
            "medium": [
                r"explain",
                r"compare",
                r"analyze",
                r"answer",
                r"describe",
            ],
            "complex": [
                r"reason",
                r"design",
                r"create",
                r"debug",
                r"evaluate",
                r"critique",
            ],
        }

    def analyze(self, prompt: str) -> TaskAnalysis:
        """Analyze the complexity and requirements of *prompt*."""
        prompt_lower = prompt.lower()
        # Detect the task type
        task_type = self._detect_task_type(prompt_lower)
        # Evaluate complexity
        complexity = self._evaluate_complexity(prompt_lower, task_type)
        # Estimate tokens (very rough: ~2 tokens per whitespace-separated word)
        estimated_tokens = len(prompt.split()) * 2
        # Detect reasoning / creativity requirements
        requires_reasoning = self._requires_reasoning(prompt_lower)
        requires_creativity = self._requires_creativity(prompt_lower)
        return TaskAnalysis(
            complexity=complexity,
            task_type=task_type,
            estimated_tokens=estimated_tokens,
            requires_reasoning=requires_reasoning,
            requires_creativity=requires_creativity,
        )

    def route(self, analysis: TaskAnalysis) -> str:
        """Return the model name best suited to *analysis*."""
        if analysis.complexity == "simple":
            return "gpt-3.5-turbo"
        elif analysis.complexity == "medium":
            # Medium tasks only pay for the bigger model when reasoning is needed.
            if analysis.requires_reasoning:
                return "gpt-4-turbo"
            return "gpt-3.5-turbo"
        else:  # complex
            if analysis.requires_creativity:
                return "gpt-4"
            return "gpt-4-turbo"

    def _detect_task_type(self, prompt: str) -> str:
        """Classify the prompt into a coarse task category by keyword."""
        if any(word in prompt for word in ["classify", "categorize", "label"]):
            return "classification"
        elif any(word in prompt for word in ["extract", "get", "find"]):
            return "extraction"
        elif any(word in prompt for word in ["summarize", "summary"]):
            return "summarization"
        elif any(word in prompt for word in ["write", "create", "generate"]):
            return "generation"
        elif any(word in prompt for word in ["answer", "what", "how", "why"]):
            return "qa"
        else:
            return "general"

    def _evaluate_complexity(self, prompt: str, task_type: str) -> str:
        """Return the first complexity level whose pattern list matches.

        Note: *task_type* is currently unused but kept for interface stability.
        """
        for complexity, patterns in self.complexity_indicators.items():
            for pattern in patterns:
                if re.search(pattern, prompt):
                    return complexity
        # No indicator matched: default to a middle-of-the-road estimate.
        return "medium"

    def _requires_reasoning(self, prompt: str) -> bool:
        """Heuristic: does the prompt contain reasoning-related keywords?"""
        reasoning_indicators = [
            "why", "how", "reason", "explain", "analyze",
            "compare", "evaluate", "decide", "conclude"
        ]
        return any(indicator in prompt for indicator in reasoning_indicators)

    def _requires_creativity(self, prompt: str) -> bool:
        """Heuristic: does the prompt contain creativity-related keywords?"""
        creativity_indicators = [
            "create", "write", "generate", "compose", "design",
            "innovative", "creative", "original"
        ]
        return any(indicator in prompt for indicator in creativity_indicators)

# --- Strategy 2: cost-based routing ---
python
from dataclasses import dataclass
from typing import Optional
@dataclass
class ModelCost:
    """Pricing and performance profile for a single model."""

    input_price: float   # USD per 1M input tokens
    output_price: float  # USD per 1M output tokens
    avg_latency_ms: int  # typical end-to-end latency
    quality_score: float  # relative quality, 0-1


class CostOptimizer:
    """Selects the most cost-effective model under quality/latency/budget limits."""

    def __init__(self):
        # Static pricing/quality table; prices are USD per 1M tokens.
        self.models = {
            "gpt-3.5-turbo": ModelCost(0.5, 1.5, 500, 0.7),
            "gpt-4-turbo": ModelCost(10, 30, 2000, 0.9),
            "gpt-4": ModelCost(30, 60, 5000, 0.95),
            "claude-haiku": ModelCost(0.25, 1.25, 300, 0.65),
            "claude-sonnet": ModelCost(3, 15, 1500, 0.85),
            "claude-opus": ModelCost(15, 75, 4000, 0.92),
        }

    def select_model(
        self,
        task_analysis: "TaskAnalysis",
        budget: Optional[float] = None,
        min_quality: float = 0.7,
        max_latency_ms: Optional[int] = None,
    ) -> str:
        """Pick the best quality-per-dollar model meeting the constraints.

        Falls back to the cheapest model when no candidate qualifies.
        """
        candidates = []
        for name, cost in self.models.items():
            # Quality floor
            if cost.quality_score < min_quality:
                continue
            # Latency ceiling ("is not None" so an explicit 0 is enforced)
            if max_latency_ms is not None and cost.avg_latency_ms > max_latency_ms:
                continue
            # Estimated per-request cost
            estimated_cost = self._estimate_cost(
                name, task_analysis.estimated_tokens
            )
            # Budget ceiling ("is not None" so an explicit 0 budget is enforced)
            if budget is not None and estimated_cost > budget:
                continue
            candidates.append({
                "name": name,
                "cost": cost,
                "estimated_cost": estimated_cost,
                "quality": cost.quality_score,
                "latency": cost.avg_latency_ms,
            })
        if not candidates:
            # Nothing qualifies: degrade gracefully to the cheapest model.
            return self._get_cheapest_model()
        # Rank by quality per dollar; guard the denominator so a zero cost
        # estimate (e.g. an empty prompt) cannot raise ZeroDivisionError.
        candidates.sort(
            key=lambda x: x["quality"] / (x["estimated_cost"] or 1e-9),
            reverse=True,
        )
        return candidates[0]["name"]

    def _estimate_cost(self, model: str, tokens: int) -> float:
        """Estimate request cost in USD, assuming a 60/40 input/output token split."""
        cost = self.models[model]
        input_tokens = tokens * 0.6
        output_tokens = tokens * 0.4
        return (
            input_tokens * cost.input_price / 1_000_000 +
            output_tokens * cost.output_price / 1_000_000
        )

    def _get_cheapest_model(self) -> str:
        """Return the model with the lowest combined per-token price."""
        return min(
            self.models.keys(),
            key=lambda m: self.models[m].input_price + self.models[m].output_price,
        )

# --- Strategy 3: latency-based routing ---
python
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class LatencyRequirement:
    """Latency constraints a routing request must satisfy."""

    max_latency_ms: int
    # Optional stricter p95 target (currently informational only).
    p95_latency_ms: Optional[int] = None
    # Streaming reduces *perceived* latency (see streaming_bonus below).
    streaming: bool = False


class LatencyRouter:
    """Selects the highest-quality model that satisfies a latency budget."""

    def __init__(self):
        # Observed latency percentiles per model, in milliseconds.
        self.model_latencies = {
            "gpt-3.5-turbo": {"p50": 400, "p95": 800, "p99": 1500},
            "gpt-4-turbo": {"p50": 1500, "p95": 3000, "p99": 5000},
            "gpt-4": {"p50": 4000, "p95": 8000, "p99": 15000},
            "claude-haiku": {"p50": 250, "p95": 500, "p99": 1000},
            "claude-sonnet": {"p50": 1200, "p95": 2500, "p99": 4000},
            "claude-opus": {"p50": 3500, "p95": 7000, "p99": 12000},
        }
        # Streaming cuts perceived latency by this fraction (heuristic).
        self.streaming_bonus = 0.3

    def select_model(
        self,
        requirement: LatencyRequirement,
        task_analysis: "TaskAnalysis",
    ) -> str:
        """Return the best model whose effective p95 latency fits *requirement*."""
        candidates = []
        for name, latencies in self.model_latencies.items():
            # Effective latency is p95, discounted when streaming is enabled.
            effective_latency = latencies["p95"]
            if requirement.streaming:
                effective_latency *= (1 - self.streaming_bonus)
            # Keep only models inside the latency budget
            if effective_latency <= requirement.max_latency_ms:
                candidates.append({
                    "name": name,
                    "latency": effective_latency,
                    "quality": self._get_quality_score(name, task_analysis),
                })
        if not candidates:
            # Nothing fits the budget: degrade to the fastest model available.
            return self._get_fastest_model()
        # Within budget, prefer the highest-quality model.
        candidates.sort(key=lambda x: x["quality"], reverse=True)
        return candidates[0]["name"]

    def _get_quality_score(self, model: str, analysis: "TaskAnalysis") -> float:
        """Quality score for *model*, nudged up for matching task needs."""
        base_scores = {
            "gpt-3.5-turbo": 0.7,
            "gpt-4-turbo": 0.9,
            "gpt-4": 0.95,
            "claude-haiku": 0.65,
            "claude-sonnet": 0.85,
            "claude-opus": 0.92,
        }
        score = base_scores.get(model, 0.5)
        # Small bonus for top-tier models on reasoning/creative tasks.
        if analysis.requires_reasoning and model in ["gpt-4", "claude-opus"]:
            score += 0.05
        if analysis.requires_creativity and model in ["gpt-4", "claude-opus"]:
            score += 0.05
        return min(score, 1.0)

    def _get_fastest_model(self) -> str:
        """Return the model with the lowest median (p50) latency."""
        return min(
            self.model_latencies.keys(),
            key=lambda m: self.model_latencies[m]["p50"],
        )

# --- Strategy 4: adaptive routing ---
python
from dataclasses import dataclass
from typing import Optional
import random
@dataclass
class RoutingDecision:
    """Outcome of a routing request: chosen model plus cost/latency estimates."""

    model: str
    reason: str
    estimated_cost: float
    estimated_latency_ms: int


class AdaptiveRouter:
    """Combines complexity, cost and latency routing and records call feedback."""

    def __init__(self):
        self.complexity_router = ComplexityRouter()
        self.cost_optimizer = CostOptimizer()
        self.latency_router = LatencyRouter()
        # Call history and per-model aggregates used for adaptive learning.
        # NOTE(review): history grows without bound — consider a cap in production.
        self.history = []
        self.model_performance = {}

    def route(
        self,
        prompt: str,
        budget: Optional[float] = None,
        max_latency_ms: Optional[int] = None,
        min_quality: float = 0.7,
    ) -> RoutingDecision:
        """Route *prompt* to the best model under the given constraints."""
        # Analyze the task
        analysis = self.complexity_router.analyze(prompt)
        # Collect candidate models
        candidates = self._get_candidates(
            analysis, budget, max_latency_ms, min_quality
        )
        if not candidates:
            # No model satisfies the constraints: degrade to a safe default.
            return self._fallback_decision(analysis)
        # Pick the best-scoring candidate
        best = self._select_best(candidates, analysis)
        return RoutingDecision(
            model=best["name"],
            reason=best["reason"],
            estimated_cost=best["estimated_cost"],
            estimated_latency_ms=best["latency"],
        )

    def record_result(
        self,
        model: str,
        prompt: str,
        latency_ms: int,
        quality_score: float,
        cost: float,
        success: bool,
    ):
        """Record one call outcome so routing can learn from history."""
        # Bug fix: this snippet used time.time() without importing time.
        import time

        self.history.append({
            "model": model,
            "prompt": prompt,
            "latency_ms": latency_ms,
            "quality_score": quality_score,
            "cost": cost,
            "success": success,
            "timestamp": time.time(),
        })
        # Update per-model running aggregates
        if model not in self.model_performance:
            self.model_performance[model] = {
                "total_calls": 0,
                "total_latency": 0,
                "total_quality": 0,
                "total_cost": 0,
                "success_rate": 0,
            }
        perf = self.model_performance[model]
        perf["total_calls"] += 1
        perf["total_latency"] += latency_ms
        perf["total_quality"] += quality_score
        perf["total_cost"] += cost
        # Incremental running average of the success rate.
        perf["success_rate"] = (
            (perf["success_rate"] * (perf["total_calls"] - 1) + (1 if success else 0))
            / perf["total_calls"]
        )

    def _get_candidates(
        self,
        analysis: "TaskAnalysis",
        budget: Optional[float],
        max_latency_ms: Optional[int],
        min_quality: float,
    ) -> list[dict]:
        """Collect models meeting the quality/latency/budget constraints."""
        candidates = []
        for model in self.cost_optimizer.models.keys():
            # Quality floor
            quality = self.cost_optimizer.models[model].quality_score
            if quality < min_quality:
                continue
            # Latency ceiling ("is not None" so an explicit 0 is enforced)
            latency = self.cost_optimizer.models[model].avg_latency_ms
            if max_latency_ms is not None and latency > max_latency_ms:
                continue
            # Budget ceiling
            cost = self.cost_optimizer._estimate_cost(model, analysis.estimated_tokens)
            if budget is not None and cost > budget:
                continue
            candidates.append({
                "name": model,
                "quality": quality,
                "latency": latency,
                "estimated_cost": cost,
            })
        return candidates

    def _select_best(self, candidates: list[dict], analysis: "TaskAnalysis") -> dict:
        """Score candidates and return the highest-scoring one."""
        for candidate in candidates:
            score = self._calculate_score(candidate, analysis)
            candidate["score"] = score
            candidate["reason"] = self._generate_reason(candidate, analysis)
        candidates.sort(key=lambda x: x["score"], reverse=True)
        return candidates[0]

    def _calculate_score(self, candidate: dict, analysis: "TaskAnalysis") -> float:
        """Weighted quality/cost/latency score; weights depend on the task."""
        # Default weights
        quality_weight = 0.4
        cost_weight = 0.3
        latency_weight = 0.3
        # Normalize cost/latency into [0, 1] (heuristic reference points:
        # $0.10 per request counts as "expensive", 5s counts as "slow").
        quality_score = candidate["quality"]
        cost_score = 1 - min(candidate["estimated_cost"] / 0.1, 1)
        latency_score = 1 - min(candidate["latency"] / 5000, 1)
        # Re-weight by task characteristics
        if analysis.requires_reasoning:
            quality_weight = 0.5
            cost_weight = 0.25
            latency_weight = 0.25
        elif analysis.complexity == "simple":
            quality_weight = 0.3
            cost_weight = 0.4
            latency_weight = 0.3
        return (
            quality_score * quality_weight +
            cost_score * cost_weight +
            latency_score * latency_weight
        )

    def _generate_reason(self, candidate: dict, analysis: "TaskAnalysis") -> str:
        """Build a human-readable explanation for the selection."""
        reasons = []
        if candidate["quality"] > 0.9:
            reasons.append("高质量模型")
        if candidate["estimated_cost"] < 0.01:
            reasons.append("低成本")
        if candidate["latency"] < 1000:
            reasons.append("快速响应")
        if analysis.requires_reasoning:
            reasons.append("适合推理任务")
        elif analysis.complexity == "simple":
            reasons.append("适合简单任务")
        return "、".join(reasons)

    def _fallback_decision(self, analysis: "TaskAnalysis") -> RoutingDecision:
        """Safe default when no model satisfies the constraints."""
        return RoutingDecision(
            model="gpt-3.5-turbo",
            reason="降级选择:无满足条件的模型",
            estimated_cost=0.001,
            estimated_latency_ms=500,
        )

# --- Cascading and retry strategies ---
级联策略
python
class CascadeStrategy:
    """Tries a sequence of models in order until one returns a valid result."""

    def __init__(self, model_sequence: list[str]):
        # Models ordered from most to least preferred.
        self.model_sequence = model_sequence

    async def execute(self, prompt: str, max_attempts: int = 3) -> str:
        """Run *prompt* through the cascade; raise once every model fails."""
        # (Unused enumerate index removed from the original.)
        for model in self.model_sequence[:max_attempts]:
            try:
                result = await self._call_model(model, prompt)
                # Accept only results that pass validation
                if self._is_valid_result(result):
                    return result
                # Invalid result: fall through to the next model
                print(f"Model {model} returned invalid result, trying next...")
            except Exception as e:
                print(f"Model {model} failed: {e}")
                continue
        # Bug fix: raise a specific exception type instead of bare Exception.
        raise RuntimeError("All models failed")

    async def _call_model(self, model: str, prompt: str) -> str:
        """Call the model (integration point; implement per provider)."""
        # Implement the actual model call here.
        pass

    def _is_valid_result(self, result: str) -> bool:
        """Heuristic validity check: non-trivial length and no 'error' marker."""
        if not result or len(result) < 10:
            return False
        if "error" in result.lower():
            return False
        return True
# Usage example
cascade = CascadeStrategy([
    "claude-opus",    # highest quality
    "gpt-4-turbo",    # second-best quality
    "gpt-3.5-turbo",  # last-resort fallback
])

# --- Retry strategy ---
python
import asyncio
from typing import Optional
import random
class RetryStrategy:
    """Retries a model call with exponential backoff and random jitter."""

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay      # first-retry delay, seconds
        self.max_delay = max_delay        # backoff cap, seconds
        self.exponential_base = exponential_base

    async def execute_with_retry(
        self,
        model: str,
        prompt: str,
        validate_func: Optional[callable] = None,
    ) -> str:
        """Call *model*, retrying on errors; optionally validate each result.

        Raises RuntimeError (chained to the last error) when all attempts fail.
        """
        last_error = None
        for attempt in range(self.max_retries):
            try:
                result = await self._call_model(model, prompt)
                # Treat a validation failure like a transient error.
                if validate_func and not validate_func(result):
                    raise ValueError("Validation failed")
                return result
            except Exception as e:
                last_error = e
                # Bug fix: the original slept the full backoff delay even
                # after the FINAL attempt, right before raising. Skip it.
                if attempt == self.max_retries - 1:
                    break
                # Exponential backoff plus up to 10% random jitter.
                delay = self._calculate_delay(attempt)
                delay = delay * (1 + random.random() * 0.1)
                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
                await asyncio.sleep(delay)
        raise RuntimeError(f"All retries failed: {last_error}") from last_error

    def _calculate_delay(self, attempt: int) -> float:
        """Exponential backoff delay for *attempt*, capped at max_delay."""
        delay = self.base_delay * (self.exponential_base ** attempt)
        return min(delay, self.max_delay)

    async def _call_model(self, model: str, prompt: str) -> str:
        """Call the model (integration point; implement per provider)."""
        # Implement the actual model call here.
        pass

# --- Caching strategies ---
语义缓存
python
from dataclasses import dataclass
from typing import Optional
import hashlib
@dataclass
class CacheEntry:
    """A cached query/response pair together with the query's embedding."""

    query: str
    response: str
    model: str
    embedding: list[float]
    timestamp: float


class SemanticCache:
    """Embedding-based cache that serves hits for semantically similar queries."""

    def __init__(self, similarity_threshold: float = 0.95):
        # Flat list of entries, scanned linearly on every lookup.
        self.cache: list[CacheEntry] = []
        self.similarity_threshold = similarity_threshold
        # Embedding model; must be injected by the caller before use.
        self.embedder = None

    async def get(self, query: str) -> Optional[CacheEntry]:
        """Return the first entry at least similarity_threshold similar to *query*."""
        query_embedding = await self._get_embedding(query)
        for entry in self.cache:
            similarity = self._cosine_similarity(query_embedding, entry.embedding)
            if similarity >= self.similarity_threshold:
                return entry
        return None

    async def set(self, query: str, response: str, model: str):
        """Store *response* keyed by the embedding of *query*."""
        # Bug fix: this snippet used time.time() without importing time.
        import time

        embedding = await self._get_embedding(query)
        entry = CacheEntry(
            query=query,
            response=response,
            model=model,
            embedding=embedding,
            timestamp=time.time(),
        )
        self.cache.append(entry)
        # Bound the cache: keep only the most recent 1000 entries.
        if len(self.cache) > 1000:
            self.cache = self.cache[-1000:]

    async def _get_embedding(self, text: str) -> list[float]:
        """Embed *text* with the configured embedder."""
        if self.embedder is None:
            # Bug fix: the original fell through to an AttributeError on None;
            # fail loudly with a clear message instead.
            raise RuntimeError("SemanticCache.embedder is not configured")
        return await self.embedder.embed(text)

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Cosine similarity between two vectors."""
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
# Usage example
cache = SemanticCache()


async def get_response_with_cache(query: str, model: str) -> str:
    """Serve from the semantic cache, falling back to a live model call."""
    # Try the cache first
    cached = await cache.get(query)
    if cached:
        return cached.response
    # Cache miss: call the model
    response = await call_model(model, query)
    # Store for future similar queries
    await cache.set(query, response, model)
    return response

# --- Best-practice summary ---
模型选择决策表
| 场景 | 推荐模型 | 原因 |
|---|---|---|
| 简单分类 | GPT-3.5 / Claude Haiku | 快速、便宜、足够 |
| 信息抽取 | GPT-3.5 / Claude Haiku | 结构化输出 |
| 简单问答 | GPT-3.5 / Claude Haiku | 响应快 |
| 复杂问答 | GPT-4 Turbo / Claude Sonnet | 需要推理 |
| 代码生成 | GPT-4 / Claude Opus | 质量优先 |
| 创意写作 | GPT-4 / Claude Opus | 创造力 |
| 长文档分析 | Gemini 1.5 Pro | 超长上下文 |
| 实时对话 | GPT-3.5 / Claude Haiku | 时延优先 |
| 高并发场景 | GPT-3.5 / Claude Haiku | 成本控制 |
路由配置示例
yaml
# routing-config.yaml
default_model: gpt-4-turbo

routes:
  # Route for simple tasks
  - name: simple_tasks
    condition:
      task_types: [classification, extraction, summarization]
      max_tokens: 500
    models:
      - gpt-3.5-turbo
    fallback: gpt-4-turbo

  # Route for reasoning-heavy tasks
  - name: reasoning_tasks
    condition:
      requires_reasoning: true
    models:
      - gpt-4
      - gpt-4-turbo
    fallback: claude-opus

  # Route for creative tasks
  - name: creative_tasks
    condition:
      requires_creativity: true
    models:
      - gpt-4
      - claude-opus
    fallback: gpt-4-turbo

  # Route for tasks that tolerate high latency
  - name: high_latency_allowed
    condition:
      max_latency_ms: 10000
    models:
      - gpt-4
      - claude-opus
    fallback: gpt-4-turbo

# Cache settings
cache:
  enabled: true
  type: semantic
  similarity_threshold: 0.95
  max_entries: 1000
  ttl_seconds: 3600

# Retry settings
retry:
  max_retries: 3
  base_delay_ms: 1000
  max_delay_ms: 60000
  exponential_base: 2

# Cost control
cost_control:
  daily_budget: 100       # USD
  per_request_budget: 1   # USD
  alert_threshold: 0.8    # 80%