JoblibのParallelの全引数を解説

概要

Pythonで並列計算を実施したい時、joblibは最も手軽なライブラリの1つです。本記事では、このjoblibを使った並列計算の実施方法とそのパラメータの効果を整理しました。なお、joblibのバージョンは0.11です。

joblibの使用方法

joblibを使うためには、以下の3手順が必要です。

  1. joblibのインストール
  2. 処理内容のfunction化
  3. 並列計算のコーディング

joblibのインストール

pipを使って簡単にインストールできます。

pip install joblib

ちなみに、マニュアルは以下にあります。
https://media.readthedocs.org/pdf/joblib/latest/joblib.pdf

処理内容のfunction化

並列計算を実行するためには、処理をファンクションにする必要があります。例えば、現状のコードが

#各ループ内でhoge1〜hogenの処理を実施する。
for i in ii:
   hoge1
   hoge2
   ...
   hogen

のようにループ内で書き下されている状態であれば、下記のように処理を関数化し、functionを呼び出すだけでループが回せるように書き換える必要があります。

#hoge1〜hogenをファンクションにまとめる。paramsは適当な引数。
#ここには書いていないが戻り値があるファンクションでももちろん問題なし。
def function(params, i):
    hoge1
    hoge2
    ...
    hogen

#ループ内はファンクションの呼び出しのみ。
for i in ii:
   function(params, i)

並列計算のコーディング

最後に以下のようにコーディングすれば並列計算を実行することができ、2並列(n_jobs)で計算が実行されます。並列計算のコード自体はたったの1行であることが最大の特徴です。なお、Windowsの場合、if __name__…の処理がないとフリーズするので注意してください。

#ライブラリのインポート
from joblib import Parallel, delayed

#ファンクションの定義は従来通り
def function(params, i):
    hoge1
    hoge2
    ...
    hogen

if __name__ == "__main__":
    #Parallel(パラメータ)(delayed(関数)(関数の引数) for ループ条件) という書き方
    Parallel(n_jobs=2)(delayed(function)(params, i) for i in ii)

引数一覧

joblibのParallelには以下の9引数があります。以降、それぞれについて説明していきます。

  1. n_jobs
  2. backend
  3. verbose
  4. timeout
  5. pre_dispatch
  6. batch_size
  7. temp_folder
  8. max_nbytes
  9. mmap_mode

n_jobs

並列処理数を指定するパラメータ。デフォルトは1(並列処理なし)。この数を大きくするほど並列数が増え、処理速度が早くなりますが、当然CPUの数以上に指定してもそれ以上は計算速度は上がりません。
-1を指定するとコア数と等しい(=すべてのコアを使用)並列数となります。-2以下も指定することができ、(n_cpus + 1 + n_jobs)の並列数となります。これは、-2を指定した場合、1コア残して残りすべてを使う、-3を指定した場合、2コア残して残りすべてを使う、ことを示します。システム開発の時などでデータベースやウェブアプリケーション等、別にCPUを使う場合に便利です。
以下のようなサンプルでコア数を変えて実行してみました。

#ライブラリのインポート
from joblib import Parallel, delayed
import numpy as np
from time import time

#dataの全要素の足し合わせを100回行い、valを足す関数
def testfunc(data, val):
    temp = val
    for j in range(100):
        for i in range(len(data)):
            temp += data[i]      
    return temp

if __name__ == "__main__":
    #dataは長さ1000の配列とする
    target = np.arange(0, 1000, 1)
    #計算時間測定開始
    start = time()
    #並列処理
    result = Parallel(n_jobs=2)(delayed(testfunc)(target, i) for i in range(1000))
    #計算時間出力
    print('{:.3f}秒'.format(time() - start))

結果は以下。コア数が2の場合、計算時間がおおよそ半減していることが分かり、並列計算の効果を確認できました。

  • 1コアの場合:15.376秒
  • 2コアの場合:8.063秒
  • backend

    マルチプロセスとするか、マルチスレッドにするか、を指定できます。デフォルトはマルチプロセスです。マルチプロセスとスレッドの違いは下記のサイト等が参考になるかと思います。
    https://qiita.com/shotaTsuge/items/0ad41fcee63a00a52f68
    基本的な考え方は、並列計算で引数に渡すパラメータを独立に使うものはマルチプロセス、共有して使うものはマルチスレッドが有効です。例えば、以下の例のように、共通のパラメータが非常にサイズが大きく、独立に扱うとメモリの専有やオーバヘッド(並列計算の準備にかかる時間)が大きくなる場合、処理のパラメータの上書きがないのであれば、マルチスレッドを指定すると結果はそのままで、計算速度が改善する可能性があります。

    #ファンクションは和を返すだけ
    def testfunc2(data, val):
        return data.sum()
    
    #極端にサイズの大きいデータを使用
    target = np.arange(0, 500000000, 1)
    #通常の並列計算(マルチプロセス)
    Parallel(n_jobs=4)(delayed(testfunc2)(target, i) for i in range(4))
    #threadingを指定する場合
    Parallel(n_jobs=4, backend="threading")(delayed(testfunc2)(target, i) for i in range(4))
    

    結果は以下。マルチスレッドの場合、計算時間(オーバヘッドの時間)が1/60程度に短縮しています。joblibが遅いと思った時に有効かもしれません。

  • マルチプロセスの場合:67.247秒
  • マルチスレッドの場合:1.108秒
  • verbose

    並列計算のログの頻度を指定するパラメータ。デフォルトは0(ログ無し)。1からログが出るようになり、10が最も高頻度になります。また、51より大きい値を指定すると結果がログに出るようになります。動作を確認したい時に有効です。

    #最高頻度の場合
     Parallel(n_jobs=2, verbose=10)(delayed(testfunc)(target, i) for i in range(100))
    #結果
    [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    1.7s
    [Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    3.2s
    [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    7.9s finished
    
    #結果も出力させる場合
     Parallel(n_jobs=2, verbose=100)(delayed(testfunc)(target, i) for i in range(100))
    #結果
    Pickling array (shape=(100000,), dtype=int64).
    Pickling array (shape=(100000,), dtype=int64).
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    1.7s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    1.7s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    3.2s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    3.2s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:    4.8s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:    4.8s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:    6.4s
    Pickling array (shape=(100000,), dtype=int64).
    [Parallel(n_jobs=2)]: Done   8 out of  10 | elapsed:    6.5s remaining:    1.6s
    [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    8.6s remaining:    0.0s
    [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    8.6s finished
    

    timeout

    タスクが所定の時間を超えた場合にエラーを返すためのパラメータ(単位は秒だと思います。)。デフォルトはNone(時間制限無し)。エラーを検知した場合、そのタスクだけを落とすわけではなく、処理全体が落ちてしまうようです。使いどころは少し微妙ですね。なお、n_jobsが1の時は機能しません。

    #valが大きくなるほど処理が長くなるような関数を作成
    def testfunc(data, val):
        temp = 0
        for j in range(1000000):
            for i in range(val):
                temp += data[i]      
        return temp
    
    #valがだんだん大きくなるような計算を回す
    if __name__ == "__main__":
        target = np.arange(0, 100000, 1)
        start = time()
        #タイムアウトは1秒に設定
        result = Parallel(n_jobs=2, verbose=10, timeout=1)(delayed(testfunc)(target, i) for i in range(100))
        print('{:.3f}秒'.format(time() - start))
    

    途中で処理が1秒を超えたためエラーが発生し、処理が停止しました。

    [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.3s
    [Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    1.2s
    [Parallel(n_jobs=2)]: Done   9 tasks      | elapsed:    4.7s
    Traceback (most recent call last):
        raise TimeoutError
    TimeoutError
    

    pre_dispatch

    タスクに計算資源が割り当てられる前に、何個のタスクを生成させておくか、というパラメータ。デフォルトは2*n_jobs(並列数の2倍)。この値がreasonableと書かれており、あまり調整する必要はなさそうです。パラメータを変えて検証した感じでも、1等の極端に小さな値にしない限り、計算速度に大きな影響はなさそうでした。

    batch_size

    1プロセスにいくつのタスクを割り当てるかを設定するパラメータ。デフォルトはauto(自動)。この値が小さすぎると、処理が早い時にタスクの生成が間に合わなくなる等で全体の処理が遅くなってしまいますが、autoにしておけば自動でバッチサイズを大きく調整してくれます。下記の例では、バッチサイズが36に調整されています。この値も特に調整する必要はなさそうですね。

    [Parallel(n_jobs=4)]: Batch computation too fast (0.0107s.) Setting batch_size=36.
    [Parallel(n_jobs=4)]: Done  12 out of 100 | elapsed:    0.1s remaining:    0.8s
    [Parallel(n_jobs=4)]: Done  23 out of 100 | elapsed:    0.3s remaining:    0.9s
    

    temp_folder

    各タスクにデータを渡すときに一定サイズ(max_nbytes)を超える場合、ここで指定したtemp_folderに保存して共有されるようになります(多分)。自動にしているとシステムのテンポラリフォルダ等が使われるようです。基本的に変更の必要はないのですが、まれに大容量のファイルを多数の並列計算で展開するときは、以下に示す「IOError: [Errno 28] No space left on device」というエラーが発生することがあります。

    IOError: [Errno 28] No space left on device が発生した時

    テンポラリファイル等では容量が不足していることが原因ですので、保存ファイルを別のフォルダに変えましょう。

    #tempフォルダに保存するように設定する
    Parallel(n_jobs=4, verbose=10, temp_folder='temp')(delayed(testfunc2)(target, i) for i in range(20))
    

    メッセージを鵜呑みにして、HD容量を削減したり、PCを再起動しても直りませんので気をつけてください。(私はハマりました)

    max_nbytes

    temp_folderに指定するしきい値を設定するパラメータ。デフォルトは1M(メガバイト)。大容量のメモリを搭載している時など、メモリに乗せて処理したいときは大きめの値を設定すると処理が早くなるかもしれません。

    mmap_mode

    上記の保存時のオプションで、デフォルトは’r’。候補は{None, ‘r+’, ‘r’, ‘w+’, ‘c’}とのこと。read, write, closeとかでしょうか。ドキュメントを呼んでもちょっと良くわかりませんでした。

    まとめ

    Pythonで並列計算を実施したい時に便利なjoblibのパラメータについて検証しました。基本的には「n_jobs」と「verbose」だけ使えば良いと思いますが、ファイルサイズが大きくなってくると、「backend」や「temp_folder」の出番も出てくると考えられます。

    コメントを残す

    メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

    CAPTCHA