bigquery load(bqへのデータのアップロード)のチートシート
概要
bq
コマンドでアップロードする方法とpython等のスクリプトでload(アップロード)する方法などがある
pythonでデータをアップロードする
- twitterのランダムサンプル情報(pickle化したもの)をbigqueryにloadする例
- pandasの
pandas.io.json.build_table_schema
関数を使用することでschema情報をダンプすることができる - load(アップロード)に失敗した場合はwebUIから確認できる
具体的な実装例
# ref. https://stackoverflow.com/questions/59681072/can-you-load-json-formatted-data-to-a-bigquery-table-using-python-and-load-table
import glob
import pandas as pd
from pandas.io.json import build_table_schema
from google.cloud import bigquery
import io
from loguru import logger
from tqdm.auto import tqdm
def load_to_bigquery(x: pd.DataFrame):
# BQで読み込み可能なスキーマ情報を、pandasの機能で作成する
# NULLABLE情報を追加
table_schema = build_table_schema(x, index=False)["fields"]
for schema in table_schema:
schema["mode"] = "NULLABLE"
# そのままjson化すると、unixtimeになってしまう
x["created_at"] = x["created_at"].dt.strftime("%Y-%m-%d %H:%M:%S")
data = x.to_json(orient="records", lines=True)
# 末尾のブランクの改行を削除
data = data.strip()
lines = data.split("\n")
# 書き込み先のGCPプロジェクト, dataset, テーブル名を指定
# datasetが存在しない場合、エラーになる
client = bigquery.Client(project="starry-lattice-256603")
dataset = client.dataset("research_gimpei")
table = dataset.table("tmp")
# デフォルトでは追記する形でBQにデータを保存する
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.schema = table_schema
# パーティションを使用しない場合はこの設定は必要ない
job_config.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="created_at", # パーティションに使うカラム
# expiration_ms=7776000000, # 90 days.
)
logger.info(f"table_schema = {table_schema}")
# ファイルとして指定することで読み込むが、io.StringIO経由でも読み込める
job = client.load_table_from_file(io.StringIO(data), table, job_config=job_config)
# 結果を出力
logger.info(f"result = {job.result()}")
# 以下は参考程度
for filename in tqdm(glob.glob("./twitter-random-sample/_agg_tweets/*.pkl")):
x = pd.read_pickle(filename)
x = x[['screen_name', 'name', 'description', 'url', 'created_at', 'text', 'source']]
x["created_at"] = pd.to_datetime(x["created_at"], format="%a %b %d %H:%M:%S +0000 %Y") + pd.DateOffset(hours=9)
load_to_bigquery(x)