• home
  • about
  • 全ての投稿
  • ソフトウェア・ハードウェアの設定のまとめ
  • 分析関連のまとめ
  • ヘルスケア関連のまとめ
  • 生涯学習関連のまとめ

dask

date: 2022-01-02 excerpt: daskの使い方

tag: python3dask


daskの使い方

概要

  • pandasを並列化処理をできるようにしたライブラリ
  • pandasより不安定で公式でも多用しすぎないようにアナウンスされている
  • 定義した動作は遅延評価になるので、最後にcomputeしないと計算されない

インストール

$ python3 -m pip install dask

ユースケース

  • たくさんのcsvを高速に読み込む
  • pandasのdataframeをいくつかに分割してcsv, parquetなどに複数のファイルに出力する

基本的な使い方

csv読み込み

複数のファイルを読み込む時、正規表現か、リストで与える

import dask.dataframe as dd
df = dd.read_csv("*.csv")
df = dd.read_csv(["1.csv", "2.csv", ...])
  • read_pickleはサポートしていない

遅延評価を実行してpandasのDataFrameを得る

df = df.compute()

並列化の方法を指定

with dask.config.set(scheduler="processes"):
    ....
  • "processes"
    • マルチプロセス
  • "threads"
    • スレッドベース

pandasのDataFrameをdaskのDataFrameに変換する

df = dd.from_pandas(df, npartitions=16)
  • npartitions
    • パーティションのサイズ

複数のファイルに分けて保存する

df = dd.from_pandas(df, npartitions=16)
df.to_parquet("<output-dir>")
  • <output-dir>が作成され、partitionの個数のファイルが出力される

具体例

たくさんのcsvファイルを読み込んで高速化する

import dask.dataframe as dd
import dask
from loguru import logger
with dask.config.set(scheduler="processes"):
    logger.info("start to read csv with dask")
    df = dd.read_csv("*.csv")
    df = df.compute()
    logger.info(f"finish to read csv with dask, size = {len(df)}")

# 以下通常のpandasの操作
df = df.rename(columns={"0": "url"})
df = df.drop_duplicates(subset=["url"])
df["digest"] = df["url"].apply(get_digest)


python3dask Share Tweet