ふくふくHukuhuku Inc.
EP.10Anomaly 10分公開: 2026-05-10

異常検知パイプラインの構築・運用:ml と組み合わせる

「精度が出るモデル」より「運用できるパイプライン」。データドリフト・モデル劣化・Feedback Loop の設計まで。

#ml#pipeline
CO📔 Google Colab で開く(上から順にセルを実行)
シェア

精度が出るモデルを作るより、運用できるパイプラインを作る方が事業インパクト大。本記事は本編の到達点として、 の観点から異常検知システムを扱います。

データドリフトの検出

指標意味閾値
(Population Stability Index)分布の安定性< 0.1 安定、> 0.25 大変化
Wasserstein 距離確率分布間の距離業務文脈による
KS 検定分布の同一性p < 0.05 で異なる

モデル再学習のトリガー

  • スケジュールベース:週次 / 月次で機械的に
  • 性能ベース:F1 が閾値割れしたら
  • ドリフトベース:PSI > 0.25 で発火
  • 手動:ドメイン専門家が判断

Feedback Loop

人手レビューの結果(「これは異常」「これはノイズ」)を学習データに戻す。Active Learning で最も学習価値の高いケースを優先レビュー。

Champion / Challenger

本番モデルの新陳代謝

現行(Champion)モデルを本番で動かしつつ、新候補(Challenger) を 5〜20% のトラフィックで 。指標で勝てば置換。 EP.17/18 の A/B テスト手法そのもの。

Python 実装:データドリフト検出

PSI(Population Stability Index)
Python
import numpy as np
def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:    """学習データ vs 本番データの分布差を測る。    < 0.1: 安定 / 0.1〜0.25: 軽微な変化 / > 0.25: 大変化"""    # 学習データに基づいて bin を決定    bin_edges = np.percentile(expected, np.linspace(0, 100, bins + 1))    bin_edges = np.unique(bin_edges)  # 重複削除
    expected_pct = np.histogram(expected, bins=bin_edges)[0] / len(expected)    actual_pct = np.histogram(actual, bins=bin_edges)[0] / len(actual)
    # 0 除算防止    expected_pct = np.where(expected_pct == 0, 0.0001, expected_pct)    actual_pct = np.where(actual_pct == 0, 0.0001, actual_pct)
    return np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
# 使用例psi_value = psi(train_features, prod_features)print(f"PSI: {psi_value:.3f}")if psi_value > 0.25:    print("⚠️ 大きなドリフト検出 - モデル再学習を検討")
Wasserstein 距離で多変量ドリフト検出
Python
from scipy.stats import wasserstein_distance, ks_2sampimport pandas as pd
def detect_drift(train_df: pd.DataFrame, prod_df: pd.DataFrame, threshold: float = 0.1):    """各特徴量についてドリフトを検出"""    results = []    for col in train_df.select_dtypes(include="number").columns:        # KS 検定        ks_stat, ks_p = ks_2samp(train_df[col].dropna(), prod_df[col].dropna())        # Wasserstein 距離        wd = wasserstein_distance(train_df[col].dropna(), prod_df[col].dropna())        results.append({            "column": col,            "ks_stat": ks_stat, "ks_p": ks_p,            "wasserstein": wd,            "drifted": ks_p < 0.05,        })    return pd.DataFrame(results)
drift_df = detect_drift(train_df, prod_df)print(drift_df[drift_df["drifted"]])
Champion / Challenger A/B 評価
Python
import numpy as np
def champion_vs_challenger(    champion_scores: np.ndarray,    challenger_scores: np.ndarray,    y_true: np.ndarray,):    """両モデルを同じデータで比較し、勝者を決める"""    from sklearn.metrics import average_precision_score    champion_pr = average_precision_score(y_true, champion_scores)    challenger_pr = average_precision_score(y_true, challenger_scores)
    diff = challenger_pr - champion_pr    print(f"Champion  PR-AUC: {champion_pr:.4f}")    print(f"Challenger PR-AUC: {challenger_pr:.4f}")    print(f"差分: {diff:+.4f}")
    # ブートストラップで信頼区間    n_bootstrap = 1000    diffs = []    n = len(y_true)    for _ in range(n_bootstrap):        idx = np.random.choice(n, n, replace=True)        d = (            average_precision_score(y_true[idx], challenger_scores[idx]) -            average_precision_score(y_true[idx], champion_scores[idx])        )        diffs.append(d)    ci_low, ci_high = np.percentile(diffs, [2.5, 97.5])    print(f"95% CI: [{ci_low:+.4f}, {ci_high:+.4f}]")
    if ci_low > 0:        print("→ Challenger 採用(統計的有意)")    else:        print("→ 様子見 or Champion 維持")
# 使用例champion_vs_challenger(champion_scores, challenger_scores, y_true)

ふくふくの進め方

異常検知システムを本番運用したい」というご相談には、設計(1 ヶ月)→ MLOps 構築 → 運用伴走を 3〜6 ヶ月で。運用が回り始めて初めて成功です。

ここまでのまとめ

EP.01〜10 で異常検知の主要工程を一巡しました。古典統計から深層学習、ストリーミングから MLOps まで。現場で使える異常検知システムを作る指針として活用ください。続編は読者リアクションや現場フィードバックに応じて随時追加していきます。

シェア

この記事の感想を教えてください

あなたの 1 クリックで、本当にこの記事は更新されます。「もっと詳しく」「続編希望」が一定数集まった記事は、 ふくふくが 実際に内容を拡充したり続編記事を公開 します。 送信したリアクションはお使いのブラウザに記録され、再カウントされません。

シリーズの外も探す:

まずは、現状を聞かせてください。

要件が固まっていなくて大丈夫です。現状診断と方針提案までを無料でお手伝いします。

無料相談フォームへ hello [at] hukuhuku [dot] co [dot] jp