Skip to content

可观测性与监控

本文档整理 AI 系统可观测性和监控的最佳实践。

监控体系架构

txt
┌─────────────────────────────────────────────────────┐
│                   监控体系架构                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌─────────────────────────────────────────────┐   │
│  │                 应用层                       │   │
│  │  请求日志 │ 响应日志 │ 错误日志 │ 审计日志  │   │
│  └─────────────────────────────────────────────┘   │
│                        ↓                            │
│  ┌─────────────────────────────────────────────┐   │
│  │                 模型层                       │   │
│  │  Token 消耗 │ 延迟 │ 成功率 │ 质量指标     │   │
│  └─────────────────────────────────────────────┘   │
│                        ↓                            │
│  ┌─────────────────────────────────────────────┐   │
│  │                 基础设施层                   │   │
│  │  CPU │ 内存 │ 网络 │ 存储 │ 队列           │   │
│  └─────────────────────────────────────────────┘   │
│                        ↓                            │
│  ┌─────────────────────────────────────────────┐   │
│  │                 告警与响应                   │   │
│  │  告警规则 │ 通知渠道 │ 值班轮转 │ 自动响应 │   │
│  └─────────────────────────────────────────────┘   │
│                                                     │
└─────────────────────────────────────────────────────┘

核心指标

模型指标

python
from dataclasses import dataclass
from typing import Dict, List, Any
from datetime import datetime, timedelta
from collections import defaultdict
import statistics

@dataclass
class ModelMetrics:
    """Aggregated request metrics for one model over a time window."""
    timestamp: datetime          # when this snapshot was computed
    model: str                   # model name, or "all" for a cross-model aggregate
    request_count: int           # total requests in the window
    success_count: int
    error_count: int             # request_count - success_count
    total_input_tokens: int
    total_output_tokens: int
    total_latency_ms: float      # sum of per-request latencies (ms)
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    total_cost: float            # summed request cost (dollars, per caller-supplied values)

class ModelMetricsCollector:
    """In-memory collector of per-request model telemetry.

    ``record_request`` appends raw records; the query methods aggregate
    them over a sliding time window ending at ``datetime.now()``.
    """

    def __init__(self):
        # Raw request records, in arrival order.
        self.requests: List[Dict[str, Any]] = []
        # Reserved for periodic per-model snapshots (not populated yet).
        self.metrics_by_model: "Dict[str, List[ModelMetrics]]" = defaultdict(list)

    def record_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        success: bool,
        error_type: str = None,
        cost: float = 0.0
    ):
        """Record a single model request.

        ``error_type`` should be set when ``success`` is False; a missing
        type is reported as "unknown" by get_error_breakdown.
        """
        self.requests.append({
            "timestamp": datetime.now(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": latency_ms,
            "success": success,
            "error_type": error_type,
            "cost": cost
        })

    def calculate_metrics(
        self,
        model: str = None,
        window: timedelta = timedelta(minutes=5)
    ) -> "ModelMetrics":
        """Aggregate requests within ``window`` into a ModelMetrics snapshot.

        Returns None when no request matches the window/model filter.
        """
        cutoff = datetime.now() - window

        # Keep only requests inside the window (and for ``model``, if given).
        filtered = [
            r for r in self.requests
            if r["timestamp"] >= cutoff
            and (model is None or r["model"] == model)
        ]

        if not filtered:
            return None

        latencies = [r["latency_ms"] for r in filtered]
        success_requests = [r for r in filtered if r["success"]]

        return ModelMetrics(
            timestamp=datetime.now(),
            model=model or "all",
            request_count=len(filtered),
            success_count=len(success_requests),
            error_count=len(filtered) - len(success_requests),
            total_input_tokens=sum(r["input_tokens"] for r in filtered),
            total_output_tokens=sum(r["output_tokens"] for r in filtered),
            total_latency_ms=sum(latencies),
            p50_latency_ms=statistics.median(latencies),
            p95_latency_ms=self._percentile(latencies, 95),
            p99_latency_ms=self._percentile(latencies, 99),
            total_cost=sum(r["cost"] for r in filtered)
        )

    def _percentile(self, data: List[float], p: float) -> float:
        """Nearest-rank percentile (0 <= p <= 100) of ``data``; 0.0 when empty."""
        if not data:
            return 0.0
        sorted_data = sorted(data)
        idx = int(len(sorted_data) * p / 100)
        # Clamp so p == 100 maps to the last element rather than indexing past it.
        return sorted_data[min(idx, len(sorted_data) - 1)]

    def get_success_rate(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Fraction of successful requests in the window (0.0 when no data)."""
        metrics = self.calculate_metrics(model, window)
        if metrics and metrics.request_count > 0:
            return metrics.success_count / metrics.request_count
        return 0.0

    def get_error_rate(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Complement of the success rate.

        NOTE(review): with zero traffic in the window this reports 1.0
        (success rate is 0.0); callers alerting on this value should first
        check that any requests exist.
        """
        return 1.0 - self.get_success_rate(model, window)

    def get_average_latency(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Mean latency in milliseconds over the window (0.0 when no data)."""
        metrics = self.calculate_metrics(model, window)
        if metrics and metrics.request_count > 0:
            return metrics.total_latency_ms / metrics.request_count
        return 0.0

    def get_cost_per_request(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Mean cost per request over the window (0.0 when no data)."""
        metrics = self.calculate_metrics(model, window)
        if metrics and metrics.request_count > 0:
            return metrics.total_cost / metrics.request_count
        return 0.0

    def get_error_breakdown(
        self,
        model: str = None,
        window: timedelta = timedelta(minutes=5)
    ) -> Dict[str, int]:
        """Count failed requests in the window, grouped by error type.

        Fixes: the ``model`` argument is now honoured (it was accepted but
        never used in the filter), and a None error type is reported as
        "unknown" — ``r.get("error_type", "unknown")`` never returned the
        default because the key always exists, possibly with value None.
        """
        cutoff = datetime.now() - window

        errors = defaultdict(int)
        for r in self.requests:
            if r["timestamp"] < cutoff or r["success"]:
                continue
            if model is not None and r["model"] != model:
                continue
            errors[r["error_type"] or "unknown"] += 1

        return dict(errors)

# Usage example
collector = ModelMetricsCollector()

# Record one request
collector.record_request(
    model="gpt-4",
    input_tokens=100,
    output_tokens=50,
    latency_ms=500,
    success=True,
    cost=0.0125
)

# Aggregate metrics over the default 5-minute window
metrics = collector.calculate_metrics(model="gpt-4")
print(f"成功率: {collector.get_success_rate():.2%}")
print(f"P95 延迟: {metrics.p95_latency_ms:.2f}ms")
print(f"总成本: ${metrics.total_cost:.4f}")

质量指标

python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Dict, Any

class QualityMetric(Enum):
    """Identifiers for the quality metrics this module can report."""
    ACCURACY = "accuracy"
    PRECISION = "precision"
    RECALL = "recall"
    F1 = "f1"
    HALLUCINATION_RATE = "hallucination_rate"
    REFUSAL_RATE = "refusal_rate"

@dataclass
class QualityScore:
    """One computed quality measurement."""
    metric: QualityMetric        # which metric this score represents
    value: float                 # the computed ratios here fall in [0, 1]
    sample_size: int             # number of samples the value was computed from
    timestamp: datetime          # when the score was computed
    details: Dict[str, Any]      # metric-specific raw counts (e.g. correct/total)

class QualityMonitor:
    """Tracks model predictions, ground-truth labels, and derived quality scores."""

    def __init__(self):
        # Raw prediction records.
        self.predictions: List[Dict[str, Any]] = []
        # Labels, keyed back to predictions via prediction_id.
        self.ground_truths: List[Dict[str, Any]] = []
        # History of computed scores; feeds the trend in get_quality_report.
        self.quality_scores: "List[QualityScore]" = []

    def record_prediction(
        self,
        prediction_id: str,
        input_text: str,
        output_text: str,
        model: str,
        metadata: Dict[str, Any] = None
    ):
        """Store one model prediction for later evaluation."""
        self.predictions.append({
            "prediction_id": prediction_id,
            "input_text": input_text,
            "output_text": output_text,
            "model": model,
            "timestamp": datetime.now(),
            "metadata": metadata or {}
        })

    def record_ground_truth(
        self,
        prediction_id: str,
        expected_output: str,
        is_correct: bool = None,
        feedback: str = None
    ):
        """Store the expected output / correctness label for a prediction."""
        self.ground_truths.append({
            "prediction_id": prediction_id,
            "expected_output": expected_output,
            "is_correct": is_correct,
            "feedback": feedback,
            "timestamp": datetime.now()
        })

    def calculate_accuracy(self) -> "QualityScore":
        """Compute accuracy over labelled predictions; None if nothing matches.

        Fixes: prediction lookup is now O(1) via a dict index (previously a
        linear scan per label, O(n*m) overall), and the resulting score is
        appended to ``quality_scores`` — before, that list was never
        populated, so get_quality_report's trend was always empty.
        """
        predictions_by_id = {p["prediction_id"]: p for p in self.predictions}

        matched = [
            {"prediction_id": gt["prediction_id"], "is_correct": gt["is_correct"]}
            for gt in self.ground_truths
            if gt["is_correct"] is not None
            and gt["prediction_id"] in predictions_by_id
        ]

        if not matched:
            return None

        correct = sum(1 for m in matched if m["is_correct"])

        score = QualityScore(
            metric=QualityMetric.ACCURACY,
            value=correct / len(matched),
            sample_size=len(matched),
            timestamp=datetime.now(),
            details={"correct": correct, "total": len(matched)}
        )
        self.quality_scores.append(score)
        return score

    def calculate_hallucination_rate(
        self,
        hallucination_checks: List[Dict[str, Any]]
    ) -> "QualityScore":
        """Fraction of checks flagged ``is_hallucination``; None if no checks."""
        if not hallucination_checks:
            return None

        hallucinations = sum(1 for c in hallucination_checks if c.get("is_hallucination"))

        score = QualityScore(
            metric=QualityMetric.HALLUCINATION_RATE,
            value=hallucinations / len(hallucination_checks),
            sample_size=len(hallucination_checks),
            timestamp=datetime.now(),
            details={"hallucinations": hallucinations, "total": len(hallucination_checks)}
        )
        self.quality_scores.append(score)  # keep the trend history up to date
        return score

    def calculate_refusal_rate(
        self,
        refusals: List[Dict[str, Any]]
    ) -> "QualityScore":
        """Fraction of samples flagged ``refused``; None if no samples."""
        if not refusals:
            return None

        refused = sum(1 for r in refusals if r.get("refused"))

        score = QualityScore(
            metric=QualityMetric.REFUSAL_RATE,
            value=refused / len(refusals),
            sample_size=len(refusals),
            timestamp=datetime.now(),
            details={"refused": refused, "total": len(refusals)}
        )
        self.quality_scores.append(score)  # keep the trend history up to date
        return score

    def get_quality_report(self) -> Dict[str, Any]:
        """Summarize volumes, current accuracy, and the last 10 quality scores."""
        accuracy = self.calculate_accuracy()

        return {
            "total_predictions": len(self.predictions),
            "total_ground_truths": len(self.ground_truths),
            "accuracy": accuracy.value if accuracy else None,
            "accuracy_sample_size": accuracy.sample_size if accuracy else 0,
            "quality_trend": [
                {"timestamp": s.timestamp.isoformat(), "metric": s.metric.value, "value": s.value}
                for s in self.quality_scores[-10:]
            ]
        }

# Usage example
quality_monitor = QualityMonitor()

# Record a prediction
quality_monitor.record_prediction(
    prediction_id="pred_001",
    input_text="翻译:Hello",
    output_text="你好",
    model="gpt-4"
)

# Record the ground-truth label for it
quality_monitor.record_ground_truth(
    prediction_id="pred_001",
    expected_output="你好",
    is_correct=True
)

# Compute accuracy over all labelled predictions
accuracy = quality_monitor.calculate_accuracy()
print(f"准确率: {accuracy.value:.2%}")

日志管理

结构化日志

python
import json
from datetime import datetime
from typing import Dict, Any, Optional
from enum import Enum
import logging

class LogLevel(Enum):
    """Log severity levels; values match logging method names (logger.debug, ...)."""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class StructuredLogger:
    """Emits one JSON document per log line for a given service.

    Wraps a named stdlib logger; every entry carries service, environment,
    level, timestamp and arbitrary extra fields.
    """

    def __init__(self, service_name: str, environment: str = "production"):
        self.service_name = service_name
        self.environment = environment
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.DEBUG)

        # Fix: logging.getLogger returns a shared singleton per name, so
        # unconditionally adding a handler here duplicated every log line
        # when the same service was wrapped more than once. Only attach a
        # JSON StreamHandler if the logger has none yet.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter())
            self.logger.addHandler(handler)

    def log(
        self,
        level: LogLevel,
        message: str,
        **kwargs
    ):
        """Emit a structured entry; extra keyword args become JSON fields."""
        log_entry = {
            # NOTE(review): utcnow() is naive (no tz offset) and deprecated in
            # Python 3.12; consider datetime.now(timezone.utc) going forward.
            "timestamp": datetime.utcnow().isoformat(),
            "service": self.service_name,
            "environment": self.environment,
            "level": level.value,
            "message": message,
            **kwargs
        }

        # LogLevel values mirror the stdlib logger's method names.
        log_method = getattr(self.logger, level.value)
        log_method(json.dumps(log_entry, ensure_ascii=False))

    def log_request(
        self,
        request_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        success: bool,
        error: str = None,
        **kwargs
    ):
        """Log one model request; failed requests are logged at ERROR level."""
        self.log(
            LogLevel.INFO if success else LogLevel.ERROR,
            "model_request",
            request_id=request_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            success=success,
            error=error,
            **kwargs
        )

    def log_error(
        self,
        error_type: str,
        error_message: str,
        stack_trace: str = None,
        **kwargs
    ):
        """Log an error with optional stack trace and extra context fields."""
        self.log(
            LogLevel.ERROR,
            "error",
            error_type=error_type,
            error_message=error_message,
            stack_trace=stack_trace,
            **kwargs
        )

    def log_audit(
        self,
        action: str,
        user_id: str,
        resource: str,
        result: str,
        **kwargs
    ):
        """Log an audit event (who did what to which resource, with what result)."""
        self.log(
            LogLevel.INFO,
            "audit",
            action=action,
            user_id=user_id,
            resource=resource,
            result=result,
            **kwargs
        )

class JsonFormatter(logging.Formatter):
    """Formatter that re-serializes JSON payload messages compactly.

    If the record's message is itself a JSON document, it is parsed and
    re-dumped without ASCII escaping; any non-JSON message passes through
    unchanged.
    """

    def format(self, record):
        message = record.getMessage()
        try:
            return json.dumps(json.loads(message), ensure_ascii=False)
        except ValueError:
            # Fix: narrowed from a bare ``except:`` — only swallow JSON parse
            # failures (json.JSONDecodeError subclasses ValueError); anything
            # else (KeyboardInterrupt, genuine bugs) now propagates.
            return message

# Usage example
logger = StructuredLogger("ai-service")

# Log a model request
logger.log_request(
    request_id="req_001",
    model="gpt-4",
    input_tokens=100,
    output_tokens=50,
    latency_ms=500,
    success=True
)

# Log an error
logger.log_error(
    error_type="RateLimitError",
    error_message="Rate limit exceeded",
    model="gpt-4"
)

# Log an audit event
logger.log_audit(
    action="model_call",
    user_id="user_123",
    resource="gpt-4",
    result="success"
)

告警系统

告警规则

python
from dataclasses import dataclass
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
from datetime import datetime, timedelta

class AlertSeverity(Enum):
    """Severity tiers for alert rules, lowest to highest."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    """A threshold rule evaluated by AlertManager.check."""
    name: str                          # metric key looked up in the metrics dict
    description: str
    severity: AlertSeverity
    condition: Callable[[Any], bool]   # fires when True for the observed value
    threshold: float                   # informational; the condition embeds the limit
    window: timedelta                  # evaluation window (recorded in alert details)
    cooldown: timedelta                # minimum gap between two triggers of this rule
    last_triggered: Optional[datetime] = None  # None until first trigger

@dataclass
class Alert:
    """A single triggered alert instance."""
    rule_name: str
    severity: AlertSeverity
    message: str
    value: float            # observed metric value that fired the rule
    threshold: float
    timestamp: datetime
    details: Dict[str, Any]

class AlertManager:
    """Evaluates named threshold rules against metric snapshots and
    dispatches triggered alerts to registered handlers."""

    def __init__(self):
        self.rules: "Dict[str, AlertRule]" = {}
        self.alerts: "List[Alert]" = []
        self.handlers: List[Callable] = []

    def add_rule(
        self,
        name: str,
        description: str,
        severity: "AlertSeverity",
        condition: Callable,
        threshold: float,
        window: timedelta,
        cooldown: timedelta = timedelta(minutes=5)
    ):
        """Register (or replace) a rule keyed by ``name``."""
        rule = AlertRule(
            name=name,
            description=description,
            severity=severity,
            condition=condition,
            threshold=threshold,
            window=window,
            cooldown=cooldown
        )
        self.rules[name] = rule

    def check(self, metrics: Dict[str, Any]):
        """Evaluate every rule against ``metrics`` (rule name -> metric value).

        A rule still inside its cooldown window is skipped; a firing rule
        appends an Alert and notifies every registered handler.
        """
        for name, rule in self.rules.items():
            # Skip rules that fired recently (cooldown suppression).
            if rule.last_triggered:
                if datetime.now() - rule.last_triggered < rule.cooldown:
                    continue

            # Rules whose metric is absent from this snapshot are skipped.
            value = metrics.get(name)
            if value is not None and rule.condition(value):
                alert = Alert(
                    rule_name=name,
                    severity=rule.severity,
                    message=f"{name}: {value} exceeds threshold {rule.threshold}",
                    value=value,
                    threshold=rule.threshold,
                    timestamp=datetime.now(),
                    details={"window": str(rule.window)}
                )

                self.alerts.append(alert)
                rule.last_triggered = datetime.now()

                # Fan out to all notification handlers.
                for handler in self.handlers:
                    handler(alert)

    def add_handler(self, handler: Callable):
        """Register a callable invoked with each triggered Alert."""
        self.handlers.append(handler)

    def get_active_alerts(
        self,
        severity: "AlertSeverity" = None,
        since: datetime = None
    ) -> "List[Alert]":
        """Return alerts, optionally filtered by severity and/or start time.

        Fix: always returns a new list — previously the unfiltered call
        returned ``self.alerts`` itself, letting callers mutate internal
        state by accident.
        """
        alerts = list(self.alerts)

        if severity:
            alerts = [a for a in alerts if a.severity == severity]

        if since:
            alerts = [a for a in alerts if a.timestamp >= since]

        return alerts

# Predefined alert rules
def create_default_rules():
    """Build the default alert-rule configurations.

    Returns a list of kwargs dicts for AlertManager.add_rule: error-rate
    warning/critical thresholds, a P95 latency ceiling, and a daily cost
    budget. Every default rule fires when the metric exceeds its limit.
    """
    def _exceeds_rule(name, description, severity, limit, window):
        # One rule dict; the limit is bound as a default so each lambda
        # keeps its own threshold.
        return {
            "name": name,
            "description": description,
            "severity": severity,
            "condition": lambda value, _limit=limit: value > _limit,
            "threshold": limit,
            "window": window,
        }

    five_minutes = timedelta(minutes=5)
    return [
        _exceeds_rule("error_rate_high", "错误率超过阈值",
                      AlertSeverity.WARNING, 0.05, five_minutes),
        _exceeds_rule("error_rate_critical", "错误率严重超标",
                      AlertSeverity.CRITICAL, 0.1, five_minutes),
        _exceeds_rule("latency_p95_high", "P95 延迟过高",
                      AlertSeverity.WARNING, 3000, five_minutes),
        _exceeds_rule("cost_daily_high", "日成本超过预算",
                      AlertSeverity.WARNING, 100, timedelta(hours=24)),
    ]

# Usage example
alert_manager = AlertManager()

# Register the default rules
for rule in create_default_rules():
    alert_manager.add_rule(**rule)

# Register a notification handler
def handle_alert(alert: Alert):
    print(f"[{alert.severity.value.upper()}] {alert.message}")

alert_manager.add_handler(handle_alert)

# Evaluate the rules against a metrics snapshot
metrics = {
    "error_rate_high": 0.08,
    "latency_p95_high": 2500,
    "cost_daily_high": 80
}
alert_manager.check(metrics)

监控仪表板

Prometheus 指标

python
from prometheus_client import Counter, Histogram, Gauge, Info
import time

# Metric definitions
REQUEST_COUNT = Counter(
    'ai_request_total',
    'Total number of AI requests',
    ['model', 'status']
)

REQUEST_LATENCY = Histogram(
    'ai_request_latency_seconds',
    'Request latency in seconds',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)

TOKEN_COUNT = Counter(
    'ai_token_total',
    'Total number of tokens',
    ['model', 'type']  # the "type" label is either "input" or "output"
)

COST_TOTAL = Counter(
    'ai_cost_total_dollars',
    'Total cost in dollars',
    ['model']
)

ACTIVE_REQUESTS = Gauge(
    'ai_active_requests',
    'Number of active requests',
    ['model']
)

MODEL_INFO = Info(
    'ai_model_info',
    'Model information'
)

# Usage example
class PrometheusMonitor:
    """Bridges request lifecycle events to the Prometheus metrics above."""

    def __init__(self):
        # Local mirror of the per-model in-flight count; the
        # ACTIVE_REQUESTS Gauge holds the value actually scraped.
        self.active_requests = {}

    def start_request(self, model: str):
        """Mark one request as in flight for ``model``."""
        self.active_requests[model] = self.active_requests.get(model, 0) + 1
        ACTIVE_REQUESTS.labels(model=model).inc()

    def end_request(
        self,
        model: str,
        status: str,
        latency_seconds: float,
        input_tokens: int,
        output_tokens: int,
        cost: float
    ):
        """Record the outcome of a finished request in all metrics."""
        # Request outcome counter ("success"/"error" status label).
        REQUEST_COUNT.labels(model=model, status=status).inc()

        # Latency histogram.
        REQUEST_LATENCY.labels(model=model).observe(latency_seconds)

        # Token counters, split by direction.
        TOKEN_COUNT.labels(model=model, type='input').inc(input_tokens)
        TOKEN_COUNT.labels(model=model, type='output').inc(output_tokens)

        # Running cost.
        COST_TOTAL.labels(model=model).inc(cost)

        # Fix: don't KeyError when end_request fires for a model that was
        # never started (previously ``self.active_requests[model] -= 1``).
        self.active_requests[model] = self.active_requests.get(model, 0) - 1
        ACTIVE_REQUESTS.labels(model=model).dec()

    def update_model_info(self, model: str, version: str, provider: str):
        """Publish static model metadata via the Info metric."""
        MODEL_INFO.info({
            'model': model,
            'version': version,
            'provider': provider
        })

# Decorator-based instrumentation
def monitor_request(model: str):
    """Decorator factory: record Prometheus metrics around a model call.

    Wraps the target function so every call increments the in-flight
    gauge, and on completion (success or exception) records status,
    latency, tokens, and cost via PrometheusMonitor.
    """
    from functools import wraps  # local import keeps the snippet self-contained

    def decorator(func):
        @wraps(func)  # fix: preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            monitor = PrometheusMonitor()
            monitor.start_request(model)

            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                status = 'success'
                return result
            except Exception:
                status = 'error'
                raise
            finally:
                latency = time.time() - start_time
                monitor.end_request(
                    model=model,
                    status=status,
                    latency_seconds=latency,
                    input_tokens=100,  # TODO: extract real token counts from the result
                    output_tokens=50,
                    cost=0.01
                )
        return wrapper
    return decorator

@monitor_request('gpt-4')
def call_model(prompt: str):
    # Call the model here; stubbed response for this example
    return "response"

最佳实践总结

监控检查清单

markdown
## 监控检查清单

### 必须指标

- [ ] 请求成功率
- [ ] 请求延迟 (P50/P95/P99)
- [ ] Token 消耗
- [ ] 成本监控
- [ ] 错误率

### 推荐指标

- [ ] 模型质量评分
- [ ] 幻觉率
- [ ] 拒答率
- [ ] 缓存命中率
- [ ] 队列长度

### 日志管理

- [ ] 结构化日志
- [ ] 请求 ID 追踪
- [ ] 错误堆栈记录
- [ ] 审计日志
- [ ] 日志保留策略

### 告警配置

- [ ] 错误率告警
- [ ] 延迟告警
- [ ] 成本告警
- [ ] 质量告警
- [ ] 告警升级机制

### 仪表板

- [ ] 实时监控面板
- [ ] 历史趋势分析
- [ ] 成本分析面板
- [ ] 质量分析面板
- [ ] 异常检测面板