可观测性与监控
本文档整理 AI 系统可观测性和监控的最佳实践。
监控体系架构
txt
┌─────────────────────────────────────────────────────┐
│ 监控体系架构 │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 应用层 │ │
│ │ 请求日志 │ 响应日志 │ 错误日志 │ 审计日志 │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 模型层 │ │
│ │ Token 消耗 │ 延迟 │ 成功率 │ 质量指标 │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 基础设施层 │ │
│ │ CPU │ 内存 │ 网络 │ 存储 │ 队列 │ │
│ └─────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 告警与响应 │ │
│ │ 告警规则 │ 通知渠道 │ 值班轮转 │ 自动响应 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘核心指标
模型指标
python
from dataclasses import dataclass
from typing import Dict, List, Any
from datetime import datetime, timedelta
from collections import defaultdict
import statistics
@dataclass
class ModelMetrics:
    """Aggregated model metrics over one collection window."""
    timestamp: datetime      # when the snapshot was computed
    model: str               # model name, or "all" for a cross-model aggregate
    request_count: int
    success_count: int
    error_count: int
    total_input_tokens: int
    total_output_tokens: int
    total_latency_ms: float  # sum of per-request latencies in the window
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    total_cost: float        # total cost (dollars) in the window


class ModelMetricsCollector:
    """In-memory collector that aggregates request records into ModelMetrics.

    All query helpers operate on a sliding time window ending at
    ``datetime.now()``.
    """

    def __init__(self):
        # Raw request records, kept in arrival order.
        self.requests: List[Dict[str, Any]] = []
        # Reserved for callers that snapshot per-model metrics over time.
        self.metrics_by_model: Dict[str, List[ModelMetrics]] = defaultdict(list)

    def record_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        success: bool,
        error_type: str = None,
        cost: float = 0.0
    ):
        """Record one model request; error_type is meaningful only when success is False."""
        self.requests.append({
            "timestamp": datetime.now(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": latency_ms,
            "success": success,
            "error_type": error_type,
            "cost": cost
        })

    def calculate_metrics(
        self,
        model: str = None,
        window: timedelta = timedelta(minutes=5)
    ) -> ModelMetrics:
        """Aggregate requests within ``window``.

        ``model=None`` aggregates across all models. Returns None when no
        requests fall in the window.
        """
        cutoff = datetime.now() - window
        filtered = [
            r for r in self.requests
            if r["timestamp"] >= cutoff
            and (model is None or r["model"] == model)
        ]
        if not filtered:
            return None
        latencies = [r["latency_ms"] for r in filtered]
        success_requests = [r for r in filtered if r["success"]]
        return ModelMetrics(
            timestamp=datetime.now(),
            model=model or "all",
            request_count=len(filtered),
            success_count=len(success_requests),
            error_count=len(filtered) - len(success_requests),
            total_input_tokens=sum(r["input_tokens"] for r in filtered),
            total_output_tokens=sum(r["output_tokens"] for r in filtered),
            total_latency_ms=sum(latencies),
            p50_latency_ms=statistics.median(latencies),
            p95_latency_ms=self._percentile(latencies, 95),
            p99_latency_ms=self._percentile(latencies, 99),
            total_cost=sum(r["cost"] for r in filtered)
        )

    def _percentile(self, data: List[float], p: float) -> float:
        """Nearest-rank percentile: index = floor(n * p / 100), clamped to the last element."""
        if not data:
            return 0.0
        sorted_data = sorted(data)
        idx = int(len(sorted_data) * p / 100)
        return sorted_data[min(idx, len(sorted_data) - 1)]

    def get_success_rate(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Fraction of successful requests in the window (0.0 when empty)."""
        metrics = self.calculate_metrics(model, window)
        if metrics and metrics.request_count > 0:
            return metrics.success_count / metrics.request_count
        return 0.0

    def get_error_rate(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Complement of the success rate. NOTE(review): an empty window reports 1.0."""
        return 1.0 - self.get_success_rate(model, window)

    def get_average_latency(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Mean latency (ms) over the window, 0.0 when empty."""
        metrics = self.calculate_metrics(model, window)
        if metrics and metrics.request_count > 0:
            return metrics.total_latency_ms / metrics.request_count
        return 0.0

    def get_cost_per_request(self, model: str = None, window: timedelta = timedelta(minutes=5)) -> float:
        """Mean cost per request over the window, 0.0 when empty."""
        metrics = self.calculate_metrics(model, window)
        if metrics and metrics.request_count > 0:
            return metrics.total_cost / metrics.request_count
        return 0.0

    def get_error_breakdown(
        self,
        model: str = None,
        window: timedelta = timedelta(minutes=5)
    ) -> Dict[str, int]:
        """Count failed requests in the window, keyed by error type.

        Fixes two defects of the original: the ``model`` parameter was
        accepted but never applied, and a stored ``error_type`` of None
        bypassed the "unknown" fallback (the key exists, so dict.get's
        default never fired).
        """
        cutoff = datetime.now() - window
        errors = defaultdict(int)
        for r in self.requests:
            if (r["timestamp"] >= cutoff
                    and not r["success"]
                    and (model is None or r["model"] == model)):
                errors[r["error_type"] or "unknown"] += 1
        return dict(errors)
# Usage example
collector = ModelMetricsCollector()

# Record one request
collector.record_request(
    model="gpt-4",
    input_tokens=100,
    output_tokens=50,
    latency_ms=500,
    success=True,
    cost=0.0125
)

# Compute metrics over the default 5-minute window
metrics = collector.calculate_metrics(model="gpt-4")
print(f"成功率: {collector.get_success_rate():.2%}")
print(f"P95 延迟: {metrics.p95_latency_ms:.2f}ms")
print(f"总成本: ${metrics.total_cost:.4f}")

质量指标
python
from dataclasses import dataclass
from datetime import datetime  # was missing: QualityScore's annotation needs it
from enum import Enum
from typing import Any, Dict, List


class QualityMetric(Enum):
    """Identifiers for the quality metrics this module can compute."""
    ACCURACY = "accuracy"
    PRECISION = "precision"
    RECALL = "recall"
    F1 = "f1"
    HALLUCINATION_RATE = "hallucination_rate"
    REFUSAL_RATE = "refusal_rate"


@dataclass
class QualityScore:
    """One computed quality-metric sample."""
    metric: QualityMetric
    value: float             # ratio in [0, 1] for the metrics computed below
    sample_size: int         # number of records behind the value
    timestamp: datetime
    details: Dict[str, Any]  # metric-specific counts, e.g. {"correct": ..., "total": ...}


class QualityMonitor:
    """Tracks predictions, ground-truth labels, and derived quality scores."""

    def __init__(self):
        self.predictions: List[Dict[str, Any]] = []
        self.ground_truths: List[Dict[str, Any]] = []
        # History of computed scores, feeding get_quality_report's trend.
        self.quality_scores: List[QualityScore] = []

    def record_prediction(
        self,
        prediction_id: str,
        input_text: str,
        output_text: str,
        model: str,
        metadata: Dict[str, Any] = None
    ):
        """Store one model prediction keyed by prediction_id."""
        self.predictions.append({
            "prediction_id": prediction_id,
            "input_text": input_text,
            "output_text": output_text,
            "model": model,
            "timestamp": datetime.now(),
            "metadata": metadata or {}
        })

    def record_ground_truth(
        self,
        prediction_id: str,
        expected_output: str,
        is_correct: bool = None,
        feedback: str = None
    ):
        """Store the ground-truth label / feedback for a prediction."""
        self.ground_truths.append({
            "prediction_id": prediction_id,
            "expected_output": expected_output,
            "is_correct": is_correct,
            "feedback": feedback,
            "timestamp": datetime.now()
        })

    def _record_score(self, score: QualityScore) -> QualityScore:
        # Keep a history so get_quality_report's trend is actually populated
        # (the original read quality_scores but never wrote to it).
        self.quality_scores.append(score)
        return score

    def calculate_accuracy(self) -> QualityScore:
        """Accuracy over labeled predictions; None when none are labeled.

        Joins ground truths to predictions through a set of prediction_ids
        (the original rescanned all predictions per label: O(n*m)).
        """
        known_ids = {p["prediction_id"] for p in self.predictions}
        matched = [
            gt for gt in self.ground_truths
            if gt["prediction_id"] in known_ids and gt["is_correct"] is not None
        ]
        if not matched:
            return None
        correct = sum(1 for m in matched if m["is_correct"])
        return self._record_score(QualityScore(
            metric=QualityMetric.ACCURACY,
            value=correct / len(matched),
            sample_size=len(matched),
            timestamp=datetime.now(),
            details={"correct": correct, "total": len(matched)}
        ))

    def calculate_hallucination_rate(
        self,
        hallucination_checks: List[Dict[str, Any]]
    ) -> QualityScore:
        """Fraction of checks flagged is_hallucination; None for empty input."""
        if not hallucination_checks:
            return None
        hallucinations = sum(1 for c in hallucination_checks if c.get("is_hallucination"))
        return self._record_score(QualityScore(
            metric=QualityMetric.HALLUCINATION_RATE,
            value=hallucinations / len(hallucination_checks),
            sample_size=len(hallucination_checks),
            timestamp=datetime.now(),
            details={"hallucinations": hallucinations, "total": len(hallucination_checks)}
        ))

    def calculate_refusal_rate(
        self,
        refusals: List[Dict[str, Any]]
    ) -> QualityScore:
        """Fraction of records flagged refused; None for empty input."""
        if not refusals:
            return None
        refused = sum(1 for r in refusals if r.get("refused"))
        return self._record_score(QualityScore(
            metric=QualityMetric.REFUSAL_RATE,
            value=refused / len(refusals),
            sample_size=len(refusals),
            timestamp=datetime.now(),
            details={"refused": refused, "total": len(refusals)}
        ))

    def get_quality_report(self) -> Dict[str, Any]:
        """Summary: record volumes, current accuracy, and the last 10 scores."""
        accuracy = self.calculate_accuracy()
        return {
            "total_predictions": len(self.predictions),
            "total_ground_truths": len(self.ground_truths),
            "accuracy": accuracy.value if accuracy else None,
            "accuracy_sample_size": accuracy.sample_size if accuracy else 0,
            "quality_trend": [
                {"timestamp": s.timestamp.isoformat(), "metric": s.metric.value, "value": s.value}
                for s in self.quality_scores[-10:]
            ]
        }
# Usage example
quality_monitor = QualityMonitor()

# Record a prediction
quality_monitor.record_prediction(
    prediction_id="pred_001",
    input_text="翻译:Hello",
    output_text="你好",
    model="gpt-4"
)

# Record the ground-truth label
quality_monitor.record_ground_truth(
    prediction_id="pred_001",
    expected_output="你好",
    is_correct=True
)

# Compute accuracy
accuracy = quality_monitor.calculate_accuracy()
print(f"准确率: {accuracy.value:.2%}")

日志管理
结构化日志
python
import json
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Any, Optional


class LogLevel(Enum):
    """Log severity names, mirroring the stdlib logging method names."""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


class StructuredLogger:
    """Emits one JSON object per log line (structured logging).

    Each entry carries timestamp/service/environment/level/message plus any
    extra key-value pairs passed by the caller.
    """

    def __init__(self, service_name: str, environment: str = "production"):
        self.service_name = service_name
        self.environment = environment
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.DEBUG)
        # Guard against duplicate handlers: the original added a fresh
        # StreamHandler on every instantiation of the same service name,
        # which duplicated every line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter())
            self.logger.addHandler(handler)
        # Avoid a second copy of each line via the root logger's handlers.
        self.logger.propagate = False

    def log(
        self,
        level: LogLevel,
        message: str,
        **kwargs
    ):
        """Serialize one structured entry and dispatch it at ``level``.

        Uses a timezone-aware UTC timestamp; ``datetime.utcnow()`` is
        deprecated and produced naive (ambiguous) timestamps.
        """
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": self.service_name,
            "environment": self.environment,
            "level": level.value,
            "message": message,
            **kwargs
        }
        # LogLevel values match logging's method names (debug/info/...).
        log_method = getattr(self.logger, level.value)
        log_method(json.dumps(log_entry, ensure_ascii=False))

    def log_request(
        self,
        request_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        success: bool,
        error: str = None,
        **kwargs
    ):
        """Log one model request; failures are logged at ERROR level."""
        self.log(
            LogLevel.INFO if success else LogLevel.ERROR,
            "model_request",
            request_id=request_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            success=success,
            error=error,
            **kwargs
        )

    def log_error(
        self,
        error_type: str,
        error_message: str,
        stack_trace: str = None,
        **kwargs
    ):
        """Log an error event with optional stack trace."""
        self.log(
            LogLevel.ERROR,
            "error",
            error_type=error_type,
            error_message=error_message,
            stack_trace=stack_trace,
            **kwargs
        )

    def log_audit(
        self,
        action: str,
        user_id: str,
        resource: str,
        result: str,
        **kwargs
    ):
        """Log an audit event (who did what to which resource, and the outcome)."""
        self.log(
            LogLevel.INFO,
            "audit",
            action=action,
            user_id=user_id,
            resource=resource,
            result=result,
            **kwargs
        )


class JsonFormatter(logging.Formatter):
    """Pass JSON messages through re-serialized on one line; leave plain text untouched."""

    def format(self, record):
        message = record.getMessage()
        try:
            return json.dumps(json.loads(message), ensure_ascii=False)
        except (ValueError, TypeError):
            # Not JSON (json.JSONDecodeError is a ValueError): emit raw.
            # The original used a bare except, which also swallowed
            # KeyboardInterrupt/SystemExit.
            return message
# Usage example
logger = StructuredLogger("ai-service")

# Log a request
logger.log_request(
    request_id="req_001",
    model="gpt-4",
    input_tokens=100,
    output_tokens=50,
    latency_ms=500,
    success=True
)

# Log an error
logger.log_error(
    error_type="RateLimitError",
    error_message="Rate limit exceeded",
    model="gpt-4"
)

# Log an audit event
logger.log_audit(
    action="model_call",
    user_id="user_123",
    resource="gpt-4",
    result="success"
)

告警系统
告警规则
python
from dataclasses import dataclass
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
from datetime import datetime, timedelta


class AlertSeverity(Enum):
    """Alert severity levels."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AlertRule:
    """One alert rule: fires when condition(value) is true, at most once per cooldown."""
    name: str
    description: str
    severity: AlertSeverity
    condition: Callable[[Any], bool]  # predicate over the metric value
    threshold: float                  # informational; condition embeds the comparison
    window: timedelta                 # evaluation window (informational, recorded in details)
    cooldown: timedelta               # minimum time between consecutive firings
    # Was annotated `datetime = None`, which is not a valid datetime.
    last_triggered: Optional[datetime] = None


@dataclass
class Alert:
    """A fired alert instance."""
    rule_name: str
    severity: AlertSeverity
    message: str
    value: float
    threshold: float
    timestamp: datetime
    details: Dict[str, Any]


class AlertManager:
    """Evaluates alert rules against metric snapshots and fans out to handlers."""

    def __init__(self):
        self.rules: Dict[str, AlertRule] = {}
        self.alerts: List[Alert] = []
        self.handlers: List[Callable] = []

    def add_rule(
        self,
        name: str,
        description: str,
        severity: AlertSeverity,
        condition: Callable,
        threshold: float,
        window: timedelta,
        cooldown: timedelta = timedelta(minutes=5)
    ):
        """Register (or replace) a rule under ``name``."""
        self.rules[name] = AlertRule(
            name=name,
            description=description,
            severity=severity,
            condition=condition,
            threshold=threshold,
            window=window,
            cooldown=cooldown
        )

    def check(self, metrics: Dict[str, Any]):
        """Evaluate every rule against ``metrics`` (keyed by rule name).

        Rules within their cooldown are skipped; metrics absent from the
        snapshot are ignored rather than treated as violations.
        """
        for name, rule in self.rules.items():
            if rule.last_triggered is not None:
                if datetime.now() - rule.last_triggered < rule.cooldown:
                    continue
            value = metrics.get(name)
            if value is not None and rule.condition(value):
                alert = Alert(
                    rule_name=name,
                    severity=rule.severity,
                    message=f"{name}: {value} exceeds threshold {rule.threshold}",
                    value=value,
                    threshold=rule.threshold,
                    timestamp=datetime.now(),
                    details={"window": str(rule.window)}
                )
                self.alerts.append(alert)
                rule.last_triggered = datetime.now()
                for handler in self.handlers:
                    handler(alert)

    def add_handler(self, handler: Callable):
        """Register a callback invoked with each fired Alert."""
        self.handlers.append(handler)

    def get_active_alerts(
        self,
        severity: Optional[AlertSeverity] = None,
        since: Optional[datetime] = None
    ) -> List[Alert]:
        """Return fired alerts, optionally filtered by severity and/or time.

        Always returns a new list so callers cannot mutate internal state
        (the unfiltered path of the original leaked ``self.alerts`` itself).
        """
        alerts = self.alerts
        if severity:
            alerts = [a for a in alerts if a.severity == severity]
        if since:
            alerts = [a for a in alerts if a.timestamp >= since]
        return list(alerts)
# Predefined alert rules
def create_default_rules():
    """Return the built-in rule definitions as kwargs dicts for AlertManager.add_rule."""
    def _rule(name, description, severity, threshold, window):
        # Bind the threshold as a default so each lambda closes over its own value.
        return {
            "name": name,
            "description": description,
            "severity": severity,
            "condition": lambda x, t=threshold: x > t,
            "threshold": threshold,
            "window": window,
        }

    five_minutes = timedelta(minutes=5)
    return [
        _rule("error_rate_high", "错误率超过阈值", AlertSeverity.WARNING, 0.05, five_minutes),
        _rule("error_rate_critical", "错误率严重超标", AlertSeverity.CRITICAL, 0.1, five_minutes),
        _rule("latency_p95_high", "P95 延迟过高", AlertSeverity.WARNING, 3000, five_minutes),
        _rule("cost_daily_high", "日成本超过预算", AlertSeverity.WARNING, 100, timedelta(hours=24)),
    ]
# Usage example
alert_manager = AlertManager()

# Register the default rules
for rule in create_default_rules():
    alert_manager.add_rule(**rule)

# Register a notification handler
def handle_alert(alert: Alert):
    print(f"[{alert.severity.value.upper()}] {alert.message}")

alert_manager.add_handler(handle_alert)

# Evaluate the rules against a metrics snapshot
metrics = {
    "error_rate_high": 0.08,
    "latency_p95_high": 2500,
    "cost_daily_high": 80
}
alert_manager.check(metrics)

监控仪表板
Prometheus 指标
python
from prometheus_client import Counter, Histogram, Gauge, Info
import time

# Request counter, partitioned by model and terminal status (success/error).
REQUEST_COUNT = Counter(
    'ai_request_total',
    'Total number of AI requests',
    ['model', 'status']
)

# Latency histogram; bucket bounds span sub-second up to one minute.
REQUEST_LATENCY = Histogram(
    'ai_request_latency_seconds',
    'Request latency in seconds',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)

# Token counter; the `type` label distinguishes input vs output tokens.
TOKEN_COUNT = Counter(
    'ai_token_total',
    'Total number of tokens',
    ['model', 'type']  # type: input/output
)

# Cumulative spend in dollars, per model.
COST_TOTAL = Counter(
    'ai_cost_total_dollars',
    'Total cost in dollars',
    ['model']
)

# Gauge of currently in-flight requests, per model.
ACTIVE_REQUESTS = Gauge(
    'ai_active_requests',
    'Number of active requests',
    ['model']
)

# Static model metadata (name/version/provider).
MODEL_INFO = Info(
    'ai_model_info',
    'Model information'
)
# Usage example
class PrometheusMonitor:
    """Thin wrapper that updates the module-level Prometheus metrics."""

    def __init__(self):
        # Local mirror of in-flight request counts, keyed by model.
        self.active_requests = {}

    def start_request(self, model: str):
        """Mark one request in flight for ``model``."""
        self.active_requests[model] = self.active_requests.get(model, 0) + 1
        ACTIVE_REQUESTS.labels(model=model).inc()

    def end_request(
        self,
        model: str,
        status: str,
        latency_seconds: float,
        input_tokens: int,
        output_tokens: int,
        cost: float
    ):
        """Record a finished request: counters, latency, tokens, cost, gauge.

        Unlike the original, calling this without a matching start_request
        no longer raises KeyError on the local counter.
        """
        REQUEST_COUNT.labels(model=model, status=status).inc()
        REQUEST_LATENCY.labels(model=model).observe(latency_seconds)
        TOKEN_COUNT.labels(model=model, type='input').inc(input_tokens)
        TOKEN_COUNT.labels(model=model, type='output').inc(output_tokens)
        COST_TOTAL.labels(model=model).inc(cost)
        self.active_requests[model] = self.active_requests.get(model, 1) - 1
        ACTIVE_REQUESTS.labels(model=model).dec()

    def update_model_info(self, model: str, version: str, provider: str):
        """Publish static model metadata through the Info metric."""
        MODEL_INFO.info({
            'model': model,
            'version': version,
            'provider': provider
        })
# Decorator-style usage
def monitor_request(model: str):
    """Decorator that records Prometheus metrics around a model call.

    NOTE(review): token counts and cost are hard-coded placeholders in the
    original; a real integration should extract them from the wrapped
    function's result.
    """
    def decorator(func):
        from functools import wraps  # local import keeps the snippet self-contained

        @wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            monitor = PrometheusMonitor()
            monitor.start_request(model)
            start_time = time.time()
            status = 'error'  # assume failure until the call returns
            try:
                result = func(*args, **kwargs)
                status = 'success'
                return result
            finally:
                # Runs on both success and exception; exceptions still propagate.
                monitor.end_request(
                    model=model,
                    status=status,
                    latency_seconds=time.time() - start_time,
                    input_tokens=100,  # placeholder — should come from the result
                    output_tokens=50,
                    cost=0.01
                )
        return wrapper
    return decorator
@monitor_request('gpt-4')
def call_model(prompt: str):
    """Example model call wrapped with Prometheus monitoring."""
    # Placeholder for the real model invocation.
    return "response"

最佳实践总结
监控检查清单
markdown
## 监控检查清单
### 必须指标
- [ ] 请求成功率
- [ ] 请求延迟 (P50/P95/P99)
- [ ] Token 消耗
- [ ] 成本监控
- [ ] 错误率
### 推荐指标
- [ ] 模型质量评分
- [ ] 幻觉率
- [ ] 拒答率
- [ ] 缓存命中率
- [ ] 队列长度
### 日志管理
- [ ] 结构化日志
- [ ] 请求 ID 追踪
- [ ] 错误堆栈记录
- [ ] 审计日志
- [ ] 日志保留策略
### 告警配置
- [ ] 错误率告警
- [ ] 延迟告警
- [ ] 成本告警
- [ ] 质量告警
- [ ] 告警升级机制
### 仪表板
- [ ] 实时监控面板
- [ ] 历史趋势分析
- [ ] 成本分析面板
- [ ] 质量分析面板
- [ ] 异常检测面板