ForkJoinPool を使ってみる
はじめに
最初は CompletableFuture
についての記事を書こうとしてたが、その前段として ForkJoinPool
について調べていたところ、単独の記事とした方がいいようなボリュームとなった。この記事では CompletableFuture
や Stream
で並列処理を記述したときに裏側で使われる ForkJoinPool
を用いてプログラムを書いてみた備忘録。
ForkJoinPool
とは?
ForkJoinPool
は ForkJoinTask
を実行するための ExecutorServie
(Javadoc まま)。
ForkJoinPool
では ForkJoinTask
を実装したクラスを用いて並列計算を処理していく。標準ライブラリでは ForkJoinTask
を実装したクラスとして次の 3 つを提供している。
CountedCompleter
は馴染みがないクラスだが Stream
の並列処理を実現するために JDK 内部で使われているようだ1 2。本記事では RecursiveAction
と RecursiveTask
について注目する。
RecursiveAction
と RecursiveTask
の違いは並列処理の実行後に計算結果を値として取得できるか否かになる。つまり、計算の主目的が副作用を発生させることであれば RecursiveAction
を使い、新しい値を作ることが目的であれば RecursiveTask
を使うことになる。
ForkJoinPool
の使い方
RecursiveAction
と RecursiveTask
の使い方を見てみよう。
RecursiveAction
を使った例
RecursiveAction
を使った実装例として Javadoc に載っているソートを見てみよう。
Javadoc のコードは紙面の都合からなのかインデントや行間の空行や if
、for
の括弧が省略されてしまっているので少し整えて、実装を強制される compute
メソッドに @Override
アノテーションを追加している。また import
文の追加し、クラス名を SortTask
から SortAction
に変更している。
1import java.util.Arrays;2import java.util.concurrent.RecursiveAction;34public class SortAction extends RecursiveAction {56final long[] array;7final int lo, hi;89SortAction(long[] array, int lo, int hi) {10this.array = array;11this.lo = lo;12this.hi = hi;13}1415SortAction(long[] array) {16this(array, 0, array.length);17}1819@Override20protected void compute() {21if (hi - lo < THRESHOLD) {22sortSequentially(lo, hi);23} else {24int mid = (lo + hi) >>> 1; // 2 で割るのと同等2526invokeAll(new SortAction(array, lo, mid), new SortAction(array, mid, hi));2728merge(lo, mid, hi);29}30}3132// implementation details follow:33static final int THRESHOLD = 1000;3435void sortSequentially(int lo, int hi) {36Arrays.sort(array, lo, hi);37}3839void merge(int lo, int mid, int hi) {40// array を分割した後ろは再利用している41long[] buf = Arrays.copyOfRange(array, lo, mid);42for (int i = 0, j = lo, k = mid; i < buf.length; j++) {43array[j] = (k == hi || buf[i] < array[k])44? buf[i++]45: array[k++];46}47}48}
Javadoc では直接的に書かれていないがマージソートと Java 標準ライブラリの sort
の組み合わせによってソートしている。要素数が 1000
未満の場合は標準ライブラリを用いてソート行い、それよりも大きい場合は配列を二つの領域に分割して再帰的に自身を呼び出している (インデックスによる分割であり、配列の複製はされていない)。ソートアルゴリズムのパフォーマンスは要素数によって左右されるため、この手の簡単な工夫によって要素数による性能劣化を抑制する手法はコスパがいい。
Arrays#sort()
は Dual pivot Quicksort で実装されている。
分割された二つの領域について SortAction
のコンストラクタにそれぞれの領域を渡すことで再帰的な呼び出しを行い、invokeAll
メソッドに渡してそれぞれのタスクをフォークし、実行の完了後に merge
メソッドによってソート対象の配列にマージソートと同じ要領でソートされたデータを詰めている。
RecursiveTask
RecursiveAction
と違い、RecursiveTask
で並列化できる処理の簡単な例としてフィボナッチ数列の n
番目を求めるような問題が思いつくが、既に Qiita にフィボナッチ数列を使った解説があるため、ここではマージソートを RecursiveTask
を使って実装する。
RecursiveAction
の例と一緒になってしまっているが、こちらは非破壊的なソートとなっている。また、要素数によってソートアルゴリズムを切り替えるような工夫はしていない。
1import java.util.ArrayList;2import java.util.Collections;3import java.util.Comparator;4import java.util.List;5import java.util.concurrent.RecursiveTask;67public class MergeSortTask<T extends Comparable<? super T>, U extends Comparator<T>> extends RecursiveTask<List<T>> {89/** ソート対象リスト。 */10private final List<T> xs;1112private final Comparator<T> comparator;1314public MergeSortTask(List<T> xs, Comparator<T> comparator) {15this.xs = xs;16this.comparator = comparator;17}1819@Override20protected List<T> compute() {21// リストが空、もしくは要素数が 1 の場合はソート済みとして返す22if (xs.size() <= 1) {23return xs;24}2526// リストを左右に分割27final var mid = Math.floorDiv(xs.size(), 2);28final var left = new MergeSortTask<>(xs.subList(0, mid), comparator);29final var right = new MergeSortTask<>(xs.subList(mid, xs.size()), comparator);3031// 右側のソートの非同期実行を調整32right.fork();3334final var sortedLeft = left.compute();35final var sortedRight = right.join();3637final var sorted = new ArrayList<T>();38int l = 0, r = 0;39while (l < sortedLeft.size() && r < sortedRight.size()) {40final var x = sortedLeft.get(l);41final var y = sortedRight.get(r);4243if (comparator.compare(x, y) <= 0) {44sorted.add(x);45l++;46} else {47sorted.add(y);48r++;49}50}5152while (l < sortedLeft.size()) {53sorted.add(sortedLeft.get(l++));54}5556while (r < sortedRight.size()) {57sorted.add(sortedRight.get(r++));58}5960return Collections.unmodifiableList(sorted);61}62}
実装しているアルゴリズムはオーソドックスなマージソートとなっているので特筆すべき点はないだろう。アルゴリズム以外の違いとしては Javadoc に載っている RecursiveAction
の例とは異なり、任意の比較可能な型 (Comparable
) に対応しているためジェネリクス部分が少しだけ複雑になっていることくらいだろうか。
パフォーマンス
それぞれの実装について実行速度を比較してみよう。実行速度以外にメモリの比較も本来すべきだが今回は時間だけに注目する。
先に紹介した SortAction
、MergeSortTask
の他に Stream#sorted()
、 Stream#parallelStream()#sorted()
を比較する。
比較するまでもなく SortAction
> Stream#parallelStream()#sorted()
> MergeSortTask
> Stream#sort()
の順だと思うが要素数 100,000,000 (= 1 億) の配列、リストに対して実行した結果を載せておく。
実装 | 実行時間 (s) |
---|---|
Stream#sort() | 36.263888375 |
MergeSortTask | 13.619412583 |
SortAction | 1.5119055 |
Stream#parallelStream()#sorted() | 4.816473459 |
ソート対象のデータを生成する部分は計測に含めていない。このことからデータ複製は恐しく高コストだし、並列化は高速化にとって有用であることがわかる。非破壊的なソートを実装する場合は配列を複製して SortAction
に渡すのが一番高速になりそうだが、恐らく paralellStream
を使う例と同じくらいの実行速度になるだろう。
実行速度は以下の環境で行った。
- Mac mini 2024
- チップ: Apple M4 Pro
- CPU: 14 コア
- GPU: 16 コア
- メモリ: 64 GB
- SSD: 1 TB
- macOS: Squoia 15.1
2024/12/08 時点においてはスペック高めだと思うので一般的な PC であればもっと時間がかかるだろう。高速化の余地があるとは言え 1 億個の要素のソートで 1.5 秒もかかってしまうのかー。C で書いたらもっと早いのかなー。測定の方法が雑なので実際にはもっとよくなるかもしれない。
後述するが今回の測定では parallelism
の調整を行っていない。そのため、java - Setting Ideal size of Thread Pool - Stack Overflow を参考にスレッド数を調整するとより高速に行えるか可能性がある。
ForkJoinPool
の並列度
今回は ForkJoinPool
の使用において何のチューニングも行なわなかった。
ForkJoinPool
のコンストラクタではパフォーマンスに影響するパラメータとして、
int parallelism
int corePoolSize
int maximumPoolSize
int minimumRunnable
long keepAliveTime
を引数で指定できる。
1public ForkJoinPool(int parallelism,2ForkJoinWorkerThreadFactory factory,3UncaughtExceptionHandler handler,4boolean asyncMode,5int corePoolSize,6int maximumPoolSize,7int minimumRunnable,8Predicate<? super ForkJoinPool> saturate,9long keepAliveTime,10TimeUnit unit) {11checkPermission();12int p = parallelism;13if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)14throw new IllegalArgumentException();15if (factory == null || unit == null)16throw new NullPointerException();17this.parallelism = p;18this.factory = factory;19this.ueh = handler;20this.saturate = saturate;21this.config = asyncMode ? FIFO : 0;22this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);23int corep = Math.clamp(corePoolSize, p, MAX_CAP);24int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);25int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);26this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |27((long)corep << 32);28int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));29this.registrationLock = new ReentrantLock();30this.queues = new WorkQueue[size];31String pid = Integer.toString(getAndAddPoolIds(1) + 1);32String name = "ForkJoinPool-" + pid;33this.workerNamePrefix = name + "-worker-";34this.container = SharedThreadContainer.create(name);35}
ForkJoinPool
の仕組み
ここから書く内容は、ForkJoinPool
について学習中に書かれているため誤った情報を含む可能性がある。あくまで筆者の理解ということを念頭に置いて読んで欲しい。
ForkJoinPool
は引数無しのコンストラクタで生成したとき、プロセッサ数を parallelism
(並列度レベル) としてインスタンスを生成する。
parallelism
は ForkJoinPool
が生成するスレッド数と対応する。ドキュメントを読むと並列処理を維持するためにデフォルトでは 256 スレッドが余分に確保されるスレッドの最大数として書かれているが、手元でいくつかコードを書いてみたが parallelism
を越える数のスレッドが実行されることを確認できなかった。
ForkJoinPool
ではスレッドごとに WorkerQueue
を持っており、そのキューにタスクを追加していく。各スレッドの WorkerQueue
に入ったタスクは他のスレッドで空きがあるとそれを盗んで (steal) 実行する。そうすることで全てのスレッドにおいて空きが発生し難いようになっている。ワークスティール (work stealing) 方式については http://www.yasugi.ai.kyutech.ac.jp/2012/4/k6.html が参考になる。
もしかしたら WorkerQueue
の上限を越えた場合にスレッドが増えるのかもしれない。それであれば理に適っているように感じる。しかし、コードを読めていないので実際のところは不明だ。
ForkJoinPool
のアルゴリズムの詳細
ForkJoinPool
のソースコードを読むと理解するためには以下の資料を読むといいらしい。
- Herlihy and Shavit's book "The Art of Multiprocessor programming", chapter 16
- "DynamicCircular Work-Stealing Deque" by Chase and Lev, SPAA 2005
- リンク切れてる 😄
- "Idempotent work stealing" by Michael, Saraswat, and Vechev, PPoPP 2009
- 論文のタイトルでググると PDF が出てきますが著者がアップロードしているものではないように見えるのでリンクは貼らない
- "Correct and Efficient Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
アルゴリズムの概要だけを理解するのは難しくないように思うが、最適化されている部分があるようでそこまで含めて理解するのは時間がかかりそうだ…。
おわりに
この記事では Java の CompletableFuture
や Stream
の並列計算をするとき、デフォルトとして使われる ForkJoinPool
について学んだ。
ForkJoinPool
の使い方や大まかな仕組みについて見ることができたが、ドキュメントとの差異やアルゴリズムの詳細については謎が深まってしまった。
CompletableFuture
についての記事を書いた後に ForkJoinPool
の詳細に帰ってこようと思う。