精度が出るモデルを作るより、運用できるパイプラインを作る方が事業インパクト大。本記事は本編の到達点として、 の観点から異常検知システムを扱います。
データドリフトの検出
| 指標 | 意味 | 閾値 |
|---|---|---|
| (Population Stability Index) | 分布の安定性 | < 0.1 安定、> 0.25 大変化 |
| Wasserstein 距離 | 確率分布間の距離 | 業務文脈による |
| KS 検定 | 分布の同一性 | p < 0.05 で異なる |
モデル再学習のトリガー
- スケジュールベース:週次 / 月次で機械的に
- 性能ベース:F1 が閾値割れしたら
- ドリフトベース:PSI > 0.25 で発火
- 手動:ドメイン専門家が判断
Feedback Loop
人手レビューの結果(「これは異常」「これはノイズ」)を学習データに戻す。Active Learning で最も学習価値の高いケースを優先レビュー。
Champion / Challenger
本番モデルの新陳代謝
現行(Champion)モデルを本番で動かしつつ、新候補(Challenger) を 5〜20% のトラフィックで 。指標で勝てば置換。 EP.17/18 の A/B テスト手法そのもの。
Python 実装:データドリフト検出
PSI(Population Stability Index)
Python
import numpy as np
def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float: """学習データ vs 本番データの分布差を測る。 < 0.1: 安定 / 0.1〜0.25: 軽微な変化 / > 0.25: 大変化""" # 学習データに基づいて bin を決定 bin_edges = np.percentile(expected, np.linspace(0, 100, bins + 1)) bin_edges = np.unique(bin_edges) # 重複削除
expected_pct = np.histogram(expected, bins=bin_edges)[0] / len(expected) actual_pct = np.histogram(actual, bins=bin_edges)[0] / len(actual)
# 0 除算防止 expected_pct = np.where(expected_pct == 0, 0.0001, expected_pct) actual_pct = np.where(actual_pct == 0, 0.0001, actual_pct)
return np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
# 使用例psi_value = psi(train_features, prod_features)print(f"PSI: {psi_value:.3f}")if psi_value > 0.25: print("⚠️ 大きなドリフト検出 - モデル再学習を検討")Wasserstein 距離で多変量ドリフト検出
Python
from scipy.stats import wasserstein_distance, ks_2sampimport pandas as pd
def detect_drift(train_df: pd.DataFrame, prod_df: pd.DataFrame, threshold: float = 0.1): """各特徴量についてドリフトを検出""" results = [] for col in train_df.select_dtypes(include="number").columns: # KS 検定 ks_stat, ks_p = ks_2samp(train_df[col].dropna(), prod_df[col].dropna()) # Wasserstein 距離 wd = wasserstein_distance(train_df[col].dropna(), prod_df[col].dropna()) results.append({ "column": col, "ks_stat": ks_stat, "ks_p": ks_p, "wasserstein": wd, "drifted": ks_p < 0.05, }) return pd.DataFrame(results)
drift_df = detect_drift(train_df, prod_df)print(drift_df[drift_df["drifted"]])Champion / Challenger A/B 評価
Python
import numpy as np
def champion_vs_challenger( champion_scores: np.ndarray, challenger_scores: np.ndarray, y_true: np.ndarray,): """両モデルを同じデータで比較し、勝者を決める""" from sklearn.metrics import average_precision_score champion_pr = average_precision_score(y_true, champion_scores) challenger_pr = average_precision_score(y_true, challenger_scores)
diff = challenger_pr - champion_pr print(f"Champion PR-AUC: {champion_pr:.4f}") print(f"Challenger PR-AUC: {challenger_pr:.4f}") print(f"差分: {diff:+.4f}")
# ブートストラップで信頼区間 n_bootstrap = 1000 diffs = [] n = len(y_true) for _ in range(n_bootstrap): idx = np.random.choice(n, n, replace=True) d = ( average_precision_score(y_true[idx], challenger_scores[idx]) - average_precision_score(y_true[idx], champion_scores[idx]) ) diffs.append(d) ci_low, ci_high = np.percentile(diffs, [2.5, 97.5]) print(f"95% CI: [{ci_low:+.4f}, {ci_high:+.4f}]")
if ci_low > 0: print("→ Challenger 採用(統計的有意)") else: print("→ 様子見 or Champion 維持")
# 使用例champion_vs_challenger(champion_scores, challenger_scores, y_true)ふくふくの進め方
「異常検知システムを本番運用したい」というご相談には、設計(1 ヶ月)→ MLOps 構築 → 運用伴走を 3〜6 ヶ月で。運用が回り始めて初めて成功です。
ここまでのまとめ
EP.01〜10 で異常検知の主要工程を一巡しました。古典統計から深層学習、ストリーミングから MLOps まで。現場で使える異常検知システムを作る指針として活用ください。続編は読者リアクションや現場フィードバックに応じて随時追加していきます。
この記事の感想を教えてください
あなたの 1 クリックで、本当にこの記事は更新されます。「もっと詳しく」「続編希望」が一定数集まった記事は、 ふくふくが 実際に内容を拡充したり続編記事を公開 します。 送信したリアクションはお使いのブラウザに記録され、再カウントされません。