dbt(data build tool)
概要
- dbt はデータウェアハウス上の変換レイヤーを SQL + Jinja テンプレートで管理するツール
- ELT(Extract → Load → Transform)の Transform 部分を担う
- モデルごとに
.sqlファイルを作成し、ref()関数で依存関係を宣言すると DAG(有向非巡回グラフ)を自動生成する - BigQuery, Snowflake, Redshift, DuckDB などの主要なデータウェアハウスに対応
- テスト・ドキュメント生成・スナップショット(SCD)などの機能が標準で含まれる
- dbt Core(OSS)と dbt Cloud(マネージドサービス)がある
メダリオンアーキテクチャとの対応
dbt のレイヤー設計はメダリオンアーキテクチャ(Bronze / Silver / Gold)と対応させると整理しやすい
| メダリオン | dbt レイヤー | 役割 |
|---|---|---|
| Bronze | staging | ソースからの生データを型変換・リネームのみで加工 |
| Silver | intermediate | 複数テーブルの JOIN や中間集計など再利用される変換 |
| Gold | marts | BI ツールや分析者が直接使うビジネスロジック層 |
- staging はソース 1 テーブルに対し 1 モデルを作るのが原則
- intermediate は staging 同士を結合したり、共通の加工ロジックをまとめる中間層
- marts は fact / dim テーブルや集計テーブルとして最終成果物を提供する
ディレクトリ構造
my_project/
├── dbt_project.yml # プロジェクト設定
├── profiles.yml # 接続先設定(~/.dbt/ に置くことが多い)
├── models/
│ ├── staging/ # Bronze: ソースを型変換・リネームのみで加工
│ │ └── stg_orders.sql
│ ├── intermediate/ # Silver: staging を結合・中間集計する層
│ │ └── int_orders_with_users.sql
│ └── marts/ # Gold: ビジネスロジックを集約する最終成果物
│ └── fct_orders.sql
├── tests/ # カスタムテスト
├── macros/ # Jinja マクロ
├── snapshots/ # SCD (Slowly Changing Dimensions)
└── seeds/ # CSV を直接ロードするファイル
インストールと初期化
pip install dbt-bigquery # アダプタに応じて変更(dbt-snowflake, dbt-duckdb 等)
dbt init my_project # プロジェクト作成
dbt debug # 接続テスト
dbt_project.yml の例
name: my_project
version: "1.0.0"
profile: my_profile
models:
my_project:
staging:
+materialized: view # Bronze: view として作成
intermediate:
+materialized: view # Silver: 基本は view(重い場合は table や ephemeral に変更)
marts:
+materialized: table # Gold: table として作成
+persist_docs:
relation: true # テーブル・ビューのコメントを DWH に反映
columns: true # カラムコメントを DWH に反映
モデルの書き方
staging モデル(stg_orders.sql)
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
)
SELECT
order_id,
user_id,
DATE(created_at) AS order_date,
status,
amount
FROM source
intermediate モデル(int_orders_with_users.sql)
staging モデルを結合・加工し、複数の mart モデルから再利用できる中間テーブルを作る
-- models/intermediate/int_orders_with_users.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
users AS (
SELECT * FROM {{ ref('stg_users') }}
)
SELECT
o.order_id,
o.order_date,
o.status,
o.amount,
u.user_name,
u.user_segment
FROM orders AS o
LEFT JOIN users AS u USING (user_id)
- intermediate のモデルはプレフィックス
int_を付けるのが慣例 - staging をそのまま mart で結合すると mart が肥大化するため、共通の JOIN ロジックはここに切り出す
- マテリアライゼーションは
viewかephemeralが多い(mart が参照する CTE として埋め込む場合はephemeral)
mart モデル(fct_orders.sql)
intermediate を参照してビジネスロジックを追加する
-- models/marts/fct_orders.sql
WITH base AS (
SELECT * FROM {{ ref('int_orders_with_users') }}
)
SELECT
order_id,
order_date,
user_name,
user_segment,
amount,
CASE
WHEN status = 'completed' THEN amount
ELSE 0
END AS revenue
FROM base
{{ source('schema', 'table') }}でソーステーブルを参照する{{ ref('model_name') }}で他のモデルを参照し、依存関係を自動解決する
スキーマファイル(schema.yml)
version: 2
sources:
- name: raw
database: my_project
schema: raw_data
tables:
- name: orders
models:
- name: stg_orders
description: "受注データのステージングモデル"
columns:
- name: order_id
description: "注文ID"
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ["pending", "completed", "cancelled"]
カラム説明の DWH への反映(persist_docs)
- schema.yml に書いた
descriptionは dbt docs にしか反映されないのがデフォルト persist_docsを有効にすると BigQuery や Snowflake のカラムコメントとして書き込まれる- データカタログや BigQuery Studio 上でカラムの意味が確認できるようになる
dbt_project.yml での設定
models:
my_project:
+persist_docs:
relation: true # テーブル/ビュー自体の description をコメントとして保存
columns: true # カラムの description をコメントとして保存
モデル単位で個別に設定することも可能
{{
config(
persist_docs={"relation": true, "columns": true}
)
}}
SELECT ...
- BigQuery では
dbt run後にテーブルのスキーマ欄でカラムの説明が確認できる relation: trueのみでも効果があるが、カラムレベルまで反映するにはcolumns: trueも必要
主要コマンド
dbt run # 全モデルを実行
dbt run --select stg_orders # 特定モデルのみ実行
dbt run --select +fct_orders # fct_orders と上流モデルを実行
dbt run --select fct_orders+ # fct_orders と下流モデルを実行
dbt test # 全テストを実行
dbt test --select stg_orders # 特定モデルのテストのみ実行
dbt docs generate # ドキュメントを生成
dbt docs serve # ドキュメントをブラウザで確認
dbt seed # seeds/ の CSV をロードする
dbt snapshot # snapshots/ を実行する
dbt compile # SQL をコンパイルして target/ に出力(実行はしない)
dbt build # run + test + seed + snapshot をまとめて実行
マクロ(macros/)
再利用可能な Jinja マクロを定義できる
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)
{% endmacro %}
-- モデル内での使用
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM {{ ref('stg_orders') }}
スナップショット(SCD Type 2)
レコードの変更履歴を保持する Slowly Changing Dimensions に対応
-- snapshots/orders_snapshot.sql
{% snapshot orders_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='order_id',
strategy='timestamp',
updated_at='updated_at'
)
}}
SELECT * FROM {{ source('raw', 'orders') }}
{% endsnapshot %}
BigQuery との組み合わせ
profiles.yml の設定例
my_profile:
target: dev
outputs:
dev:
type: bigquery
method: oauth
project: my_gcp_project
dataset: dbt_dev
threads: 4
timeout_seconds: 300
パーティション・クラスタリングの設定
{{
config(
materialized='table',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=["user_id"]
)
}}
SELECT * FROM {{ ref('stg_orders') }}
マテリアライゼーションの種類
view: ビューとして作成(デフォルト)table: テーブルとして毎回全量作成incremental: 差分のみ追記・更新する(大テーブルに有効)ephemeral: テーブルを作らず CTE として他モデルに埋め込む
incremental の例
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}