ML 学习站
跳到正文

线上监控与数据漂移

性能监控、数据漂移检测、PSI / KS 检验, 报警链路。

40 分钟3 / 32,095
加载中...

本章探讨了线上模型监控与数据漂移问题,强调模型会随时间老化,需要持续监控以保持准确性。核心概念包括性能监控、数据漂移检测和监控指标。读者将学会如何评估线上模型的准确性,检测数据分布变化,并使用合适的工具和方法进行监控。性能监控关注延迟、吞吐量和错误率,而数据漂移检测则涉及特征和输出分布的变化。常用方法包括KS检验、PSI指标和卡方检验。Evidently AI等工具可以简化漂移报告生成。掌握这些知识后,读者能够构建完整的监控闭环,设置合理的告警阈值,并实施有效的重训流程,确保模型在动态环境中保持高性能。

线上监控与数据漂移

模型上线第一天是准的, 3 个月后可能就废了。 因为现实世界在变, 而你的模型还在用 3 个月前的数据规律。

这一章讲两件事:

  1. 性能监控: 线上模型真的准吗?
  2. 数据漂移检测: 进来的数据分布变了没?

模型为什么会"老化"

  • 用户偏好变化 (iPhone 卖得少了)
  • 经济环境变化 (疫情期间销量暴增)
  • 竞争对手上新功能
  • 数据 pipeline bug 导致特征分布改变
  • 政策/法规变化

任何一个原因都让历史训练数据"过期"

监控 3 件套:Latency / Throughput / Error

# 加在 FastAPI 中
from prometheus_client import Counter, Histogram, Gauge

PREDICT_COUNT = Counter("model_predict_total", "Predictions served", ["model_version", "label"])
PREDICT_LATENCY = Histogram("model_predict_latency_seconds", "Prediction latency", ["model_version"])
INFLIGHT = Gauge("model_inflight_requests", "In-flight requests", ["model_version"])

@app.post("/predict")
def predict(inp: Input):
    INFLIGHT.labels(model_version="v1").inc()
    start = time.time()
    try:
        pred = model.predict([inp.features])[0]
        PREDICT_COUNT.labels(model_version="v1", label=str(pred)).inc()
        return {"prediction": int(pred)}
    except Exception as e:
        PREDICT_COUNT.labels(model_version="v1", label="error").inc()
        raise
    finally:
        PREDICT_LATENCY.labels(model_version="v1").observe(time.time() - start)
        INFLIGHT.labels(model_version="v1").dec()

Grafana 看板监控 3 个图:

  • 延迟 P99 < 100ms (SLA)
  • QPS (每秒请求数) 趋势
  • 错误率 < 0.1%

性能监控的难题: 延迟反馈

真正难的不是"服务挂没挂", 而是"模型准不准"。因为:

  • 真实 label 通常要几小时/几天才知道 (用户有没有点击, 贷款有没有违约)
  • 没有 label = 没法算 accuracy

3 个解决方案:

  1. 代理指标 (Proxy Metric): 用"模型输出分布"代替"准确率"。比如"预测为正类的比例突然从 30% 涨到 50%", 警告
  2. 业务指标: 转化率、点击率、AUC 都能算, 拼多多、抖音都在用
  3. 人工抽样: 每天抽 1000 条人工标, 计算真实准确率

数据漂移:特征分布变了

训练时用户平均年龄 30, 现在用户平均年龄 50 — 你的模型没学过中年人, 准确率必然下降。这就是数据漂移

3 类漂移:

  • Covariate shift: 输入分布 P(X) 变了
  • Label shift: 输出分布 P(Y) 变了
  • Concept drift: 关系 P(Y|X) 变了 (最难发现)

检测 1:KS 检验 (适合连续特征)

Kolmogorov-Smirnov 检验比较"训练分布"和"线上分布":

from scipy.stats import ks_2samp

def detect_drift_ks(train_data, live_data, threshold=0.05):
    """返回每个特征的 p 值, p<0.05 说明漂移"""
    results = {}
    for col in train_data.columns:
        stat, p_value = ks_2samp(train_data[col].dropna(), live_data[col].dropna())
        results[col] = {"ks_stat": stat, "p_value": p_value, "drift": p_value < threshold}
    return pd.DataFrame(results).T
# 实战
train = pd.read_parquet("training_features.parquet")
live = get_last_week_features()  # 线上最近 7 天

drift_report = detect_drift_ks(train, live)
print(drift_report[drift_report["drift"]])
#                  ks_stat   p_value  drift
# user_age           0.18     0.001   True    <- 漂移!
# user_income        0.05     0.42    False

KS 检验对连续特征最敏感, 但对类别特征不好用 (用 PSI)。

检测 2:PSI (Population Stability Index)

PSI 是风控模型最常用的漂移指标:

PSI = Σ (实际占比 - 期望占比) × ln(实际占比 / 期望占比)
PSI解读
< 0.1无漂移
0.1 - 0.25轻微漂移, 关注
> 0.25严重漂移, 需重训
import numpy as np

def calculate_psi(expected, actual, bins=10):
    """PSI: 比较 expected (训练) 和 actual (线上) 分布"""
    breakpoints = np.quantile(expected, np.linspace(0, 1, bins + 1))
    
    expected_counts = np.histogram(expected, breakpoints)[0]
    actual_counts = np.histogram(actual, breakpoints)[0]
    
    # 避免除零
    expected_pct = (expected_counts + 1) / (expected_counts.sum() + len(expected_counts))
    actual_pct = (actual_counts + 1) / (actual_counts.sum() + len(actual_counts))
    
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
    return psi

psi = calculate_psi(train["user_age"], live["user_age"])
print(f"PSI: {psi:.3f}")
# 0.35 > 0.25 → 严重漂移, 需重训

检测 3:卡方检验 (类别特征)

类别特征用卡方检验:

from scipy.stats import chi2_contingency

def detect_drift_chi2(train_cats, live_cats, threshold=0.05):
    results = {}
    for col in train_cats.columns:
        # 构造列联表
        train_dist = train_cats[col].value_counts(normalize=True)
        live_dist = live_cats[col].value_counts(normalize=True)
        all_categories = set(train_dist.index) | set(live_dist.index)
        # 补齐
        observed = pd.DataFrame({
            "train": [train_dist.get(c, 0) for c in all_categories],
            "live":  [live_dist.get(c, 0) for c in all_categories]
        })
        chi2, p, _, _ = chi2_contingency(observed.T)
        results[col] = {"chi2": chi2, "p_value": p, "drift": p < threshold}
    return pd.DataFrame(results).T

监控流水线 (Evidently AI)

手写 KS/PSI 麻烦, 用 evidently 一键生成报告:

# pip install evidently
from evidently.report import Report
from evidently.metrics import DataDriftTable, DatasetDriftMetric

# 准备 reference (训练) 和 current (线上)
report = Report(metrics=[DataDriftTable(), DatasetDriftMetric()])
report.run(reference_data=train, current_data=live)

# 1. HTML 可视化
report.save_html("drift_report.html")

# 2. 字典输出
result = report.as_dict()
drift_score = result["metrics"][0]["result"]["drift_by_columns"]
print(drift_score)
# {'user_age': {'drift_detected': True, 'drift_score': 0.21}, ...}

evidently 还支持:

  • 目标漂移 (Target Drift)
  • 模型性能 (Regression / Classification Performance)
  • 数据质量 (Data Quality)

完整监控栈

[线上模型] → 日志 (input, output, latency) → Kafka → 离线分析 (Spark/Flink)
                                                  ↓
                            KS / PSI / 卡方检验 (Evidently / whylogs)
                                                  ↓
                            漂移告警 → Slack / 钉钉 / PagerDuty
                                                  ↓
                            触发重训 pipeline (Airflow)

告警阈值建议

指标黄警红警
P99 延迟> 200ms> 500ms
错误率> 1%> 5%
PSI (任一特征)> 0.1> 0.25
输出分布偏移> 5%> 10%
转化率下降> 5%> 10%

5 个工程实践

  1. 每周自动跑一次漂移报告, 不要等出问题
  2. 同时监控输入和输出分布, 任何一个变了都要查
  3. 保留近 30 天线上数据, 用于事后分析
  4. 漂移阈值根据业务调, 金融领域 PSI > 0.1 就该重训
  5. 告警去重: 5 分钟内同类告警不重复推

小结

  • 模型会"老化": 用户偏好、季节、竞品都在变
  • 监控 3 件套: 延迟 / 错误 / 性能 (P99 / error / accuracy)
  • 数据漂移: KS (连续) / PSI (连续+风控) / 卡方 (类别)
  • Evidently AI 一键生成漂移报告
  • 漂移阈值: PSI 0.1 黄 / 0.25 红
  • 重训 pipeline: 漂移告警 → Airflow 触发 → MLflow 记录新版本

练习思考

  1. 用 Evidently 对你前 30 天和最近 30 天的用户行为数据做漂移报告, 哪些特征漂移最严重?
  2. 金融领域 PSI > 0.1 就重训, 互联网产品可以宽松到 0.2, 为什么?
  3. 没有真实 label 时, 怎么估算"模型准确率"? 除了抽样, 还有其他办法吗?

章末小测验

检验你对《线上监控与数据漂移》的掌握程度。

1

KS 检验适合检测什么类型特征的漂移?

2

数据漂移的 3 大类是?

这门课在以下学习路径中

当前课程出现在 1 条系统化路径里, 你可以一键生成完整学习计划, 自动跳过已完成章节。

还有疑问? 问问 AI (v19.5)

基于全站 19 门课 68 章内容检索 + LLM 总结, 会引用具体章节作为出处

讨论区(0)

加载评论中...