All You Need Is

ForkJoinPool を使ってみる

2024/12/08
プログラミングJava並列プログラミング

はじめに

最初は CompletableFuture についての記事を書こうとしてたが、その前段として ForkJoinPool について調べていたところ、単独の記事とした方がいいようなボリュームとなった。この記事では CompletableFutureStream で並列処理を記述したときに裏側で使われる ForkJoinPool を用いてプログラムを書いてみた備忘録。

ForkJoinPool とは?

ForkJoinPoolForkJoinTask を実行するための ExecutorServie (Javadoc まま)。

ForkJoinPool では ForkJoinTask を実装したクラスを用いて並列計算を処理していく。標準ライブラリでは ForkJoinTask を実装したクラスとして次の 3 つを提供している。

CountedCompleter は馴染みがないクラスだが Stream の並列処理を実現するために JDK 内部で使われているようだ1 2。本記事では RecursiveActionRecursiveTask について注目する。 RecursiveActionRecursiveTask の違いは並列処理の実行後に計算結果を値として取得できるか否かになる。つまり、計算の主目的が副作用を発生させることであれば RecursiveAction を使い、新しい値を作ることが目的であれば RecursiveTask を使うことになる。

ForkJoinPool の使い方

RecursiveActionRecursiveTask の使い方を見てみよう。

RecursiveAction を使った例

RecursiveAction を使った実装例として Javadoc に載っているソートを見てみよう。 Javadoc のコードは紙面の都合からなのかインデントや行間の空行や iffor の括弧が省略されてしまっているので少し整えて、実装を強制される compute メソッドに @Override アノテーションを追加している。また import 文の追加し、クラス名を SortTask から SortAction に変更している。

1
import java.util.Arrays;
2
import java.util.concurrent.RecursiveAction;
3
4
public class SortAction extends RecursiveAction {
5
6
final long[] array;
7
final int lo, hi;
8
9
SortAction(long[] array, int lo, int hi) {
10
this.array = array;
11
this.lo = lo;
12
this.hi = hi;
13
}
14
15
SortAction(long[] array) {
16
this(array, 0, array.length);
17
}
18
19
@Override
20
protected void compute() {
21
if (hi - lo < THRESHOLD) {
22
sortSequentially(lo, hi);
23
} else {
24
int mid = (lo + hi) >>> 1; // 2 で割るのと同等
25
26
invokeAll(new SortAction(array, lo, mid), new SortAction(array, mid, hi));
27
28
merge(lo, mid, hi);
29
}
30
}
31
32
// implementation details follow:
33
static final int THRESHOLD = 1000;
34
35
void sortSequentially(int lo, int hi) {
36
Arrays.sort(array, lo, hi);
37
}
38
39
void merge(int lo, int mid, int hi) {
40
// array を分割した後ろは再利用している
41
long[] buf = Arrays.copyOfRange(array, lo, mid);
42
for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
43
array[j] = (k == hi || buf[i] < array[k])
44
? buf[i++]
45
: array[k++];
46
}
47
}
48
}

Javadoc では直接的に書かれていないがマージソートと Java 標準ライブラリの sort の組み合わせによってソートしている。要素数が 1000 未満の場合は標準ライブラリを用いてソート行い、それよりも大きい場合は配列を二つの領域に分割して再帰的に自身を呼び出している (インデックスによる分割であり、配列の複製はされていない)。ソートアルゴリズムのパフォーマンスは要素数によって左右されるため、この手の簡単な工夫によって要素数による性能劣化を抑制する手法はコスパがいい。 Arrays#sort()Dual pivot Quicksort で実装されている。

分割された二つの領域について SortAction のコンストラクタにそれぞれの領域を渡すことで再帰的な呼び出しを行い、invokeAll メソッドに渡してそれぞれのタスクをフォークし、実行の完了後に merge メソッドによってソート対象の配列にマージソートと同じ要領でソートされたデータを詰めている。

RecursiveTask

RecursiveAction と違い、RecursiveTask で並列化できる処理の簡単な例としてフィボナッチ数列の n 番目を求めるような問題が思いつくが、既に Qiita にフィボナッチ数列を使った解説があるため、ここではマージソートRecursiveTask を使って実装する。 RecursiveAction の例と一緒になってしまっているが、こちらは非破壊的なソートとなっている。また、要素数によってソートアルゴリズムを切り替えるような工夫はしていない。

1
import java.util.ArrayList;
2
import java.util.Collections;
3
import java.util.Comparator;
4
import java.util.List;
5
import java.util.concurrent.RecursiveTask;
6
7
public class MergeSortTask<T extends Comparable<? super T>, U extends Comparator<T>> extends RecursiveTask<List<T>> {
8
9
/** ソート対象リスト。 */
10
private final List<T> xs;
11
12
private final Comparator<T> comparator;
13
14
public MergeSortTask(List<T> xs, Comparator<T> comparator) {
15
this.xs = xs;
16
this.comparator = comparator;
17
}
18
19
@Override
20
protected List<T> compute() {
21
// リストが空、もしくは要素数が 1 の場合はソート済みとして返す
22
if (xs.size() <= 1) {
23
return xs;
24
}
25
26
// リストを左右に分割
27
final var mid = Math.floorDiv(xs.size(), 2);
28
final var left = new MergeSortTask<>(xs.subList(0, mid), comparator);
29
final var right = new MergeSortTask<>(xs.subList(mid, xs.size()), comparator);
30
31
// 右側のソートの非同期実行を調整
32
right.fork();
33
34
final var sortedLeft = left.compute();
35
final var sortedRight = right.join();
36
37
final var sorted = new ArrayList<T>();
38
int l = 0, r = 0;
39
while (l < sortedLeft.size() && r < sortedRight.size()) {
40
final var x = sortedLeft.get(l);
41
final var y = sortedRight.get(r);
42
43
if (comparator.compare(x, y) <= 0) {
44
sorted.add(x);
45
l++;
46
} else {
47
sorted.add(y);
48
r++;
49
}
50
}
51
52
while (l < sortedLeft.size()) {
53
sorted.add(sortedLeft.get(l++));
54
}
55
56
while (r < sortedRight.size()) {
57
sorted.add(sortedRight.get(r++));
58
}
59
60
return Collections.unmodifiableList(sorted);
61
}
62
}

実装しているアルゴリズムはオーソドックスなマージソートとなっているので特筆すべき点はないだろう。アルゴリズム以外の違いとしては Javadoc に載っている RecursiveAction の例とは異なり、任意の比較可能な型 (Comparable) に対応しているためジェネリクス部分が少しだけ複雑になっていることくらいだろうか。

パフォーマンス

それぞれの実装について実行速度を比較してみよう。実行速度以外にメモリの比較も本来すべきだが今回は時間だけに注目する。

先に紹介した SortActionMergeSortTask の他に Stream#sorted()Stream#parallelStream()#sorted() を比較する。

比較するまでもなく SortAction > Stream#parallelStream()#sorted() > MergeSortTask > Stream#sort() の順だと思うが要素数 100,000,000 (= 1 億) の配列、リストに対して実行した結果を載せておく。

実装実行時間 (s)
Stream#sort()36.263888375
MergeSortTask13.619412583
SortAction1.5119055
Stream#parallelStream()#sorted()4.816473459

ソート対象のデータを生成する部分は計測に含めていない。このことからデータ複製は恐しく高コストだし、並列化は高速化にとって有用であることがわかる。非破壊的なソートを実装する場合は配列を複製して SortAction に渡すのが一番高速になりそうだが、恐らく paralellStream を使う例と同じくらいの実行速度になるだろう。

実行速度は以下の環境で行った。

2024/12/08 時点においてはスペック高めだと思うので一般的な PC であればもっと時間がかかるだろう。高速化の余地があるとは言え 1 億個の要素のソートで 1.5 秒もかかってしまうのかー。C で書いたらもっと早いのかなー。測定の方法が雑なので実際にはもっとよくなるかもしれない。

後述するが今回の測定では parallelism の調整を行っていない。そのため、java - Setting Ideal size of Thread Pool - Stack Overflow を参考にスレッド数を調整するとより高速に行えるか可能性がある。

ForkJoinPool の並列度

今回は ForkJoinPool の使用において何のチューニングも行なわなかった。 ForkJoinPool のコンストラクタではパフォーマンスに影響するパラメータとして、

を引数で指定できる。

ForkJoinPool.java
1
public ForkJoinPool(int parallelism,
2
ForkJoinWorkerThreadFactory factory,
3
UncaughtExceptionHandler handler,
4
boolean asyncMode,
5
int corePoolSize,
6
int maximumPoolSize,
7
int minimumRunnable,
8
Predicate<? super ForkJoinPool> saturate,
9
long keepAliveTime,
10
TimeUnit unit) {
11
checkPermission();
12
int p = parallelism;
13
if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
14
throw new IllegalArgumentException();
15
if (factory == null || unit == null)
16
throw new NullPointerException();
17
this.parallelism = p;
18
this.factory = factory;
19
this.ueh = handler;
20
this.saturate = saturate;
21
this.config = asyncMode ? FIFO : 0;
22
this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
23
int corep = Math.clamp(corePoolSize, p, MAX_CAP);
24
int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
25
int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
26
this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |
27
((long)corep << 32);
28
int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
29
this.registrationLock = new ReentrantLock();
30
this.queues = new WorkQueue[size];
31
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
32
String name = "ForkJoinPool-" + pid;
33
this.workerNamePrefix = name + "-worker-";
34
this.container = SharedThreadContainer.create(name);
35
}

ForkJoinPool の仕組み

ここから書く内容は、ForkJoinPool について学習中に書かれているため誤った情報を含む可能性がある。あくまで筆者の理解ということを念頭に置いて読んで欲しい。

ForkJoinPool は引数無しのコンストラクタで生成したとき、プロセッサ数を parallelism (並列度レベル) としてインスタンスを生成する。 parallelismForkJoinPool が生成するスレッド数と対応する。ドキュメントを読むと並列処理を維持するためにデフォルトでは 256 スレッドが余分に確保されるスレッドの最大数として書かれているが、手元でいくつかコードを書いてみたが parallelism を越える数のスレッドが実行されることを確認できなかった。

ForkJoinPool ではスレッドごとに WorkerQueue を持っており、そのキューにタスクを追加していく。各スレッドの WorkerQueue に入ったタスクは他のスレッドで空きがあるとそれを盗んで (steal) 実行する。そうすることで全てのスレッドにおいて空きが発生し難いようになっている。ワークスティール (work stealing) 方式については http://www.yasugi.ai.kyutech.ac.jp/2012/4/k6.html が参考になる。

もしかしたら WorkerQueue の上限を越えた場合にスレッドが増えるのかもしれない。それであれば理に適っているように感じる。しかし、コードを読めていないので実際のところは不明だ。

ForkJoinPool のアルゴリズムの詳細

ForkJoinPool のソースコードを読むと理解するためには以下の資料を読むといいらしい。

アルゴリズムの概要だけを理解するのは難しくないように思うが、最適化されている部分があるようでそこまで含めて理解するのは時間がかかりそうだ…。

おわりに

この記事では Java の CompletableFutureStream の並列計算をするとき、デフォルトとして使われる ForkJoinPool について学んだ。 ForkJoinPool の使い方や大まかな仕組みについて見ることができたが、ドキュメントとの差異やアルゴリズムの詳細については謎が深まってしまった。

CompletableFuture についての記事を書いた後に ForkJoinPool の詳細に帰ってこようと思う。

Footnotes

  1. repo:openjdk21 CountedCompleter | Code search results

  2. 詳解 Java SE 8 第 22 回 Concurrency Utilities のアップデート その 4 | 日経クロステック(xTECH)


Buy Me A Coffee