Twitter の「いまホット」トレンド、Cloudflare の DDoS 検知、リアルタイム広告 CTR 集計。これら 「ストリーミング (流れてくる) データの中から、頻度の高いアイテムを少ないメモリで見つける」 問題を解くのが Count-Min Sketch (CMS)。
1. これで何が動いているか
- Twitter のリアルタイムトレンド検出
- Google Trends の検索クエリ集計
- Cloudflare の DDoS 攻撃検知 (頻出 IP)
- Apache Kafka Streams の集計演算
- Apache Spark / Flink の Streaming Aggregation
- Redis の `CMS.*` コマンド (RedisBloom モジュール)
2. 仕組みのざっくり
- 幅 w × 深さ d の 2 次元カウンタ表 (例: 2048 × 5)
- 各深さ層に 異なるハッシュ関数 が割当てられる
- 追加 (add): アイテムを d 個のハッシュにかけ、各層の対応セルを +1
- 検索 (count): アイテムを d 個のハッシュにかけ、各層のセル値の 最小値を返す
- 最小値を返すのがポイント: ハッシュ衝突は値を「上振れ」させるが、d 個のうち 1 つは衝突なし or 衝突が少ないと期待できる
3. Python 実装 (動作確認済)
純粋 Python の Count-Min Sketch
Python
import hashlib
class CountMinSketch: def __init__(self, width=2048, depth=5): self.w, self.d = width, depth self.table = [[0] * width for _ in range(depth)] # 異なるハッシュ関数のための seed self.hashes = [hashlib.sha256(str(i).encode()).digest() for i in range(depth)]
def _idx(self, item, i): h = hashlib.blake2b(str(item).encode() + self.hashes[i], digest_size=4).digest() return int.from_bytes(h, 'big') % self.w
def add(self, item, count=1): for i in range(self.d): self.table[i][self._idx(item, i)] += count
def count(self, item): # 全ハッシュ層の最小値 (上振れを抑えるため) return min(self.table[i][self._idx(item, i)] for i in range(self.d))
def merge(self, other): '''複数の CMS を結合 (分散集計)''' for i in range(self.d): for j in range(self.w): self.table[i][j] += other.table[i][j]
# テスト: 8 単語のストリームを 100,000 件処理import randomrandom.seed(0)words = ['the', 'quick', 'brown', 'fox', 'jumps', 'over', 'lazy', 'dog']weights = [100, 50, 30, 30, 20, 20, 10, 5]
cms = CountMinSketch(width=512, depth=5)true = {}for _ in range(100_000): w = random.choices(words, weights=weights)[0] cms.add(w) true[w] = true.get(w, 0) + 1
print('Word | True | CMS | Error')for w in words: est = cms.count(w) err = abs(est - true[w]) / true[w] * 100 print(f' {w:8s} | {true[w]:6d} | {est:6d} | {err:.2f}%')
# 実機実行結果: メモリ ~10KB、誤差 0% (この規模)4. パラメータと精度
| w (幅) | d (深さ) | メモリ | 誤差 ε | 信頼度 |
|---|---|---|---|---|
| 256 | 3 | 3 KB | 1.06% | 87.5% |
| 1024 | 5 | 20 KB | 0.27% | 96.9% |
| 2048 | 5 | 40 KB | 0.13% | 96.9% |
| 8192 | 7 | 224 KB | 0.03% | 99.2% |
5. Heavy Hitters (頻出 TOP K) の実装
CMS + heap で TOP K 検出
Python
import heapq
class TopK: def __init__(self, k=10, cms_width=2048, cms_depth=5): self.k = k self.cms = CountMinSketch(width=cms_width, depth=cms_depth) self.heap = [] # (count, item) の最小ヒープ self.in_heap = set()
def add(self, item): self.cms.add(item) c = self.cms.count(item) if item in self.in_heap: # 既に heap にあれば再構築 for i, (cnt, it) in enumerate(self.heap): if it == item: self.heap[i] = (c, it) heapq.heapify(self.heap) return if len(self.heap) < self.k: heapq.heappush(self.heap, (c, item)) self.in_heap.add(item) elif c > self.heap[0][0]: old = heapq.heapreplace(self.heap, (c, item)) self.in_heap.discard(old[1]) self.in_heap.add(item)
def top(self): return sorted(self.heap, key=lambda x: -x[0])6. メリットとデメリット
| 観点 | CMS | ハッシュテーブル |
|---|---|---|
| メモリ | 固定 (KB) | アイテム数に比例 (GB) |
| 速度 | O(d) per op | O(1) per op |
| 精度 | ε 誤差 (上振れ方向のみ) | 100% |
| マージ | 可能 (要素ごとに加算) | 全データ再走査 |
| 減算 | 可能 (Count-Min-Mean Sketch) | 可能 |
| 用途 | ストリーミング・大規模 | 正確性必須・小規模 |
7. 関連ツール
- RedisBloom: Redis の `CMS.*` コマンド
- Apache DataSketches: CMS / HLL / Theta Sketch を統合
- stream-lib (Java): Twitter で使われる
- probables (Python): BloomFilter / CountMin / TopK 統合ライブラリ
8. 次の話
EP.05 では Bloom Filter を扱います。「この URL は犯罪サイトか?」を 117KB で 99% 以上正答する確率的フィルタの実装と運用。
この記事の感想を教えてください
あなたの 1 クリックで、本当にこの記事は更新されます。「もっと詳しく」「続編希望」が一定数集まった記事は、 ふくふくが 実際に内容を拡充したり続編記事を公開 します。 送信したリアクションはお使いのブラウザに記録され、再カウントされません。