ForkJoinPool を使ってみる

2024/12/08

はじめに

最初は CompletableFuture についての記事を書こうとしてたが、その前段として ForkJoinPool について調べていたところ、単独の記事とした方がいいようなボリュームとなった。この記事では CompletableFutureStream で並列処理を記述したときに裏側で使われる ForkJoinPool を用いてプログラムを書いてみた備忘録。

ForkJoinPool とは?

ForkJoinPoolForkJoinTask を実行するための ExecutorServie (Javadoc まま)。

ForkJoinPool では ForkJoinTask を実装したクラスを用いて並列計算を処理していく。標準ライブラリでは ForkJoinTask を実装したクラスとして次の 3 つを提供している。

CountedCompleter は馴染みがないクラスだが Stream の並列処理を実現するために JDK 内部で使われているようだ1 2。本記事では RecursiveActionRecursiveTask について注目する。 RecursiveActionRecursiveTask の違いは並列処理の実行後に計算結果を値として取得できるか否かになる。つまり、計算の主目的が副作用を発生させることであれば RecursiveAction を使い、新しい値を作ることが目的であれば RecursiveTask を使うことになる。

ForkJoinPool の使い方

RecursiveActionRecursiveTask の使い方を見てみよう。

RecursiveAction を使った例

RecursiveAction を使った実装例として Javadoc に載っているソートを見てみよう。 Javadoc のコードは紙面の都合からなのかインデントや行間の空行や iffor の括弧が省略されてしまっているので少し整えて、実装を強制される compute メソッドに @Override アノテーションを追加している。また import 文の追加し、クラス名を SortTask から SortAction に変更している。

Loading code...

Javadoc では直接的に書かれていないがマージソートと Java 標準ライブラリの sort の組み合わせによってソートしている。要素数が 1000 未満の場合は標準ライブラリを用いてソート行い、それよりも大きい場合は配列を二つの領域に分割して再帰的に自身を呼び出している (インデックスによる分割であり、配列の複製はされていない)。ソートアルゴリズムのパフォーマンスは要素数によって左右されるため、この手の簡単な工夫によって要素数による性能劣化を抑制する手法はコスパがいい。 Arrays#sort()Dual pivot Quicksort で実装されている。

分割された二つの領域について SortAction のコンストラクタにそれぞれの領域を渡すことで再帰的な呼び出しを行い、invokeAll メソッドに渡してそれぞれのタスクをフォークし、実行の完了後に merge メソッドによってソート対象の配列にマージソートと同じ要領でソートされたデータを詰めている。

RecursiveTask

RecursiveAction と違い、RecursiveTask で並列化できる処理の簡単な例としてフィボナッチ数列の n 番目を求めるような問題が思いつくが、既に Qiita にフィボナッチ数列を使った解説があるため、ここではマージソートRecursiveTask を使って実装する。 RecursiveAction の例と一緒になってしまっているが、こちらは非破壊的なソートとなっている。また、要素数によってソートアルゴリズムを切り替えるような工夫はしていない。

Loading code...

実装しているアルゴリズムはオーソドックスなマージソートとなっているので特筆すべき点はないだろう。アルゴリズム以外の違いとしては Javadoc に載っている RecursiveAction の例とは異なり、任意の比較可能な型 (Comparable) に対応しているためジェネリクス部分が少しだけ複雑になっていることくらいだろうか。

パフォーマンス

それぞれの実装について実行速度を比較してみよう。実行速度以外にメモリの比較も本来すべきだが今回は時間だけに注目する。

先に紹介した SortActionMergeSortTask の他に Stream#sorted()Stream#parallelStream()#sorted() を比較する。

比較するまでもなく SortAction > Stream#parallelStream()#sorted() > MergeSortTask > Stream#sort() の順だと思うが要素数 100,000,000 (= 1 億) の配列、リストに対して実行した結果を載せておく。

実装実行時間 (s)
Stream#sort()36.263888375
MergeSortTask13.619412583
SortAction1.5119055
Stream#parallelStream()#sorted()4.816473459

ソート対象のデータを生成する部分は計測に含めていない。このことからデータ複製は恐しく高コストだし、並列化は高速化にとって有用であることがわかる。非破壊的なソートを実装する場合は配列を複製して SortAction に渡すのが一番高速になりそうだが、恐らく paralellStream を使う例と同じくらいの実行速度になるだろう。

実行速度は以下の環境で行った。

  • Mac mini 2024
    • チップ: Apple M4 Pro
    • CPU: 14 コア
    • GPU: 16 コア
    • メモリ: 64 GB
    • SSD: 1 TB
    • macOS: Squoia 15.1

2024/12/08 時点においてはスペック高めだと思うので一般的な PC であればもっと時間がかかるだろう。高速化の余地があるとは言え 1 億個の要素のソートで 1.5 秒もかかってしまうのかー。C で書いたらもっと早いのかなー。測定の方法が雑なので実際にはもっとよくなるかもしれない。

後述するが今回の測定では parallelism の調整を行っていない。そのため、java - Setting Ideal size of Thread Pool - Stack Overflow を参考にスレッド数を調整するとより高速に行えるか可能性がある。

ForkJoinPool の並列度

今回は ForkJoinPool の使用において何のチューニングも行なわなかった。 ForkJoinPool のコンストラクタではパフォーマンスに影響するパラメータとして、

  • int parallelism
  • int corePoolSize
  • int maximumPoolSize
  • int minimumRunnable
  • long keepAliveTime

を引数で指定できる。

Loading code...

ForkJoinPool の仕組み

ここから書く内容は、ForkJoinPool について学習中に書かれているため誤った情報を含む可能性がある。あくまで筆者の理解ということを念頭に置いて読んで欲しい。

ForkJoinPool は引数無しのコンストラクタで生成したとき、プロセッサ数を parallelism (並列度レベル) としてインスタンスを生成する。 parallelismForkJoinPool が生成するスレッド数と対応する。ドキュメントを読むと並列処理を維持するためにデフォルトでは 256 スレッドが余分に確保されるスレッドの最大数として書かれているが、手元でいくつかコードを書いてみたが parallelism を越える数のスレッドが実行されることを確認できなかった。

ForkJoinPool ではスレッドごとに WorkerQueue を持っており、そのキューにタスクを追加していく。各スレッドの WorkerQueue に入ったタスクは他のスレッドで空きがあるとそれを盗んで (steal) 実行する。そうすることで全てのスレッドにおいて空きが発生し難いようになっている。ワークスティール (work stealing) 方式については http://www.yasugi.ai.kyutech.ac.jp/2012/4/k6.html が参考になる。

もしかしたら WorkerQueue の上限を越えた場合にスレッドが増えるのかもしれない。それであれば理に適っているように感じる。しかし、コードを読めていないので実際のところは不明だ。

ForkJoinPool のアルゴリズムの詳細

ForkJoinPool のソースコードを読むと理解するためには以下の資料を読むといいらしい。

アルゴリズムの概要だけを理解するのは難しくないように思うが、最適化されている部分があるようでそこまで含めて理解するのは時間がかかりそうだ…。

おわりに

この記事では Java の CompletableFutureStream の並列計算をするとき、デフォルトとして使われる ForkJoinPool について学んだ。 ForkJoinPool の使い方や大まかな仕組みについて見ることができたが、ドキュメントとの差異やアルゴリズムの詳細については謎が深まってしまった。

CompletableFuture についての記事を書いた後に ForkJoinPool の詳細に帰ってこようと思う。

Footnotes

  1. repo:openjdk21 CountedCompleter | Code search results

  2. 詳解 Java SE 8 第 22 回 Concurrency Utilities のアップデート その 4 | 日経クロステック(xTECH)

SuzumiyaAobaのプロフィール画像

SuzumiyaAoba

プログラミング、技術、その他の話題について共有するブログを書いてます。 主にScala、Java、TypeScriptなどの技術について興味あり。

ScalaJavaTypeScriptReact

Buy Me A Coffee