daskの使い方
概要
- pandasを並列化処理をできるようにしたライブラリ
- pandasより不安定で公式でも多用しすぎないようにアナウンスされている
- 定義した動作は遅延評価になるので、最後にcomputeしないと計算されない
インストール
$ python3 -m pip install dask
ユースケース
- たくさんのcsvを高速に読み込む
- pandasのdataframeをいくつかに分割してcsv, parquetなどに複数のファイルに出力する
基本的な使い方
csv読み込み
複数のファイルを読み込む時、正規表現か、リストで与える
import dask.dataframe as dd
df = dd.read_csv("*.csv")
df = dd.read_csv(["1.csv", "2.csv", ...])
- read_pickleはサポートしていない
遅延評価を実行してpandasのDataFrameを得る
df = df.compute()
並列化の方法を指定
with dask.config.set(scheduler="processes"):
....
"processes"
- マルチプロセス
"threads"
- スレッドベース
pandasのDataFrameをdaskのDataFrameに変換する
df = dd.from_pandas(df, npartitions=16)
npartitions
- パーティションのサイズ
複数のファイルに分けて保存する
df = dd.from_pandas(df, npartitions=16)
df.to_parquet("<output-dir>")
<output-dir>
が作成され、partitionの個数のファイルが出力される
具体例
たくさんのcsvファイルを読み込んで高速化する
import dask.dataframe as dd
import dask
from loguru import logger
with dask.config.set(scheduler="processes"):
logger.info("start to read csv with dask")
df = dd.read_csv("*.csv")
df = df.compute()
logger.info(f"finish to read csv with dask, size = {len(df)}")
# 以下通常のpandasの操作
df = df.rename(columns={"0": "url"})
df = df.drop_duplicates(subset=["url"])
df["digest"] = df["url"].apply(get_digest)