成本优化策略
本文档整理 AI 系统成本优化的最佳实践,帮助在保证质量的前提下降低成本。
成本构成分析
txt
┌─────────────────────────────────────────────────────┐
│ AI 系统成本构成 │
├─────────────────────────────────────────────────────┤
│ │
│ 模型调用成本 (60-80%) │
│ ├── 输入 Token 成本 │
│ ├── 输出 Token 成本 │
│ └── 模型选择成本 │
│ │
│ 检索成本 (10-20%) │
│ ├── 向量存储成本 │
│ ├── 向量检索成本 │
│ └── 重排计算成本 │
│ │
│ 基础设施成本 (10-20%) │
│ ├── 计算资源 │
│ ├── 存储资源 │
│ └── 网络带宽 │
│ │
└─────────────────────────────────────────────────────┘

模型定价参考
| 模型 | 输入价格 ($/1M tokens) | 输出价格 ($/1M tokens) | 上下文 |
|---|---|---|---|
| GPT-4 | 30.00 | 60.00 | 8K |
| GPT-4 Turbo | 10.00 | 30.00 | 128K |
| GPT-3.5 Turbo | 0.50 | 1.50 | 16K |
| Claude 3 Opus | 15.00 | 75.00 | 200K |
| Claude 3 Sonnet | 3.00 | 15.00 | 200K |
| Claude 3 Haiku | 0.25 | 1.25 | 200K |
| Gemini 1.5 Pro | 1.25 | 5.00 | 1M |
策略 1:Token 优化
输入优化
python
from typing import List, Dict, Any
class TokenOptimizer:
    """Shrinks prompts and context windows to reduce token spend.

    Requires a tokenizer exposing ``encode(text) -> sequence``; the length
    of the returned sequence is used as the token count.
    """

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def optimize_prompt(self, prompt: str) -> str:
        """Run the three compression passes and return the smaller prompt."""
        # 1. Collapse whitespace runs
        prompt = self._remove_extra_whitespace(prompt)
        # 2. Drop duplicated sentences
        prompt = self._compress_repetitive_content(prompt)
        # 3. Remove doubled filler words
        prompt = self._simplify_redundancy(prompt)
        return prompt

    def _remove_extra_whitespace(self, text: str) -> str:
        """Collapse runs of spaces / newlines and strip the ends."""
        import re
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n+', '\n', text)
        return text.strip()

    def _compress_repetitive_content(self, text: str) -> str:
        """Keep only the first occurrence of each sentence (case-insensitive)."""
        # (removed an unused `import re` that was here)
        sentences = text.split('. ')
        seen = set()
        unique_sentences = []
        for sentence in sentences:
            normalized = sentence.lower().strip()
            if normalized not in seen:
                seen.add(normalized)
                unique_sentences.append(sentence)
        return '. '.join(unique_sentences)

    def _simplify_redundancy(self, text: str) -> str:
        """Replace a few known doubled expressions with their single form."""
        redundancies = {
            "please please": "please",
            "very very": "very",
            "absolutely absolutely": "absolutely",
            "在在": "在",
            "是是": "是",
        }
        for redundant, simplified in redundancies.items():
            text = text.replace(redundant, simplified)
        return text

    def truncate_context(
        self,
        context: List[Dict[str, Any]],
        max_tokens: int,
        keep_first: int = 1,
        keep_last: int = 1
    ) -> List[Dict[str, Any]]:
        """Trim context to max_tokens, preferring the first/last messages.

        Keeps up to ``keep_first`` leading and ``keep_last`` trailing items
        while the running token budget allows; middle items are dropped.
        Returns the input unchanged when it already fits.
        """
        if not context:
            return context
        total_tokens = sum(
            len(self.tokenizer.encode(item.get("content", "")))
            for item in context
        )
        if total_tokens <= max_tokens:
            return context
        result = []
        remaining_tokens = max_tokens
        # Head: keep leading items that fit the budget.
        for i in range(min(keep_first, len(context))):
            tokens = len(self.tokenizer.encode(context[i].get("content", "")))
            if remaining_tokens >= tokens:
                result.append(context[i])
                remaining_tokens -= tokens
        # Tail: walk backwards over the last keep_last items (never re-visiting
        # the head), stopping at the first item that doesn't fit.
        tail = []
        for i in range(len(context) - 1, max(keep_first - 1, len(context) - keep_last - 1), -1):
            tokens = len(self.tokenizer.encode(context[i].get("content", "")))
            if remaining_tokens >= tokens:
                tail.insert(0, context[i])
                remaining_tokens -= tokens
            else:
                break
        result.extend(tail)
        return result

    def summarize_context(
        self,
        context: List[Dict[str, Any]],
        target_ratio: float = 0.3
    ) -> str:
        """Condense context to roughly target_ratio of its token count."""
        combined = "\n".join(
            f"{item.get('role', '')}: {item.get('content', '')}"
            for item in context
        )
        original_tokens = len(self.tokenizer.encode(combined))
        target_tokens = int(original_tokens * target_ratio)
        # Simplified extractive pass; a production version would call a model.
        summary = self._extract_key_points(combined, target_tokens)
        return summary

    def _extract_key_points(self, text: str, max_tokens: int) -> str:
        """Greedy extractive summary: keep leading sentences within budget."""
        sentences = text.split('. ')
        result = []
        current_tokens = 0
        for sentence in sentences:
            tokens = len(self.tokenizer.encode(sentence))
            if current_tokens + tokens <= max_tokens:
                result.append(sentence)
                current_tokens += tokens
            else:
                break
        return '. '.join(result)
# Usage example
optimizer = TokenOptimizer(tokenizer)  # NOTE(review): `tokenizer` must be provided by the caller (e.g. tiktoken); not defined in this document
# Optimize a prompt
original_prompt = """
请帮我写一篇关于人工智能的文章。
人工智能是计算机科学的一个分支,它企图了解智能的实质,
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。
请写一篇详细的文章,要求内容丰富、结构清晰。
"""
optimized_prompt = optimizer.optimize_prompt(original_prompt)
print(f"优化后: {optimized_prompt}")

输出优化
python
from typing import Dict, Any
class OutputOptimizer:
    """Chooses and estimates output-token budgets per task type."""

    def __init__(self):
        # Hard ceiling for any single completion.
        self.max_output_tokens = 4096

    def optimize_output_config(
        self,
        task_type: str,
        expected_length: str = "medium"
    ) -> Dict[str, Any]:
        """Return a generation config (max_tokens) for a task/length pair.

        Unknown task types or lengths fall back to a 1000-token budget.
        """
        # (short, medium, long) token limits per task type.
        limits = {
            "classification": (50, 100, 200),
            "extraction": (100, 300, 500),
            "summarization": (100, 300, 500),
            "qa": (100, 300, 500),
            "generation": (300, 1000, 2000),
        }
        column = {"short": 0, "medium": 1, "long": 2}
        if task_type in limits and expected_length in column:
            return {"max_tokens": limits[task_type][column[expected_length]]}
        return {"max_tokens": 1000}

    def estimate_output_tokens(self, task: str, input_length: int) -> int:
        """Estimate output size as a task-dependent fraction of the input."""
        ratios = {
            "classification": 0.1,
            "extraction": 0.3,
            "summarization": 0.3,
            "qa": 0.5,
            "generation": 1.0,
            "translation": 1.2
        }
        estimate = int(input_length * ratios.get(task, 0.5))
        return min(estimate, self.max_output_tokens)

    def should_stop_early(self, content: str, task: str) -> bool:
        """True when the (partial) output already looks like a final answer."""
        import re
        early_stop_patterns = {
            "classification": [r"^(yes|no|true|false|[a-z])$", r"^\d+$"],
            "extraction": [r"^{.*}$", r"^\[.*\]$"],
            "qa": [r"\.\s*$"]  # ends with a period
        }
        stripped = content.strip()
        return any(
            re.match(pattern, stripped, re.IGNORECASE)
            for pattern in early_stop_patterns.get(task, [])
        )
# Usage example
output_optimizer = OutputOptimizer()
# Pick an output config for a task/length pair
config = output_optimizer.optimize_output_config("qa", "medium")
print(f"输出配置: {config}")
# Estimate output tokens for a 100-token input
estimated = output_optimizer.estimate_output_tokens("qa", 100)
print(f"估算输出 token: {estimated}")

策略 2:智能缓存
语义缓存
python
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import hashlib
import json
import time
@dataclass
class CacheEntry:
    """One cached query/response pair plus eviction & stats bookkeeping."""
    query: str
    response: str
    embedding: List[float]
    timestamp: float
    hit_count: int
    model: str
    tokens: int

class SemanticCache:
    """Similarity-based response cache.

    A lookup hits when some cached query's embedding has cosine similarity
    >= similarity_threshold with the new query. Entries expire after
    ttl_seconds; at capacity, the least-hit entry is evicted.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000,
        ttl_seconds: int = 3600,
        embedding_model = None
    ):
        self.similarity_threshold = similarity_threshold
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.embedding_model = embedding_model
        self.cache: Dict[str, CacheEntry] = {}
        # Parallel to self.cache (same order as insertion); reserved for
        # future vectorized search — get() currently scans the dict directly.
        self.embeddings: List[List[float]] = []
        self.keys: List[str] = []

    async def get(self, query: str) -> Optional[str]:
        """Return the best cached response for query, or None on a miss."""
        query_embedding = await self._get_embedding(query)
        best_match = None
        best_similarity = 0.0
        for entry in self.cache.values():
            # Expired entries are skipped (they are only removed on eviction).
            if time.time() - entry.timestamp > self.ttl_seconds:
                continue
            similarity = self._cosine_similarity(query_embedding, entry.embedding)
            if similarity > best_similarity and similarity >= self.similarity_threshold:
                best_similarity = similarity
                best_match = entry
        if best_match:
            best_match.hit_count += 1
            return best_match.response
        return None

    async def set(
        self,
        query: str,
        response: str,
        model: str,
        tokens: int
    ):
        """Insert or refresh the cache entry for query."""
        embedding = await self._get_embedding(query)
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding,
            timestamp=time.time(),
            hit_count=0,
            model=model,
            tokens=tokens
        )
        key = self._generate_key(query)
        if key in self.cache:
            # Refresh in place. (Previously a repeated key appended duplicate
            # rows to self.keys/self.embeddings, desynchronizing them from
            # the dict and corrupting index-based removal in eviction.)
            idx = self.keys.index(key)
            self.embeddings[idx] = embedding
            self.cache[key] = entry
            return
        if len(self.cache) >= self.max_entries:
            self._evict_lru()
        self.cache[key] = entry
        self.embeddings.append(embedding)
        self.keys.append(key)

    async def _get_embedding(self, text: str) -> List[float]:
        """Embed text with the configured model, or a crude local fallback."""
        if self.embedding_model:
            return await self.embedding_model.embed(text)
        # Fallback: fixed 100-dim vector of character codes, zero-padded so
        # vectors of different-length texts stay comparable. (The previous
        # variable-length vectors made the cosine computation raise for
        # queries of different lengths.)
        vec = [float(ord(c)) for c in text[:100]]
        vec.extend([0.0] * (100 - len(vec)))
        return vec

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Cosine similarity of two equal-length vectors (0.0 on zero norm)."""
        import numpy as np
        a_array = np.array(a)
        b_array = np.array(b)
        dot_product = np.dot(a_array, b_array)
        norm_a = np.linalg.norm(a_array)
        norm_b = np.linalg.norm(b_array)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(dot_product / (norm_a * norm_b))

    def _generate_key(self, query: str) -> str:
        """Stable cache key: MD5 of the lower-cased, stripped query."""
        normalized = query.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()

    def _evict_lru(self):
        """Evict the entry with the fewest hits (LFU, despite the name)."""
        min_key = min(self.cache.keys(), key=lambda k: self.cache[k].hit_count)
        del self.cache[min_key]
        # Keep the parallel lists in sync with the dict.
        if min_key in self.keys:
            idx = self.keys.index(min_key)
            del self.keys[idx]
            del self.embeddings[idx]

    def get_stats(self) -> Dict[str, Any]:
        """Summarize cache size, hits, and estimated savings."""
        total_hits = sum(e.hit_count for e in self.cache.values())
        total_tokens = sum(e.tokens for e in self.cache.values())
        return {
            "entries": len(self.cache),
            "total_hits": total_hits,
            "total_tokens_saved": total_tokens,
            "estimated_cost_saved": total_tokens * 0.00001  # at $10 / 1M tokens
        }
# Usage example
cache = SemanticCache(similarity_threshold=0.95)
async def get_response_with_cache(query: str, model: str) -> str:
    # Try the semantic cache first
    cached = await cache.get(query)
    if cached:
        return cached
    # Cache miss: call the model (NOTE(review): call_model is not defined in this document)
    response = await call_model(model, query)
    tokens = len(response.split()) * 2  # rough token estimate: ~2 tokens per word
    # Store for future similar queries
    await cache.set(query, response, model, tokens)
    return response

策略 3:智能路由
python
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class ModelTier(Enum):
    """Quality/price tier of a model."""
    PREMIUM = "premium"    # highest quality
    STANDARD = "standard"  # standard quality
    ECONOMY = "economy"    # budget option

@dataclass
class RoutingDecision:
    """Routing outcome: chosen model plus cost/quality estimates."""
    model: str
    tier: ModelTier
    estimated_cost: float
    estimated_quality: float
    reason: str

class CostAwareRouter:
    """Routes each request to the best model given quality needs and budget."""

    def __init__(self):
        # Per-model pricing ($ / 1M tokens) and a rough quality score in [0, 1].
        self.model_costs = {
            "gpt-4": {"input": 30, "output": 60, "quality": 0.95},
            "gpt-4-turbo": {"input": 10, "output": 30, "quality": 0.90},
            "gpt-3.5-turbo": {"input": 0.5, "output": 1.5, "quality": 0.70},
            "claude-opus": {"input": 15, "output": 75, "quality": 0.92},
            "claude-sonnet": {"input": 3, "output": 15, "quality": 0.85},
            "claude-haiku": {"input": 0.25, "output": 1.25, "quality": 0.65},
        }
        # Named complexity bands (informational; routing uses raw scores).
        self.complexity_thresholds = {
            "simple": 0.3,
            "medium": 0.6,
            "complex": 1.0
        }

    def route(
        self,
        query: str,
        task_type: str,
        budget: Optional[float] = None,
        min_quality: float = 0.7,
        max_latency_ms: Optional[int] = None
    ) -> RoutingDecision:
        """Pick a model for the query.

        budget is a per-request dollar ceiling; min_quality filters candidate
        models. max_latency_ms is accepted for interface stability but unused
        for now (no latency data is tracked yet).
        """
        # Score how hard the query looks.
        complexity = self._analyze_complexity(query, task_type)
        # Candidates meeting the quality floor.
        candidates = self._get_candidates(complexity, min_quality)
        # Apply the budget ceiling. `is not None` so an explicit budget of 0
        # filters everything (the old truthiness test silently ignored it).
        if budget is not None:
            candidates = [c for c in candidates if c["cost"] <= budget]
        best = self._select_best(candidates, complexity)
        return RoutingDecision(
            model=best["name"],
            tier=best["tier"],
            estimated_cost=best["cost"],
            estimated_quality=best["quality"],
            reason=best["reason"]
        )

    def _analyze_complexity(self, query: str, task_type: str) -> float:
        """Heuristic complexity in [0, 1] from task type, length, keywords."""
        complexity = 0.0
        # Base contribution from the task type (weighted 50%).
        task_complexity = {
            "classification": 0.1,
            "extraction": 0.2,
            "summarization": 0.3,
            "qa": 0.4,
            "translation": 0.5,
            "generation": 0.6,
            "reasoning": 0.8,
            "coding": 0.9
        }
        complexity += task_complexity.get(task_type, 0.5) * 0.5
        # Longer queries (whitespace-split word count) add a little.
        length = len(query.split())
        if length > 500:
            complexity += 0.2
        elif length > 200:
            complexity += 0.1
        # One-time bump if any "complex work" keyword appears.
        complex_keywords = [
            "分析", "推理", "比较", "设计", "优化",
            "analyze", "reasoning", "compare", "design", "optimize"
        ]
        for keyword in complex_keywords:
            if keyword in query.lower():
                complexity += 0.1
                break
        return min(complexity, 1.0)

    def _get_candidates(
        self,
        complexity: float,
        min_quality: float
    ) -> list[Dict[str, Any]]:
        """List models meeting min_quality with cost/tier/match annotations."""
        candidates = []
        for name, info in self.model_costs.items():
            # Skip models below the quality floor.
            if info["quality"] < min_quality:
                continue
            # Cost estimate assumes an average 1000 input + 500 output tokens.
            estimated_cost = (
                info["input"] * 1000 / 1_000_000 +
                info["output"] * 500 / 1_000_000
            )
            # Tier follows directly from the quality score.
            if info["quality"] >= 0.90:
                tier = ModelTier.PREMIUM
            elif info["quality"] >= 0.80:
                tier = ModelTier.STANDARD
            else:
                tier = ModelTier.ECONOMY
            candidates.append({
                "name": name,
                "tier": tier,
                "quality": info["quality"],
                "cost": estimated_cost,
                # Distance between model quality and the quality the task
                # "wants" (complexity + 0.3); smaller means a better fit.
                "complexity_match": abs(info["quality"] - (complexity + 0.3))
            })
        return candidates

    def _select_best(
        self,
        candidates: list[Dict[str, Any]],
        complexity: float
    ) -> Dict[str, Any]:
        """Score candidates on quality/cost/fit and return the highest."""
        if not candidates:
            # Degrade to the cheapest model when nothing qualifies.
            return {
                "name": "gpt-3.5-turbo",
                "tier": ModelTier.ECONOMY,
                "quality": 0.70,
                "cost": 0.001,  # rough figure used for the fallback only
                "reason": "降级选择:无满足条件的模型"
            }
        for candidate in candidates:
            quality_score = candidate["quality"]
            cost_score = 1 - min(candidate["cost"] / 0.1, 1)  # normalized to [0, 1]
            complexity_score = 1 - candidate["complexity_match"]
            # Complex tasks weight quality more heavily than cost.
            if complexity > 0.6:
                weights = (0.5, 0.2, 0.3)
            else:
                weights = (0.3, 0.4, 0.3)
            candidate["score"] = (
                quality_score * weights[0] +
                cost_score * weights[1] +
                complexity_score * weights[2]
            )
            candidate["reason"] = self._generate_reason(candidate, complexity)
        candidates.sort(key=lambda x: x["score"], reverse=True)
        return candidates[0]

    def _generate_reason(self, candidate: Dict[str, Any], complexity: float) -> str:
        """Build a human-readable justification for the choice."""
        reasons = []
        if candidate["quality"] >= 0.90:
            reasons.append("高质量模型")
        elif candidate["quality"] >= 0.80:
            reasons.append("标准质量模型")
        else:
            reasons.append("经济型模型")
        if complexity > 0.6:
            reasons.append("适合复杂任务")
        elif complexity < 0.3:
            reasons.append("适合简单任务")
        if candidate["cost"] < 0.01:
            reasons.append("低成本")
        return "、".join(reasons)
# Usage example
router = CostAwareRouter()
# Simple task -- expect a cheap model
decision = router.route(
    query="这个邮件是垃圾邮件吗?",
    task_type="classification",
    min_quality=0.7
)
print(f"选择模型: {decision.model}, 原因: {decision.reason}")
# Complex task -- expect a higher-quality model
decision = router.route(
    query="请分析这个算法的时间复杂度,并提出优化建议",
    task_type="reasoning",
    min_quality=0.85
)
print(f"选择模型: {decision.model}, 原因: {decision.reason}")

策略 4:批量处理
python
from typing import List, Dict, Any
from dataclasses import dataclass
import asyncio
@dataclass
class BatchRequest:
    """A single prompt to run, with an optional scheduling priority."""
    id: str
    prompt: str
    priority: int = 0

@dataclass
class BatchResult:
    """Outcome of one request: response text plus token/cost accounting."""
    id: str
    response: str
    tokens_used: int
    cost: float

class BatchProcessor:
    """Groups requests into batches and runs them with bounded concurrency."""

    def __init__(
        self,
        model_client,
        batch_size: int = 20,
        max_concurrent: int = 5
    ):
        self.model_client = model_client
        self.batch_size = batch_size
        self.max_concurrent = max_concurrent

    async def process_batch(
        self,
        requests: List[BatchRequest]
    ) -> List[BatchResult]:
        """Process all requests, highest priority first; returns every result."""
        ordered = sorted(requests, key=lambda r: r.priority, reverse=True)
        size = self.batch_size
        chunks = [ordered[start:start + size] for start in range(0, len(ordered), size)]
        # The semaphore caps how many batches are in flight at once.
        gate = asyncio.Semaphore(self.max_concurrent)

        async def run_chunk(chunk):
            async with gate:
                return await self._process_single_batch(chunk)

        per_chunk = await asyncio.gather(*(run_chunk(chunk) for chunk in chunks))
        merged: List[BatchResult] = []
        for chunk_results in per_chunk:
            merged.extend(chunk_results)
        return merged

    async def _process_single_batch(
        self,
        batch: List[BatchRequest]
    ) -> List[BatchResult]:
        """Send one combined prompt for the whole batch and split the reply."""
        reply = await self.model_client.generate(self._combine_prompts(batch))
        answers = self._parse_batch_response(reply, batch)
        out = []
        for request, answer in zip(batch, answers):
            # Crude token estimate: word counts of prompt + answer.
            used = len(request.prompt.split()) + len(answer.split())
            out.append(BatchResult(
                id=request.id,
                response=answer,
                tokens_used=used,
                cost=self._calculate_cost(used)
            ))
        return out

    def _combine_prompts(self, batch: List[BatchRequest]) -> str:
        """Join prompts with numbered headers and a '---' separator."""
        labelled = [f"[请求 {n+1}]\n{request.prompt}" for n, request in enumerate(batch)]
        return "\n\n---\n\n".join(labelled)

    def _parse_batch_response(
        self,
        response: str,
        batch: List[BatchRequest]
    ) -> List[str]:
        """Split a combined reply back into one answer per request."""
        pieces = response.split("---")
        if len(pieces) != len(batch):
            # Separator count didn't line up with the batch: fall back to
            # giving every request the full response.
            return [response] * len(batch)
        return [piece.strip() for piece in pieces]

    def _calculate_cost(self, tokens: int) -> float:
        """Flat-rate cost model: $0.01 per 1K tokens."""
        return tokens * 0.01 / 1000
# Usage example
async def batch_example():
    client = ModelClient()  # NOTE(review): ModelClient is not defined in this document
    processor = BatchProcessor(client, batch_size=10)
    requests = [
        BatchRequest(id="1", prompt="翻译:Hello", priority=1),
        BatchRequest(id="2", prompt="翻译:World", priority=2),
        BatchRequest(id="3", prompt="翻译:AI", priority=0),
    ]
    results = await processor.process_batch(requests)
    for result in results:
        print(f"ID: {result.id}, 响应: {result.response}, 成本: ${result.cost:.6f}")

成本监控
python
from typing import Dict, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
@dataclass
class CostRecord:
    """Cost accounting for a single model call."""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    task_type: str
    user_id: str

class CostMonitor:
    """Accumulates per-call cost records, fires alerts, and builds reports."""

    def __init__(self, alert_threshold: float = 100.0):
        # Daily spend (USD) above which a warning alert is emitted.
        self.alert_threshold = alert_threshold
        self.records: list[CostRecord] = []
        # Running all-time aggregates, keyed by day / model / user.
        self.daily_costs: Dict[str, float] = {}
        self.model_costs: Dict[str, float] = {}
        self.user_costs: Dict[str, float] = {}

    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        task_type: str,
        user_id: str
    ):
        """Append one cost record and update aggregates and alerts."""
        record = CostRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            task_type=task_type,
            user_id=user_id
        )
        self.records.append(record)
        date_key = record.timestamp.strftime("%Y-%m-%d")
        self.daily_costs[date_key] = self.daily_costs.get(date_key, 0) + cost
        self.model_costs[model] = self.model_costs.get(model, 0) + cost
        self.user_costs[user_id] = self.user_costs.get(user_id, 0) + cost
        self._check_alert(date_key, user_id, cost)

    def _check_alert(self, date_key: str, user_id: str, cost: float):
        """Emit warnings for high daily spend or a single expensive call."""
        # NOTE: fires on every call once the daily threshold is crossed;
        # add de-duplication if alert volume becomes a problem.
        if self.daily_costs.get(date_key, 0) > self.alert_threshold:
            self._send_alert(
                level="warning",
                message=f"每日成本 ${self.daily_costs[date_key]:.2f} 超过阈值 ${self.alert_threshold}"
            )
        # Any single call over $1 is flagged.
        if cost > 1.0:
            self._send_alert(
                level="warning",
                message=f"单次成本 ${cost:.2f} 较高"
            )

    def _send_alert(self, level: str, message: str):
        """Deliver an alert; stub implementation just prints."""
        print(f"[{level.upper()}] {message}")

    def get_report(self, period: timedelta = timedelta(days=7)) -> Dict[str, Any]:
        """Aggregate cost statistics over the trailing period."""
        cutoff = datetime.now() - period
        recent_records = [r for r in self.records if r.timestamp >= cutoff]
        total_cost = sum(r.cost for r in recent_records)
        total_input_tokens = sum(r.input_tokens for r in recent_records)
        total_output_tokens = sum(r.output_tokens for r in recent_records)
        # Per-model breakdown.
        model_breakdown: Dict[str, Dict[str, Any]] = {}
        for r in recent_records:
            stats = model_breakdown.setdefault(r.model, {
                "cost": 0,
                "calls": 0,
                "input_tokens": 0,
                "output_tokens": 0
            })
            stats["cost"] += r.cost
            stats["calls"] += 1
            stats["input_tokens"] += r.input_tokens
            stats["output_tokens"] += r.output_tokens
        # Per-task-type breakdown.
        task_breakdown: Dict[str, Dict[str, Any]] = {}
        for r in recent_records:
            stats = task_breakdown.setdefault(r.task_type, {"cost": 0, "calls": 0})
            stats["cost"] += r.cost
            stats["calls"] += 1
        # Rank users by spend *within the period*. (Previously this used
        # all-time totals, which disagreed with the rest of the report.)
        period_user_costs: Dict[str, float] = {}
        for r in recent_records:
            period_user_costs[r.user_id] = period_user_costs.get(r.user_id, 0) + r.cost
        return {
            "period": f"{period.days} days",
            "total_cost": total_cost,
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "total_calls": len(recent_records),
            "average_cost_per_call": total_cost / len(recent_records) if recent_records else 0,
            "model_breakdown": model_breakdown,
            "task_breakdown": task_breakdown,
            "top_users": sorted(
                period_user_costs.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10]
        }
# Usage example
monitor = CostMonitor(alert_threshold=100.0)
# Record the cost of one call
monitor.record(
    model="gpt-4-turbo",
    input_tokens=1000,
    output_tokens=500,
    cost=0.025,
    task_type="qa",
    user_id="user123"
)
# Weekly report
report = monitor.get_report(period=timedelta(days=7))
print(json.dumps(report, indent=2))

最佳实践总结
成本优化检查清单
markdown
## 成本优化检查清单
### Token 优化
- [ ] 压缩提示词
- [ ] 移除冗余内容
- [ ] 限制输出长度
- [ ] 使用更短的示例
### 缓存策略
- [ ] 实现语义缓存
- [ ] 设置合理的 TTL
- [ ] 实现缓存预热
- [ ] 监控命中率
### 模型选择
- [ ] 根据任务复杂度选择模型
- [ ] 实现智能路由
- [ ] 使用小模型处理简单任务
- [ ] 批量处理请求
### 监控与告警
- [ ] 记录每次调用的成本
- [ ] 设置成本告警阈值
- [ ] 分析成本趋势
- [ ] 识别高成本用户/任务