民主化の入口として最も効くのが Slack Bot。「先週の売上は?」と書くだけで集計結果が返ってくる。実装はシンプルですが、ガードレールを最初から内蔵しないと EP.02 の暴走パターンを引き寄せます。本記事では、最小だが本番投入できる構成を 1 本通します。
本記事のコア部分(許可テーブル抽出 → → SQL 検査 → DRY RUN コスト見積 → 実行 → マスキング → ログ)を `/notebooks/dd-04-slack-sql-bot.ipynb` で動かせます。Slack 連携部分は説明のみ、コアロジックは即動作。
全体アーキテクチャ
- 1ユーザ入力受付 (Slack の app_mention or slash command)
- 2メタデータ取得 (許可テーブル一覧 + 各カラムの説明)
- 3LLM で SQL 生成 (プロンプトに許可テーブル list を埋め込む)
- 4SQL 静的検査 (`*` 使用 / DML / 許可外テーブル を弾く)
- 5DRY RUN でコスト見積 ( `--dry-run` / EXPLAIN)
- 6閾値超過なら確認 (「3GB スキャンします、続けますか?」)
- 7実行 + 行数制限 (LIMIT 1000 を強制)
- 8結果のマスキング ( カラムを正規表現で `***`)
- 9結果整形 + Slack 投稿 (表 / グラフのスニペット添付)
- 10監査ログ書き込み (ユーザ/SQL/コスト/結果サンプルの hash)
1. メタデータからの許可テーブル列挙
import jsonfrom pathlib import Path
manifest = json.loads(Path("target/manifest.json").read_text())
# dbt model の meta.published == True のものだけを Bot に開放allowed_tables = {}for node_id, node in manifest["nodes"].items(): if node["resource_type"] != "model": continue if not node.get("meta", {}).get("published"): continue fqn = f"{node['database']}.{node['schema']}.{node['name']}" allowed_tables[fqn] = { "description": node.get("description", ""), "columns": { c: cfg.get("description", "") for c, cfg in node.get("columns", {}).items() }, }
print(f"許可テーブル数: {len(allowed_tables)}")2. LLM での SQL 生成プロンプト
import anthropic
client = anthropic.Anthropic()
def generate_sql(question: str, allowed_tables: dict) -> str: # メタデータを構造化テキストで埋め込む schema_doc = "\n\n".join( f"-- TABLE: {t}\n-- DESC: {info['description']}\n" + "\n".join(f"-- {c}: {d}" for c, d in info["columns"].items()) for t, info in allowed_tables.items() )
prompt = f"""あなたは BigQuery SQL を書くアナリストです。以下のテーブルだけが使えます。これ以外のテーブルを参照する SQL は書かないでください。
{schema_doc}
ルール:- SELECT 文のみ。INSERT/UPDATE/DELETE/DDL は不可- LIMIT 1000 を必ず付ける- WHERE 句で _PARTITIONTIME を 30 日以内に絞る- カラム名を必ず指定(SELECT * は禁止)- 結果を返すだけでなく簡単なコメントで何を計算したかを書く
質問: {question}""" msg = client.messages.create( model="claude-opus-4-7", max_tokens=2000, messages=[{"role": "user", "content": prompt}], ) return msg.content[0].text3. SQL の静的検査(守りの中核)
import reimport sqlparse
DENY_PATTERNS = [ (r"\bSELECT\s+\*", "SELECT * は禁止です。カラム名を明示してください"), (r"\b(INSERT|UPDATE|DELETE|DROP|TRUNCATE|CREATE|ALTER|GRANT|REVOKE)\b", "DML/DDL は禁止です。SELECT 文のみ受け付けます"), (r"\bINFORMATION_SCHEMA\b", "メタデータは Bot 経由で参照してください"),]
def static_check(sql: str, allowed_tables: set[str]) -> tuple[bool, str]: sql_upper = sql.upper()
for pat, msg in DENY_PATTERNS: if re.search(pat, sql_upper): return False, msg
# 参照テーブル抽出(簡易: FROM/JOIN の直後) refs = re.findall(r"\b(?:FROM|JOIN)\s+\`?([\w\.\-]+)\`?", sql_upper) for ref in refs: if ref.lower() not in {t.lower() for t in allowed_tables}: return False, f"テーブル {ref} は許可されていません"
# LIMIT があるか if not re.search(r"\bLIMIT\s+\d+", sql_upper): return False, "LIMIT 句が必要です(最大 1000)"
return True, "OK"4. DRY RUN でコスト見積 + 閾値ガード
from google.cloud import bigquery
bq = bigquery.Client()COST_USD_LIMIT = 1.0 # 1 クエリ $1 を超えたら確認を求めるPRICE_PER_TB = 5.0
def estimate_cost(sql: str) -> dict: job = bq.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True)) bytes_billed = job.total_bytes_processed cost_usd = (bytes_billed / 1e12) * PRICE_PER_TB return {"bytes": bytes_billed, "tb": bytes_billed / 1e12, "usd": cost_usd}
def execute_with_guard(sql: str, user: str): est = estimate_cost(sql) if est["usd"] > COST_USD_LIMIT: return { "status": "needs_confirmation", "message": f"このクエリは約 ${est['usd']:.2f} かかります({est['tb']:.2f} TB スキャン)。続行しますか?", "estimate": est, } job = bq.query(sql, job_config=bigquery.QueryJobConfig(maximum_bytes_billed=int(est["bytes"] * 1.5))) return {"status": "ok", "rows": [dict(r) for r in job.result()], "estimate": est}5. 結果のマスキング
import re
PATTERNS = { "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"), "phone": re.compile(r"\b0\d{1,4}-?\d{1,4}-?\d{4}\b"), "card": re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"),}
def mask_row(row: dict) -> dict: out = {} for k, v in row.items(): sv = str(v) if v is not None else "" for label, pat in PATTERNS.items(): sv = pat.sub(f"<{label}_masked>", sv) out[k] = sv return outColumn-Level Security(EP.05)でカラム単位に 側で制御するのが第一防壁。Bot 側の正規表現は 第二の安全網として「ロール設定漏れ」をカバーする目的。両方使う。
6. 監査ログ
import json, hashlibfrom datetime import datetimefrom google.cloud import bigquery
audit_table = "analytics.bot_audit_log"
def audit_log(*, user: str, question: str, sql: str, result_rows: int, cost_usd: float, status: str, masked_sample: list[dict]): bq = bigquery.Client() sample_hash = hashlib.sha256(json.dumps(masked_sample, sort_keys=True).encode()).hexdigest()[:16] bq.insert_rows_json(audit_table, [{ "ts": datetime.utcnow().isoformat(), "user": user, "question": question, "sql": sql, "result_rows": result_rows, "cost_usd": cost_usd, "status": status, "result_sample_hash": sample_hash, # 内容ではなくハッシュだけ残す(PII漏洩防止) }])7. 全体の繋ぎ
def handle_slack_message(event): user = event["user"] question = event["text"]
# 1) SQL 生成 sql = generate_sql(question, allowed_tables)
# 2) 静的検査 ok, msg = static_check(sql, set(allowed_tables.keys())) if not ok: post_to_slack(user, f"❌ {msg}\n生成された SQL:\n\`\`\`{sql}\`\`\`") audit_log(user=user, question=question, sql=sql, result_rows=0, cost_usd=0.0, status="rejected", masked_sample=[]) return
# 3) DRY RUN + 実行 result = execute_with_guard(sql, user) if result["status"] == "needs_confirmation": post_to_slack(user, result["message"] + "\n\`\`\`{sql}\`\`\`\nReact 👍 で実行") # 実行ボタンの実装は省略。React で確認後に execute() を呼び戻す return
# 4) マスキング masked = [mask_row(r) for r in result["rows"][:50]]
# 5) 投稿 + 監査 post_to_slack(user, format_table(masked)) audit_log(user=user, question=question, sql=sql, result_rows=len(result["rows"]), cost_usd=result["estimate"]["usd"], status="ok", masked_sample=masked[:5])観測すべき
| 指標 | 閾値の目安 | 対応 |
|---|---|---|
| 静的検査リジェクト率 | 20% 以下 | 高い場合: プロンプトを改善 / 教育 |
| 平均クエリコスト | $0.05 以下 | 高い場合: 許可テーブルを絞る / カラム説明を充実 |
| クエリエラー率 | 5% 以下 | 高い場合: 生成プロンプト見直し |
| 週次アクティブユーザ | 対象部門の 30% 以上 | 低い場合: 価値の刺さらないユースケース |
| 平均応答時間 | 10 秒以内 | 遅い場合: メタデータキャッシュ / DRY RUN 並列化 |
ふくふくの構築実績パターン
1〜2 部門での PoC 構築は 2 週間〜1 ヶ月。許可テーブル 10〜30 個から始めて、監査ログを毎週レビューしながら エラーパターンに応じてプロンプトを改善するサイクルを 3 ヶ月続けると、業務に定着します。重要なのは 「最初の正解率を上げる」より 「事故を起こさない」を優先する設計。
次の話
EP.05 では、Bot のガードレール下層にあたる権限管理。BigQuery / Snowflake で Row-Level Security と Column-Level Security をどう設計・運用するかを扱います。
この記事の感想を教えてください
あなたの 1 クリックで、本当にこの記事は更新されます。「もっと詳しく」「続編希望」が一定数集まった記事は、 ふくふくが 実際に内容を拡充したり続編記事を公開 します。 送信したリアクションはお使いのブラウザに記録され、再カウントされません。