bigquery load(bqへのデータのアップロード)のチートシート
概要
bq
コマンドでアップロードする方法とpython等のスクリプトでload(アップロード)する方法などがある
pythonでデータをアップロードする
- pandasの
pandas.io.json.build_table_schema
関数を使用することでschema情報をダンプすることができる - load(アップロード)に失敗した場合はwebUIから確認できる
具体的な実装例
# ref. https://stackoverflow.com/questions/59681072/can-you-load-json-formatted-data-to-a-bigquery-table-using-python-and-load-table
import glob
import pandas as pd
from pandas.io.json import build_table_schema
from google.cloud import bigquery
import io
from loguru import logger
from tqdm.auto import tqdm
project = "your_project_id"
dataset_name = "your_dataset_name"
def load_to_bigquery(x: pd.DataFrame, table_name: str, mode: str = "replace"):
# BQで読み込み可能なスキーマ情報を、pandasの機能で作成する
# NULLABLE情報を追加
table_schema = build_table_schema(x, index=False)["fields"]
for schema in table_schema:
if schema["type"] == "number":
schema["type"] = "FLOAT" # NUMBER を FLOAT に修正
schema["mode"] = "NULLABLE"
# そのままjson化すると、unixtimeになってしまうので、文字列に変換(必要に応じて変更)
x["timestamp"] = x["timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S")
data = x.to_json(orient="records", lines=True)
# 末尾のブランクの改行を削除
data = data.strip()
lines = data.split("\n")
# 書き込み先のGCPプロジェクト, dataset, テーブル名を指定
# datasetが存在しない場合、エラーになる
client = bigquery.Client(project=project)
dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
# デフォルトでは追記する形でBQにデータを保存する
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.schema = table_schema
# パーティションを使用しない場合はこの設定は必要ない
# この例ではDAY単位でパーティションを作成すが、他にもHOUR, MONTH, YEARが指定可能
# (DAYではパーティションの更新界数が多すぎてクオータ制限に引っかかることがあるのでその場合はMONTHなどに変更する)
job_config.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="timestamp", # パーティションに使うカラム
# expiration_ms=7776000000, # 90 days.
)
# mode オプションで追記 (append) かリプレース (replace) かを選択
if mode == "replace":
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
logger.info(f"table_schema = {table_schema}")
# ファイルとして指定することで読み込むが、io.StringIO経由でも読み込める
job = client.load_table_from_file(io.StringIO(data), table, job_config=job_config)
# 結果を出力
logger.info(f"result = {job.result()}")