
# Cost Optimization Strategies

This document collects best practices for optimizing AI system costs, helping you cut spend without sacrificing quality.

## Cost Breakdown

```txt
┌─────────────────────────────────────────────────────┐
│              AI System Cost Breakdown               │
├─────────────────────────────────────────────────────┤
│                                                     │
│  Model invocation cost (60-80%)                     │
│  ├── Input token cost                               │
│  ├── Output token cost                              │
│  └── Model selection cost                           │
│                                                     │
│  Retrieval cost (10-20%)                            │
│  ├── Vector storage cost                            │
│  ├── Vector search cost                             │
│  └── Reranking compute cost                         │
│                                                     │
│  Infrastructure cost (10-20%)                       │
│  ├── Compute resources                              │
│  ├── Storage resources                              │
│  └── Network bandwidth                              │
│                                                     │
└─────────────────────────────────────────────────────┘
```
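
As a quick sanity check, those mid-point shares can be applied to a monthly budget (the figure below is hypothetical):

```python
# Hypothetical monthly spend split using the mid-point shares above.
monthly_budget = 10_000  # USD, assumed figure for illustration

shares = {"model_calls": 0.70, "retrieval": 0.15, "infrastructure": 0.15}

for category, share in shares.items():
    print(f"{category}: ${monthly_budget * share:,.0f}")
```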

## Model Pricing Reference

| Model | Input price ($/1M tokens) | Output price ($/1M tokens) | Context window |
|---|---|---|---|
| GPT-4 | 30.00 | 60.00 | 8K |
| GPT-4 Turbo | 10.00 | 30.00 | 128K |
| GPT-3.5 Turbo | 0.50 | 1.50 | 16K |
| Claude 3 Opus | 15.00 | 75.00 | 200K |
| Claude 3 Sonnet | 3.00 | 15.00 | 200K |
| Claude 3 Haiku | 0.25 | 1.25 | 200K |
| Gemini 1.5 Pro | 1.25 | 5.00 | 1M |
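
Per-call cost follows directly from this table. A minimal helper using the prices above (rates change often, so treat them as illustrative):

```python
# Prices from the table above, in USD per 1M tokens (illustrative; verify current rates).
PRICES = {
    "gpt-4": (30.00, 60.00),
    "gpt-4-turbo": (10.00, 30.00),
    "gpt-3.5-turbo": (0.50, 1.50),
    "claude-3-opus": (15.00, 75.00),
    "claude-3-sonnet": (3.00, 15.00),
    "claude-3-haiku": (0.25, 1.25),
    "gemini-1.5-pro": (1.25, 5.00),
}

def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD for a single call, based on the table above."""
    input_price, output_price = PRICES[model]
    return input_tokens * input_price / 1_000_000 + output_tokens * output_price / 1_000_000

# Example: a 1,500-token-in / 500-token-out call on GPT-4 Turbo.
print(f"${call_cost('gpt-4-turbo', 1500, 500):.4f}")  # $0.0300
```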

## Strategy 1: Token Optimization

### Input Optimization

```python
import re
from typing import List, Dict, Any

class TokenOptimizer:
    """Token optimizer: trims prompts and context to cut input-token spend."""

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def optimize_prompt(self, prompt: str) -> str:
        """Optimize a prompt."""
        # 1. Remove extra whitespace
        prompt = self._remove_extra_whitespace(prompt)

        # 2. Collapse repeated content
        prompt = self._compress_repetitive_content(prompt)

        # 3. Simplify redundant phrasing
        prompt = self._simplify_redundancy(prompt)

        return prompt

    def _remove_extra_whitespace(self, text: str) -> str:
        """Remove extra whitespace."""
        # Collapse runs of spaces
        text = re.sub(r' +', ' ', text)
        # Collapse runs of newlines
        text = re.sub(r'\n+', '\n', text)
        return text.strip()

    def _compress_repetitive_content(self, text: str) -> str:
        """Drop duplicated sentences."""
        # Detect and remove repeated sentences
        sentences = text.split('. ')
        seen = set()
        unique_sentences = []

        for sentence in sentences:
            normalized = sentence.lower().strip()
            if normalized not in seen:
                seen.add(normalized)
                unique_sentences.append(sentence)

        return '. '.join(unique_sentences)

    def _simplify_redundancy(self, text: str) -> str:
        """Simplify redundant expressions."""
        redundancies = {
            "please please": "please",
            "very very": "very",
            "absolutely absolutely": "absolutely",
            # Doubled Chinese characters
            "在在": "在",
            "是是": "是",
        }

        for redundant, simplified in redundancies.items():
            text = text.replace(redundant, simplified)

        return text

    def truncate_context(
        self,
        context: List[Dict[str, Any]],
        max_tokens: int,
        keep_first: int = 1,
        keep_last: int = 1
    ) -> List[Dict[str, Any]]:
        """Truncate context: keep the head and tail, drop the middle."""
        if not context:
            return context

        # Count the current tokens
        total_tokens = sum(
            len(self.tokenizer.encode(item.get("content", "")))
            for item in context
        )

        if total_tokens <= max_tokens:
            return context

        result = []
        remaining_tokens = max_tokens

        # Keep the head
        for i in range(min(keep_first, len(context))):
            tokens = len(self.tokenizer.encode(context[i].get("content", "")))
            if remaining_tokens >= tokens:
                result.append(context[i])
                remaining_tokens -= tokens

        # Keep the tail
        tail = []
        for i in range(len(context) - 1, max(keep_first - 1, len(context) - keep_last - 1), -1):
            tokens = len(self.tokenizer.encode(context[i].get("content", "")))
            if remaining_tokens >= tokens:
                tail.insert(0, context[i])
                remaining_tokens -= tokens
            else:
                break

        result.extend(tail)
        return result

    def summarize_context(
        self,
        context: List[Dict[str, Any]],
        target_ratio: float = 0.3
    ) -> str:
        """Summarize context down to roughly target_ratio of its tokens."""
        # Merge the context into one string
        combined = "\n".join(
            f"{item.get('role', '')}: {item.get('content', '')}"
            for item in context
        )

        # Compute the target length
        original_tokens = len(self.tokenizer.encode(combined))
        target_tokens = int(original_tokens * target_ratio)

        # Summarize (simplified here; in practice, call a model)
        summary = self._extract_key_points(combined, target_tokens)

        return summary

    def _extract_key_points(self, text: str, max_tokens: int) -> str:
        """Extract key points."""
        # Simplified implementation: keep the leading sentences that fit
        sentences = text.split('. ')

        result = []
        current_tokens = 0

        for sentence in sentences:
            tokens = len(self.tokenizer.encode(sentence))
            if current_tokens + tokens <= max_tokens:
                result.append(sentence)
                current_tokens += tokens
            else:
                break

        return '. '.join(result)

# Usage example (assumes a tokenizer exposing encode(), e.g. tiktoken.get_encoding("cl100k_base"))
optimizer = TokenOptimizer(tokenizer)

# Optimize a prompt
original_prompt = """
Please write an article about artificial intelligence for me.

Artificial intelligence is a branch of computer science that seeks to understand
the essence of intelligence and to build machines that respond in ways similar
to human intelligence.

Please write a detailed article with rich content and a clear structure.
"""

optimized_prompt = optimizer.optimize_prompt(original_prompt)
print(f"Optimized: {optimized_prompt}")
```

### Output Optimization

```python
import re
from typing import Dict, Any

class OutputOptimizer:
    """Output optimizer: caps and estimates output tokens per task."""

    def __init__(self):
        self.max_output_tokens = 4096

    def optimize_output_config(
        self,
        task_type: str,
        expected_length: str = "medium"
    ) -> Dict[str, Any]:
        """Pick an output config for a task type and expected length."""
        configs = {
            "classification": {
                "short": {"max_tokens": 50},
                "medium": {"max_tokens": 100},
                "long": {"max_tokens": 200}
            },
            "extraction": {
                "short": {"max_tokens": 100},
                "medium": {"max_tokens": 300},
                "long": {"max_tokens": 500}
            },
            "summarization": {
                "short": {"max_tokens": 100},
                "medium": {"max_tokens": 300},
                "long": {"max_tokens": 500}
            },
            "qa": {
                "short": {"max_tokens": 100},
                "medium": {"max_tokens": 300},
                "long": {"max_tokens": 500}
            },
            "generation": {
                "short": {"max_tokens": 300},
                "medium": {"max_tokens": 1000},
                "long": {"max_tokens": 2000}
            }
        }

        return configs.get(task_type, {}).get(expected_length, {"max_tokens": 1000})

    def estimate_output_tokens(self, task: str, input_length: int) -> int:
        """Estimate the output token count."""
        # Estimate based on task type
        task_ratios = {
            "classification": 0.1,
            "extraction": 0.3,
            "summarization": 0.3,
            "qa": 0.5,
            "generation": 1.0,
            "translation": 1.2
        }

        ratio = task_ratios.get(task, 0.5)
        return min(int(input_length * ratio), self.max_output_tokens)

    def should_stop_early(self, content: str, task: str) -> bool:
        """Decide whether generation can stop early."""
        # For some tasks, once the answer is complete, further tokens are wasted
        early_stop_patterns = {
            "classification": [r"^(yes|no|true|false|[a-z])$", r"^\d+$"],
            "extraction": [r"^{.*}$", r"^\[.*\]$"],
            "qa": [r"\.\s*$"]  # ends with a period
        }

        patterns = early_stop_patterns.get(task, [])

        for pattern in patterns:
            # search (not match) so end-anchored patterns like r"\.\s*$" work
            if re.search(pattern, content.strip(), re.IGNORECASE):
                return True

        return False

# Usage example
output_optimizer = OutputOptimizer()

# Get an output config
config = output_optimizer.optimize_output_config("qa", "medium")
print(f"Output config: {config}")

# Estimate output tokens
estimated = output_optimizer.estimate_output_tokens("qa", 100)
print(f"Estimated output tokens: {estimated}")
```

## Strategy 2: Smart Caching

### Semantic Caching

```python
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import hashlib
import time

import numpy as np

@dataclass
class CacheEntry:
    query: str
    response: str
    embedding: List[float]
    timestamp: float
    hit_count: int
    model: str
    tokens: int

class SemanticCache:
    """Semantic cache: similarity-based lookup, so paraphrased queries also hit."""

    def __init__(
        self,
        similarity_threshold: float = 0.95,
        max_entries: int = 10000,
        ttl_seconds: int = 3600,
        embedding_model=None
    ):
        self.similarity_threshold = similarity_threshold
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.embedding_model = embedding_model

        self.cache: Dict[str, CacheEntry] = {}
        self.embeddings: List[List[float]] = []
        self.keys: List[str] = []

    async def get(self, query: str) -> Optional[str]:
        """Look up the cache."""
        # Embed the query
        query_embedding = await self._get_embedding(query)

        # Find the most similar cached entry
        best_match = None
        best_similarity = 0.0

        for key, entry in self.cache.items():
            # Skip expired entries
            if time.time() - entry.timestamp > self.ttl_seconds:
                continue

            # Compute similarity
            similarity = self._cosine_similarity(query_embedding, entry.embedding)

            if similarity > best_similarity and similarity >= self.similarity_threshold:
                best_similarity = similarity
                best_match = entry

        if best_match:
            best_match.hit_count += 1
            return best_match.response

        return None

    async def set(
        self,
        query: str,
        response: str,
        model: str,
        tokens: int
    ):
        """Store a response in the cache."""
        # Embed the query
        embedding = await self._get_embedding(query)

        # Build the cache entry
        entry = CacheEntry(
            query=query,
            response=response,
            embedding=embedding,
            timestamp=time.time(),
            hit_count=0,
            model=model,
            tokens=tokens
        )

        # Build the key
        key = self._generate_key(query)

        # Evict if at capacity
        if key not in self.cache and len(self.cache) >= self.max_entries:
            self._evict_least_used()

        # Store (the parallel lists track keys and embeddings for eviction)
        if key not in self.cache:
            self.keys.append(key)
            self.embeddings.append(embedding)
        self.cache[key] = entry

    async def _get_embedding(self, text: str) -> List[float]:
        """Embed a text."""
        if self.embedding_model:
            return await self.embedding_model.embed(text)
        # Demo-only fallback: fixed-length vector of character codes,
        # zero-padded so vectors are always comparable
        vec = [float(ord(c)) for c in text[:100]]
        return vec + [0.0] * (100 - len(vec))

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Compute cosine similarity."""
        a_array = np.array(a)
        b_array = np.array(b)

        dot_product = np.dot(a_array, b_array)
        norm_a = np.linalg.norm(a_array)
        norm_b = np.linalg.norm(b_array)

        if norm_a == 0 or norm_b == 0:
            return 0.0

        return float(dot_product / (norm_a * norm_b))

    def _generate_key(self, query: str) -> str:
        """Build a cache key."""
        normalized = query.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()

    def _evict_least_used(self):
        """Evict the entry with the fewest hits."""
        min_key = min(self.cache.keys(), key=lambda k: self.cache[k].hit_count)
        del self.cache[min_key]

        # Remove from the parallel lists
        idx = self.keys.index(min_key)
        del self.keys[idx]
        del self.embeddings[idx]

    def get_stats(self) -> Dict[str, Any]:
        """Cache statistics."""
        total_hits = sum(e.hit_count for e in self.cache.values())
        # Tokens saved = tokens per response x number of cache hits
        tokens_saved = sum(e.hit_count * e.tokens for e in self.cache.values())

        return {
            "entries": len(self.cache),
            "total_hits": total_hits,
            "total_tokens_saved": tokens_saved,
            "estimated_cost_saved": tokens_saved * 0.00001  # assuming $10 / 1M tokens
        }

# Usage example (call_model is a placeholder for your model client)
cache = SemanticCache(similarity_threshold=0.95)

async def get_response_with_cache(query: str, model: str) -> str:
    # Try the cache first
    cached = await cache.get(query)
    if cached:
        return cached

    # Call the model
    response = await call_model(model, query)
    tokens = len(response.split()) * 2  # rough token estimate

    # Store in the cache
    await cache.set(query, response, model, tokens)

    return response
```
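
A cheap exact-match tier in front of the semantic lookup is a common refinement: hashing costs nothing, so byte-identical queries never pay for an embedding. A minimal sketch layered on get_response_with_cache:

```python
import hashlib

exact_cache: dict[str, str] = {}

async def get_response_two_tier(query: str, model: str) -> str:
    # Tier 1: exact match on the normalized query (no embedding call needed)
    key = hashlib.md5(query.lower().strip().encode()).hexdigest()
    if key in exact_cache:
        return exact_cache[key]

    # Tier 2: semantic cache, then the model itself
    response = await get_response_with_cache(query, model)
    exact_cache[key] = response
    return response
```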

## Strategy 3: Smart Routing

```python
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

class ModelTier(Enum):
    PREMIUM = "premium"      # highest quality
    STANDARD = "standard"    # standard quality
    ECONOMY = "economy"      # budget option

@dataclass
class RoutingDecision:
    model: str
    tier: ModelTier
    estimated_cost: float
    estimated_quality: float
    reason: str

class CostAwareRouter:
    """Cost-aware router: picks the cheapest model that meets quality needs."""

    def __init__(self):
        # Model costs ($ / 1M tokens) and rough quality scores
        self.model_costs = {
            "gpt-4": {"input": 30, "output": 60, "quality": 0.95},
            "gpt-4-turbo": {"input": 10, "output": 30, "quality": 0.90},
            "gpt-3.5-turbo": {"input": 0.5, "output": 1.5, "quality": 0.70},
            "claude-opus": {"input": 15, "output": 75, "quality": 0.92},
            "claude-sonnet": {"input": 3, "output": 15, "quality": 0.85},
            "claude-haiku": {"input": 0.25, "output": 1.25, "quality": 0.65},
        }

        # Reference thresholds for labeling task complexity
        self.complexity_thresholds = {
            "simple": 0.3,
            "medium": 0.6,
            "complex": 1.0
        }

    def route(
        self,
        query: str,
        task_type: str,
        budget: Optional[float] = None,
        min_quality: float = 0.7,
        max_latency_ms: Optional[int] = None
    ) -> RoutingDecision:
        """Route to a suitable model (latency filtering is omitted in this sketch)."""
        # Analyze query complexity
        complexity = self._analyze_complexity(query, task_type)

        # Gather candidate models
        candidates = self._get_candidates(complexity, min_quality)

        # Filter by budget
        if budget:
            candidates = [c for c in candidates if c["cost"] <= budget]

        # Pick the best model
        best = self._select_best(candidates, complexity)

        return RoutingDecision(
            model=best["name"],
            tier=best["tier"],
            estimated_cost=best["cost"],
            estimated_quality=best["quality"],
            reason=best["reason"]
        )

    def _analyze_complexity(self, query: str, task_type: str) -> float:
        """Score query complexity in [0, 1]."""
        complexity = 0.0

        # Based on task type
        task_complexity = {
            "classification": 0.1,
            "extraction": 0.2,
            "summarization": 0.3,
            "qa": 0.4,
            "translation": 0.5,
            "generation": 0.6,
            "reasoning": 0.8,
            "coding": 0.9
        }
        complexity += task_complexity.get(task_type, 0.5) * 0.5

        # Based on query length
        length = len(query.split())
        if length > 500:
            complexity += 0.2
        elif length > 200:
            complexity += 0.1

        # Based on keywords (Chinese and English)
        complex_keywords = [
            "分析", "推理", "比较", "设计", "优化",
            "analyze", "reasoning", "compare", "design", "optimize"
        ]
        for keyword in complex_keywords:
            if keyword in query.lower():
                complexity += 0.1
                break

        return min(complexity, 1.0)

    def _get_candidates(
        self,
        complexity: float,
        min_quality: float
    ) -> list[Dict[str, Any]]:
        """Collect candidate models."""
        candidates = []

        for name, info in self.model_costs.items():
            # Skip models below the quality floor
            if info["quality"] < min_quality:
                continue

            # Estimate cost (assuming 1,000 input + 500 output tokens)
            estimated_cost = (
                info["input"] * 1000 / 1_000_000 +
                info["output"] * 500 / 1_000_000
            )

            # Assign a tier
            if info["quality"] >= 0.90:
                tier = ModelTier.PREMIUM
            elif info["quality"] >= 0.80:
                tier = ModelTier.STANDARD
            else:
                tier = ModelTier.ECONOMY

            candidates.append({
                "name": name,
                "tier": tier,
                "quality": info["quality"],
                "cost": estimated_cost,
                "complexity_match": abs(info["quality"] - (complexity + 0.3))
            })

        return candidates

    def _select_best(
        self,
        candidates: list[Dict[str, Any]],
        complexity: float
    ) -> Dict[str, Any]:
        """Pick the best model."""
        if not candidates:
            # Fall back to the cheapest model
            return {
                "name": "gpt-3.5-turbo",
                "tier": ModelTier.ECONOMY,
                "quality": 0.70,
                "cost": 0.001,
                "reason": "fallback: no model met the constraints"
            }

        # Score candidates on quality, cost, and complexity fit
        for candidate in candidates:
            quality_score = candidate["quality"]
            cost_score = 1 - min(candidate["cost"] / 0.1, 1)  # normalized
            complexity_score = 1 - candidate["complexity_match"]

            # Weights: complex tasks prioritize quality
            if complexity > 0.6:
                weights = (0.5, 0.2, 0.3)
            else:
                weights = (0.3, 0.4, 0.3)

            candidate["score"] = (
                quality_score * weights[0] +
                cost_score * weights[1] +
                complexity_score * weights[2]
            )
            candidate["reason"] = self._generate_reason(candidate, complexity)

        # Take the highest score
        candidates.sort(key=lambda x: x["score"], reverse=True)
        return candidates[0]

    def _generate_reason(self, candidate: Dict[str, Any], complexity: float) -> str:
        """Explain the selection."""
        reasons = []

        if candidate["quality"] >= 0.90:
            reasons.append("high-quality model")
        elif candidate["quality"] >= 0.80:
            reasons.append("standard-quality model")
        else:
            reasons.append("economy model")

        if complexity > 0.6:
            reasons.append("suited to complex tasks")
        elif complexity < 0.3:
            reasons.append("suited to simple tasks")

        if candidate["cost"] < 0.01:
            reasons.append("low cost")

        return ", ".join(reasons)

# Usage example
router = CostAwareRouter()

# Simple task
decision = router.route(
    query="Is this email spam?",
    task_type="classification",
    min_quality=0.7
)
print(f"Selected model: {decision.model}, reason: {decision.reason}")

# Complex task
decision = router.route(
    query="Analyze this algorithm's time complexity and suggest optimizations",
    task_type="reasoning",
    min_quality=0.85
)
print(f"Selected model: {decision.model}, reason: {decision.reason}")
```

## Strategy 4: Batch Processing

```python
from typing import List, Dict, Any
from dataclasses import dataclass
import asyncio

@dataclass
class BatchRequest:
    id: str
    prompt: str
    priority: int = 0

@dataclass
class BatchResult:
    id: str
    response: str
    tokens_used: int
    cost: float

class BatchProcessor:
    """Batch processor: groups requests to cut per-call overhead."""

    def __init__(
        self,
        model_client,
        batch_size: int = 20,
        max_concurrent: int = 5
    ):
        self.model_client = model_client
        self.batch_size = batch_size
        self.max_concurrent = max_concurrent

    async def process_batch(
        self,
        requests: List[BatchRequest]
    ) -> List[BatchResult]:
        """Process requests in batches."""
        # Sort by priority
        sorted_requests = sorted(requests, key=lambda x: x.priority, reverse=True)

        # Split into batches
        batches = [
            sorted_requests[i:i + self.batch_size]
            for i in range(0, len(sorted_requests), self.batch_size)
        ]

        # Process batches in parallel, bounded by a semaphore
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def process_with_semaphore(batch):
            async with semaphore:
                return await self._process_single_batch(batch)

        tasks = [process_with_semaphore(batch) for batch in batches]
        results = await asyncio.gather(*tasks)

        # Flatten the results
        all_results = []
        for batch_results in results:
            all_results.extend(batch_results)

        return all_results

    async def _process_single_batch(
        self,
        batch: List[BatchRequest]
    ) -> List[BatchResult]:
        """Process a single batch."""
        results = []

        # Merge prompts (if the model handles multi-part prompts)
        combined_prompt = self._combine_prompts(batch)

        # Call the model once for the whole batch
        response = await self.model_client.generate(combined_prompt)

        # Split the response back into per-request answers
        parsed_responses = self._parse_batch_response(response, batch)

        for req, resp in zip(batch, parsed_responses):
            tokens = len(req.prompt.split()) + len(resp.split())  # rough estimate
            results.append(BatchResult(
                id=req.id,
                response=resp,
                tokens_used=tokens,
                cost=self._calculate_cost(tokens)
            ))

        return results

    def _combine_prompts(self, batch: List[BatchRequest]) -> str:
        """Merge prompts into one."""
        prompts = []
        for i, req in enumerate(batch):
            prompts.append(f"[Request {i+1}]\n{req.prompt}")
        return "\n\n---\n\n".join(prompts)

    def _parse_batch_response(
        self,
        response: str,
        batch: List[BatchRequest]
    ) -> List[str]:
        """Parse a batched response."""
        # Split on the separator
        parts = response.split("---")

        # If the part count doesn't match, fall back to the raw response
        if len(parts) != len(batch):
            return [response] * len(batch)

        return [p.strip() for p in parts]

    def _calculate_cost(self, tokens: int) -> float:
        """Compute cost."""
        # Assuming $0.01 / 1K tokens
        return tokens * 0.01 / 1000

# Usage example (ModelClient is a placeholder for your model client)
async def batch_example():
    client = ModelClient()
    processor = BatchProcessor(client, batch_size=10)

    requests = [
        BatchRequest(id="1", prompt="Translate: Hello", priority=1),
        BatchRequest(id="2", prompt="Translate: World", priority=2),
        BatchRequest(id="3", prompt="Translate: AI", priority=0),
    ]

    results = await processor.process_batch(requests)

    for result in results:
        print(f"ID: {result.id}, response: {result.response}, cost: ${result.cost:.6f}")
```

## Cost Monitoring

```python
from typing import Dict, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import json

@dataclass
class CostRecord:
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    task_type: str
    user_id: str

class CostMonitor:
    """Cost monitor: records spend and raises alerts."""

    def __init__(self, alert_threshold: float = 100.0):
        self.alert_threshold = alert_threshold
        self.records: list[CostRecord] = []
        self.daily_costs: Dict[str, float] = {}
        self.model_costs: Dict[str, float] = {}
        self.user_costs: Dict[str, float] = {}

    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        task_type: str,
        user_id: str
    ):
        """Record a call's cost."""
        record = CostRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            task_type=task_type,
            user_id=user_id
        )

        self.records.append(record)

        # Update the aggregates
        date_key = record.timestamp.strftime("%Y-%m-%d")
        self.daily_costs[date_key] = self.daily_costs.get(date_key, 0) + cost
        self.model_costs[model] = self.model_costs.get(model, 0) + cost
        self.user_costs[user_id] = self.user_costs.get(user_id, 0) + cost

        # Check alerts
        self._check_alert(date_key, user_id, cost)

    def _check_alert(self, date_key: str, user_id: str, cost: float):
        """Check alert conditions."""
        # Daily-cost alert
        if self.daily_costs.get(date_key, 0) > self.alert_threshold:
            self._send_alert(
                level="warning",
                message=f"Daily cost ${self.daily_costs[date_key]:.2f} exceeds threshold ${self.alert_threshold}"
            )

        # Single-call cost alert
        if cost > 1.0:
            self._send_alert(
                level="warning",
                message=f"Single call cost ${cost:.2f} is high"
            )

    def _send_alert(self, level: str, message: str):
        """Send an alert."""
        print(f"[{level.upper()}] {message}")

    def get_report(self, period: timedelta = timedelta(days=7)) -> Dict[str, Any]:
        """Build a cost report."""
        cutoff = datetime.now() - period
        recent_records = [r for r in self.records if r.timestamp >= cutoff]

        total_cost = sum(r.cost for r in recent_records)
        total_input_tokens = sum(r.input_tokens for r in recent_records)
        total_output_tokens = sum(r.output_tokens for r in recent_records)

        # Group by model
        model_breakdown = {}
        for r in recent_records:
            if r.model not in model_breakdown:
                model_breakdown[r.model] = {
                    "cost": 0,
                    "calls": 0,
                    "input_tokens": 0,
                    "output_tokens": 0
                }
            model_breakdown[r.model]["cost"] += r.cost
            model_breakdown[r.model]["calls"] += 1
            model_breakdown[r.model]["input_tokens"] += r.input_tokens
            model_breakdown[r.model]["output_tokens"] += r.output_tokens

        # Group by task type
        task_breakdown = {}
        for r in recent_records:
            if r.task_type not in task_breakdown:
                task_breakdown[r.task_type] = {"cost": 0, "calls": 0}
            task_breakdown[r.task_type]["cost"] += r.cost
            task_breakdown[r.task_type]["calls"] += 1

        return {
            "period": f"{period.days} days",
            "total_cost": total_cost,
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "total_calls": len(recent_records),
            "average_cost_per_call": total_cost / len(recent_records) if recent_records else 0,
            "model_breakdown": model_breakdown,
            "task_breakdown": task_breakdown,
            # Note: user totals are cumulative, not filtered to the period
            "top_users": sorted(
                self.user_costs.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10]
        }

# Usage example
monitor = CostMonitor(alert_threshold=100.0)

# Record a call
monitor.record(
    model="gpt-4-turbo",
    input_tokens=1000,
    output_tokens=500,
    cost=0.025,
    task_type="qa",
    user_id="user123"
)

# Build a report
report = monitor.get_report(period=timedelta(days=7))
print(json.dumps(report, indent=2))
```
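
Monitoring pays off once it gates spending. A minimal per-user quota check layered on CostMonitor (the quota figure is an assumption, and user_costs in this sketch is cumulative rather than daily):

```python
DAILY_USER_QUOTA = 5.0  # USD, assumed per-user daily budget

def within_quota(monitor: CostMonitor, user_id: str) -> bool:
    # Blocks further calls once a user's recorded spend exceeds the quota.
    # Note: user_costs is cumulative here; reset it daily in production.
    return monitor.user_costs.get(user_id, 0.0) < DAILY_USER_QUOTA

if within_quota(monitor, "user123"):
    print("OK to call the model")
else:
    print("Quota exceeded; degrade to cache or a cheaper model")
```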

## Best Practices Summary

### Cost Optimization Checklist

```markdown
## Cost Optimization Checklist

### Token Optimization

- [ ] Compress prompts
- [ ] Remove redundant content
- [ ] Cap output length
- [ ] Use shorter few-shot examples

### Caching Strategy

- [ ] Implement semantic caching
- [ ] Set a sensible TTL
- [ ] Implement cache warming
- [ ] Monitor the hit rate

### Model Selection

- [ ] Match model to task complexity
- [ ] Implement smart routing
- [ ] Use small models for simple tasks
- [ ] Batch requests

### Monitoring & Alerts

- [ ] Record the cost of every call
- [ ] Set cost alert thresholds
- [ ] Analyze cost trends
- [ ] Identify high-cost users/tasks
```