概要
Pythonで並列計算を実施したい時、joblibは最も手軽なライブラリの1つです。本記事では、このjoblibを使った並列計算の実施方法とそのパラメータの効果を整理しました。なお、joblibのバージョンは0.11です。
joblibの使用方法
joblibを使うためには、以下の3手順が必要です。
- joblibのインストール
- 処理内容のfunction化
- 並列計算のコーディング
joblibのインストール
pipを使って簡単にインストールできます。
pip install joblib
ちなみに、マニュアルは以下にあります。
https://media.readthedocs.org/pdf/joblib/latest/joblib.pdf
処理内容のfunction化
並列計算を実行するためには、処理をファンクションにする必要があります。例えば、現状のコードが
#各ループ内でhoge1〜hogenの処理を実施する。 for i in ii: hoge1 hoge2 ... hogen
のようにループ内で書き下されている状態であれば、下記のように処理を関数化し、functionを呼び出すだけでループが回せるように書き換える必要があります。
#hoge1〜hogenをファンクションにまとめる。paramsは適当な引数。 #ここには書いていないが戻り値があるファンクションでももちろん問題なし。 def function(params, i): hoge1 hoge2 ... hogen #ループ内はファンクションの呼び出しのみ。 for i in ii: function(params, i)
並列計算のコーディング
最後に以下のようにコーディングすれば並列計算を実行することができ、2並列(n_jobs)で計算が実行されます。並列計算のコード自体はたったの1行であることが最大の特徴です。なお、Windowsの場合、if __name__…の処理がないとフリーズするので注意してください。
#ライブラリのインポート from joblib import Parallel, delayed #ファンクションの定義は従来通り def function(params, i): hoge1 hoge2 ... hogen if __name__ == "__main__": #Parallel(パラメータ)(delayed(関数)(関数の引数) for ループ条件) という書き方 Parallel(n_jobs=2)(delayed(function)(params, i) for i in ii)
引数一覧
joblibのParallelには以下の9引数があります。以降、それぞれについて説明していきます。
- n_jobs
- backend
- verbose
- timeout
- pre_dispatch
- batch_size
- temp_folder
- max_nbytes
- mmap_mode
n_jobs
並列処理数を指定するパラメータ。デフォルトは1(並列処理なし)。この数を大きくするほど並列数が増え、処理速度が早くなりますが、当然CPUの数以上に指定してもそれ以上は計算速度は上がりません。
-1を指定するとコア数と等しい(=すべてのコアを使用)並列数となります。-2以下も指定することができ、(n_cpus + 1 + n_jobs)の並列数となります。これは、-2を指定した場合、1コア残して残りすべてを使う、-3を指定した場合、2コア残して残りすべてを使う、ことを示します。システム開発の時などでデータベースやウェブアプリケーション等、別にCPUを使う場合に便利です。
以下のようなサンプルでコア数を変えて実行してみました。
#ライブラリのインポート from joblib import Parallel, delayed import numpy as np from time import time #dataの全要素の足し合わせを100回行い、valを足す関数 def testfunc(data, val): temp = val for j in range(100): for i in range(len(data)): temp += data[i] return temp if __name__ == "__main__": #dataは長さ1000の配列とする target = np.arange(0, 1000, 1) #計算時間測定開始 start = time() #並列処理 result = Parallel(n_jobs=2)(delayed(testfunc)(target, i) for i in range(1000)) #計算時間出力 print('{:.3f}秒'.format(time() - start))
結果は以下。コア数が2の場合、計算時間がおおよそ半減していることが分かり、並列計算の効果を確認できました。
backend
マルチプロセスとするか、マルチスレッドにするか、を指定できます。デフォルトはマルチプロセスです。マルチプロセスとスレッドの違いは下記のサイト等が参考になるかと思います。
https://qiita.com/shotaTsuge/items/0ad41fcee63a00a52f68
基本的な考え方は、並列計算で引数に渡すパラメータを独立に使うものはマルチプロセス、共有して使うものはマルチスレッドが有効です。例えば、以下の例のように、共通のパラメータが非常にサイズが大きく、独立に扱うとメモリの専有やオーバヘッド(並列計算の準備にかかる時間)が大きくなる場合、処理のパラメータの上書きがないのであれば、マルチスレッドを指定すると結果はそのままで、計算速度が改善する可能性があります。
#ファンクションは和を返すだけ def testfunc2(data, val): return data.sum() #極端にサイズの大きいデータを使用 target = np.arange(0, 500000000, 1) #通常の並列計算(マルチプロセス) Parallel(n_jobs=4)(delayed(testfunc2)(target, i) for i in range(4)) #threadingを指定する場合 Parallel(n_jobs=4, backend="threading")(delayed(testfunc2)(target, i) for i in range(4))
結果は以下。マルチスレッドの場合、計算時間(オーバヘッドの時間)が1/60程度に短縮しています。joblibが遅いと思った時に有効かもしれません。
verbose
並列計算のログの頻度を指定するパラメータ。デフォルトは0(ログ無し)。1からログが出るようになり、10が最も高頻度になります。また、51より大きい値を指定すると結果がログに出るようになります。動作を確認したい時に有効です。
#最高頻度の場合 Parallel(n_jobs=2, verbose=10)(delayed(testfunc)(target, i) for i in range(100)) #結果 [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 1.7s [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 3.2s [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 7.9s finished
#結果も出力させる場合 Parallel(n_jobs=2, verbose=100)(delayed(testfunc)(target, i) for i in range(100)) #結果 Pickling array (shape=(100000,), dtype=int64). Pickling array (shape=(100000,), dtype=int64). Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 1.7s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 2 tasks | elapsed: 1.7s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 3 tasks | elapsed: 3.2s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 3.2s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 5 tasks | elapsed: 4.8s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 6 tasks | elapsed: 4.8s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 7 tasks | elapsed: 6.4s Pickling array (shape=(100000,), dtype=int64). [Parallel(n_jobs=2)]: Done 8 out of 10 | elapsed: 6.5s remaining: 1.6s [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 8.6s remaining: 0.0s [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 8.6s finished
timeout
タスクが所定の時間を超えた場合にエラーを返すためのパラメータ(単位は秒だと思います。)。デフォルトはNone(時間制限無し)。エラーを検知した場合、そのタスクだけを落とすわけではなく、処理全体が落ちてしまうようです。使いどころは少し微妙ですね。なお、n_jobsが1の時は機能しません。
#valが大きくなるほど処理が長くなるような関数を作成 def testfunc(data, val): temp = 0 for j in range(1000000): for i in range(val): temp += data[i] return temp #valがだんだん大きくなるような計算を回す if __name__ == "__main__": target = np.arange(0, 100000, 1) start = time() #タイムアウトは1秒に設定 result = Parallel(n_jobs=2, verbose=10, timeout=1)(delayed(testfunc)(target, i) for i in range(100)) print('{:.3f}秒'.format(time() - start))
途中で処理が1秒を超えたためエラーが発生し、処理が停止しました。
[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.3s [Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 1.2s [Parallel(n_jobs=2)]: Done 9 tasks | elapsed: 4.7s Traceback (most recent call last): raise TimeoutError TimeoutError
pre_dispatch
タスクに計算資源が割り当てられる前に、何個のタスクを生成させておくか、というパラメータ。デフォルトは2*n_jobs(並列数の2倍)。この値がreasonableと書かれており、あまり調整する必要はなさそうです。パラメータを変えて検証した感じでも、1等の極端に小さな値にしない限り、計算速度に大きな影響はなさそうでした。
batch_size
1プロセスにいくつのタスクを割り当てるかを設定するパラメータ。デフォルトはauto(自動)。この値が小さすぎると、処理が早い時にタスクの生成が間に合わなくなる等で全体の処理が遅くなってしまいますが、autoにしておけば自動でバッチサイズを大きく調整してくれます。下記の例では、バッチサイズが36に調整されています。この値も特に調整する必要はなさそうですね。
[Parallel(n_jobs=4)]: Batch computation too fast (0.0107s.) Setting batch_size=36. [Parallel(n_jobs=4)]: Done 12 out of 100 | elapsed: 0.1s remaining: 0.8s [Parallel(n_jobs=4)]: Done 23 out of 100 | elapsed: 0.3s remaining: 0.9s
temp_folder
各タスクにデータを渡すときに一定サイズ(max_nbytes)を超える場合、ここで指定したtemp_folderに保存して共有されるようになります(多分)。自動にしているとシステムのテンポラリフォルダ等が使われるようです。基本的に変更の必要はないのですが、まれに大容量のファイルを多数の並列計算で展開するときは、以下に示す「IOError: [Errno 28] No space left on device」というエラーが発生することがあります。
IOError: [Errno 28] No space left on device が発生した時
テンポラリファイル等では容量が不足していることが原因ですので、保存ファイルを別のフォルダに変えましょう。
#tempフォルダに保存するように設定する Parallel(n_jobs=4, verbose=10, temp_folder='temp')(delayed(testfunc2)(target, i) for i in range(20))
メッセージを鵜呑みにして、HD容量を削減したり、PCを再起動しても直りませんので気をつけてください。(私はハマりました)
max_nbytes
temp_folderに指定するしきい値を設定するパラメータ。デフォルトは1M(メガバイト)。大容量のメモリを搭載している時など、メモリに乗せて処理したいときは大きめの値を設定すると処理が早くなるかもしれません。
mmap_mode
上記の保存時のオプションで、デフォルトは’r’。候補は{None, ‘r+’, ‘r’, ‘w+’, ‘c’}とのこと。read, write, closeとかでしょうか。ドキュメントを呼んでもちょっと良くわかりませんでした。
まとめ
Pythonで並列計算を実施したい時に便利なjoblibのパラメータについて検証しました。基本的には「n_jobs」と「verbose」だけ使えば良いと思いますが、ファイルサイズが大きくなってくると、「backend」や「temp_folder」の出番も出てくると考えられます。