ふくふくHukuhuku Inc.
EP.11Algorithms 8分公開: 2026-05-11

Reservoir Sampling:1 兆件のストリームから k 件を等確率で取る 5 行の魔法

「全件読まずに、ストリームから k 件をランダム抽出」を可能にする Reservoir Sampling。BigQuery TABLESAMPLE / ABTest / ログ取得の中身を Python で実装。

#Reservoir Sampling#サンプリング#ストリーミング
CO📔 Google Colab で開く(上から順にセルを実行)
シェア

「Twitter の 1 日 5 億ツイートから、ランダムな 1000 件を等確率で抽出する」 ── ストリームの総数を事前に知らずに、メモリ 1000 件だけで偏りなくサンプリングする方法が Reservoir Sampling。1985 年に発明され、いまも BigQuery TABLESAMPLE / Apache Spark / 機械学習 の中核で動く 5 行の魔法。

1. これで何が動いているか

  • BigQuery TABLESAMPLE BERNOULLI: 大規模テーブルのサンプリング
  • Apache Spark の RDD.takeSample: 分散データの抽出
  • 機械学習: 訓練データのバッチ抽出 (mini-batch)
  • Twitter / Mastodon API: firehose のサブサンプリング
  • A/B テスト: 実験対象の無作為割当
  • Apache Kafka: log compaction の sampling

2. 仕組みのざっくり

  • reservoir という配列を size k で用意
  • 最初の k 件 はそのまま reservoir に入れる
  • i 番目 (i ≥ k) の要素について、確率 k/i で reservoir のランダムな位置と置換
  • ストリーム終了時の reservoir が、等確率 (k/N) のサンプル

3. Python 実装 (動作確認済)

わずか 5 行の Algorithm R
Python
import random
def reservoir_sample(stream, k):    '''ストリームから k 件を等確率でサンプリング'''    res = []    for i, item in enumerate(stream):        if i < k:            res.append(item)  # 最初の k 件はそのまま        else:            j = random.randint(0, i)  # [0, i] の整数            if j < k:                res[j] = item  # k/i の確率で置換    return res
# 100 万件のストリームから 10 件抽出random.seed(0)sample = reservoir_sample(range(1_000_000), k=10)print(f'Sample: {sample}')
# 等確率の検証: 100 万試行counts = [0] * 100for _ in range(100_000):    sample = reservoir_sample(range(100), k=10)    for s in sample:        counts[s] += 1
avg = sum(counts) / 100print(f'\n各要素の出現回数: 平均 {avg:.0f}')print(f'最大: {max(counts)}, 最小: {min(counts)}')print(f'偏り: {(max(counts) - min(counts)) / avg * 100:.1f}%')

4. 重み付き版 (A-Res)

重みに比例した確率でサンプリング
Python
import randomimport heapq
def weighted_reservoir_sample(stream, k):    '''    各要素 (item, weight) に対し、key = random()^(1/weight) を計算。    key の大きい上位 k 件を残す (min-heap で管理)。    '''    heap = []    for item, weight in stream:        key = random.random() ** (1 / weight)        if len(heap) < k:            heapq.heappush(heap, (key, item))        elif key > heap[0][0]:            heapq.heapreplace(heap, (key, item))    return [item for _, item in heap]
# サンプル: 重み付きアイテムから 5 件items = [    ('A', 1), ('B', 2), ('C', 1), ('D', 5), ('E', 1),    ('F', 10), ('G', 1), ('H', 3), ('I', 1), ('J', 1)]random.seed(42)samples = []for _ in range(10000):    samples.extend(weighted_reservoir_sample(items, k=3))
# F (重み 10) が最も多く出るはずfrom collections import Countercounts = Counter(samples)print('重み付きサンプリングの分布:')for item, w in sorted(items, key=lambda x: -x[1]):    print(f'  {item} (w={w:2d}): {counts[item]:4d} 回')

5. SQL での使い方

BigQuery / Snowflake の TABLESAMPLE
SQL
-- BigQuery: テーブル全体の 0.1% をサンプリングSELECT * FROM `project.dataset.events`TABLESAMPLE SYSTEM (0.1 PERCENT);
-- Snowflake: 1000 行をランダム抽出 (Bernoulli sampling)SELECT * FROM events SAMPLE (1000 ROWS);
-- PostgreSQL: TABLESAMPLE BERNOULLISELECT * FROM events TABLESAMPLE BERNOULLI(1);  -- 1% 抽出
-- 内部実装は基本的に Reservoir Sampling 系統

6. メリットとデメリット

  • メリット: 1 パス、メモリ O(k) で巨大ストリームに対応
  • メリット: 等確率 (k/N) を数学的に保証
  • メリット: 重み付き拡張が容易
  • デメリット: ランダムシードに依存 (再現性のためには seed 固定)
  • デメリット: 「最後の N 件 = 最近のデータを優先」 等の偏りは別アルゴリズム (Sliding Window Sampling)

7. 次の話

EP.12 では Skip List を扱います。Redis の Sorted Set / LevelDB / RocksDB が使う、O(log n) で挿入・検索・順序取得が全部できる 確率的データ構造。

シェア

この記事の感想を教えてください

あなたの 1 クリックで、本当にこの記事は更新されます。「もっと詳しく」「続編希望」が一定数集まった記事は、 ふくふくが 実際に内容を拡充したり続編記事を公開 します。 送信したリアクションはお使いのブラウザに記録され、再カウントされません。

シリーズの外も探す:

まずは、現状を聞かせてください。

要件が固まっていなくて大丈夫です。現状診断と方針提案までを無料でお手伝いします。

無料相談フォームへ hello [at] hukuhuku [dot] co [dot] jp