2024-12-08

Trying ForkJoinPool

Notes
This article was translated by GPT-5.2-Codex. The original is here.

Introduction

I originally planned to write about CompletableFuture, but while researching ForkJoinPool as a prerequisite, it grew into a standalone article. This is a memo about writing programs using ForkJoinPool, which is used under the hood when you write parallel processing with CompletableFuture or Stream.

What is ForkJoinPool?

ForkJoinPool is an ExecutorService for executing ForkJoinTask (as the Javadoc says).

In ForkJoinPool, you process parallel computations using classes that implement ForkJoinTask. The standard library provides three such classes:

  • RecursiveAction
  • RecursiveTask
  • CountedCompleter

CountedCompleter is the least familiar of the three, but it appears to be used inside the JDK to implement parallel streams [1][2]. In this article I focus on RecursiveAction and RecursiveTask. The difference between them is whether the parallel computation yields a result value: if the main purpose is side effects, use RecursiveAction; if the purpose is to produce a new value, use RecursiveTask.

How to use ForkJoinPool

Let's look at how to use RecursiveAction and RecursiveTask.

Example using RecursiveAction

As an example, let's look at the sort shown in the Javadoc. The Javadoc code omits indentation, blank lines, and braces for if and for, so I formatted it, added the @Override annotation to the required compute method, added imports, and renamed the class from SortTask to SortAction.

```java
import java.util.Arrays;
import java.util.concurrent.RecursiveAction;

public class SortAction extends RecursiveAction {

    final long[] array;
    final int lo, hi;

    SortAction(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    SortAction(long[] array) {
        this(array, 0, array.length);
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            sortSequentially(lo, hi);
        } else {
            int mid = (lo + hi) >>> 1; // same as dividing by 2

            invokeAll(new SortAction(array, lo, mid), new SortAction(array, mid, hi));

            merge(lo, mid, hi);
        }
    }

    // implementation details follow:
    static final int THRESHOLD = 1000;

    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }

    void merge(int lo, int mid, int hi) {
        // copy the front half into a buffer and merge it back with the back half
        long[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
            array[j] = (k == hi || buf[i] < array[k])
                    ? buf[i++]
                    : array[k++];
        }
    }
}
```

Although the Javadoc does not say so explicitly, this sorts by combining merge sort with Java's standard library sort. If a range has fewer than 1,000 elements, it uses the standard library sort; otherwise it splits the range in two and recurses (the split is by index; the array itself is not copied). Because sorting performance depends heavily on input size, this simple trick is a cost-effective way to limit performance degradation. For primitive arrays, Arrays#sort() is implemented with dual-pivot quicksort.

The two split ranges are passed to SortAction constructors, and the subtasks are run with invokeAll, which forks them and waits for both to complete. Afterwards, merge combines the two sorted halves back into the target array, just as in ordinary merge sort.
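To show how such an action is actually driven, here is a minimal, hypothetical sketch. SquareAction is a toy stand-in of my own (not from the Javadoc) that squares each element in place; the invocation pattern is the same one you would use with SortAction, e.g. `ForkJoinPool.commonPool().invoke(new SortAction(array))`:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Toy RecursiveAction: squares each element of the range [lo, hi) in place.
class SquareAction extends RecursiveAction {
    static final int THRESHOLD = 10;
    final long[] a;
    final int lo, hi;

    SquareAction(long[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            for (int i = lo; i < hi; i++) a[i] *= a[i];
        } else {
            int mid = (lo + hi) >>> 1;
            // invokeAll forks the subtasks and waits for both to complete.
            invokeAll(new SquareAction(a, lo, mid), new SquareAction(a, mid, hi));
        }
    }
}

public class Driver {
    public static void main(String[] args) {
        long[] data = { 3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 8, 9, 7, 9 };
        // invoke() blocks until the whole action has completed.
        ForkJoinPool.commonPool().invoke(new SquareAction(data, 0, data.length));
        System.out.println(Arrays.toString(data));
    }
}
```

Because RecursiveAction produces no value, the result is observed only through the side effect on the shared array.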

RecursiveTask

As a simple example of a parallelizable RecursiveTask, you might think of computing the nth Fibonacci number, but there is already a Qiita article using Fibonacci, so here I implement merge sort with RecursiveTask. It overlaps with the RecursiveAction example, but this version is non-destructive. It also does not switch algorithms based on input size.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class MergeSortTask<T extends Comparable<? super T>, U extends Comparator<T>> extends RecursiveTask<List<T>> {

    /** List to sort. */
    private final List<T> xs;

    private final Comparator<T> comparator;

    public MergeSortTask(List<T> xs, Comparator<T> comparator) {
        this.xs = xs;
        this.comparator = comparator;
    }

    @Override
    protected List<T> compute() {
        // If the list is empty or has one element, it is already sorted.
        if (xs.size() <= 1) {
            return xs;
        }

        // Split the list into left and right halves.
        final var mid = Math.floorDiv(xs.size(), 2);
        final var left = new MergeSortTask<>(xs.subList(0, mid), comparator);
        final var right = new MergeSortTask<>(xs.subList(mid, xs.size()), comparator);

        // Fork the right half to run asynchronously while this thread sorts the left half.
        right.fork();

        final var sortedLeft = left.compute();
        final var sortedRight = right.join();

        // Merge the two sorted halves.
        final var sorted = new ArrayList<T>();
        int l = 0, r = 0;
        while (l < sortedLeft.size() && r < sortedRight.size()) {
            final var x = sortedLeft.get(l);
            final var y = sortedRight.get(r);

            if (comparator.compare(x, y) <= 0) {
                sorted.add(x);
                l++;
            } else {
                sorted.add(y);
                r++;
            }
        }

        while (l < sortedLeft.size()) {
            sorted.add(sortedLeft.get(l++));
        }

        while (r < sortedRight.size()) {
            sorted.add(sortedRight.get(r++));
        }

        return Collections.unmodifiableList(sorted);
    }
}
```

The algorithm is standard merge sort, so there is nothing special about it. The only notable difference from the Javadoc RecursiveAction example is that it supports arbitrary element types through generics and a Comparator, which makes the type signature slightly more complex.

Performance

Let's compare execution times for each implementation. Ideally we should compare memory usage as well, but here I focus only on time.

In addition to SortAction and MergeSortTask, I compare Stream#sorted() and Stream#parallelStream()#sorted().

Before running anything, I expected the ranking (fastest first) to be SortAction > Stream#parallelStream()#sorted() > MergeSortTask > Stream#sorted(). Here are the results for arrays/lists with 100,000,000 elements.

| Implementation | Time (s) |
| --- | --- |
| Stream#sorted() | 36.263888375 |
| MergeSortTask | 13.619412583 |
| SortAction | 1.5119055 |
| Stream#parallelStream()#sorted() | 4.816473459 |

The time to generate the data is not included. From this we can see that copying data is extremely expensive and that parallelization pays off. If you need a non-destructive sort, copying the array first and handing it to SortAction would likely be fastest, though probably comparable to parallelStream.

The measurements were done on:

  • Mac mini 2024
    • Chip: Apple M4 Pro
    • CPU: 14 cores
    • GPU: 16 cores
    • Memory: 64 GB
    • SSD: 1 TB
    • macOS: Sequoia 15.1

As of 2024/12/08 this is a high-end machine, so a typical PC would take longer. Even allowing room for optimization, 1.5 seconds to sort 100 million elements feels slow; maybe C would be faster. The measurement method is rough, so real performance may be better.

As mentioned later, I did not adjust parallelism in this measurement. So, tuning the thread count based on java - Setting Ideal size of Thread Pool - Stack Overflow might improve performance.

ForkJoinPool parallelism

I did not do any tuning when using ForkJoinPool this time. In the constructor, parameters that affect performance include:

  • int parallelism
  • int corePoolSize
  • int maximumPoolSize
  • int minimumRunnable
  • long keepAliveTime
ForkJoinPool.java

```java
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode,
                    int corePoolSize,
                    int maximumPoolSize,
                    int minimumRunnable,
                    Predicate<? super ForkJoinPool> saturate,
                    long keepAliveTime,
                    TimeUnit unit) {
    checkPermission();
    int p = parallelism;
    if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
        throw new IllegalArgumentException();
    if (factory == null || unit == null)
        throw new NullPointerException();
    this.parallelism = p;
    this.factory = factory;
    this.ueh = handler;
    this.saturate = saturate;
    this.config = asyncMode ? FIFO : 0;
    this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
    int corep = Math.clamp(corePoolSize, p, MAX_CAP);
    int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
    int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
    this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |
        ((long)corep << 32);
    int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
    this.registrationLock = new ReentrantLock();
    this.queues = new WorkQueue[size];
    String pid = Integer.toString(getAndAddPoolIds(1) + 1);
    String name = "ForkJoinPool-" + pid;
    this.workerNamePrefix = name + "-worker-";
    this.container = SharedThreadContainer.create(name);
}
```
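In practice you rarely call the full constructor; a minimal sketch of tuning only the parallelism is the single-argument constructor, which leaves every other knob at its default. The trick of running a parallel stream from inside `submit` so that it executes on that pool (rather than the common pool) is a known but implementation-dependent behavior, so treat this as an illustration, not a guarantee:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class TunedPool {
    public static void main(String[] args) {
        // Only `parallelism` is tuned here (4 worker threads); factory, asyncMode,
        // keepAliveTime, etc. keep their defaults.
        ForkJoinPool pool = new ForkJoinPool(4);
        // A parallel stream started from inside the pool runs on that pool's workers.
        int sum = pool.submit(() -> IntStream.rangeClosed(1, 100).parallel().sum()).join();
        System.out.println(sum); // 5050
        pool.shutdown();
    }
}
```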

How ForkJoinPool works

The following is written while I am still learning ForkJoinPool, so it may contain errors. Please read it as my current understanding.

When you create a ForkJoinPool with the no-argument constructor, it uses the number of processors as the parallelism level, which corresponds to the number of worker threads the pool targets. The documentation says that by default up to 256 spare threads may be created to maintain parallelism, but in my experiments I never observed more threads running than parallelism.
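This default is easy to check with a small probe (my own snippet, not from the Javadoc):

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismProbe {
    public static void main(String[] args) {
        // A no-argument pool uses Runtime.availableProcessors() as its parallelism.
        int procs = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(procs);
        System.out.println(pool.getParallelism());
        // The common pool defaults to one less than the processor count (minimum 1).
        System.out.println(ForkJoinPool.commonPool().getParallelism());
        pool.shutdown();
    }
}
```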

In ForkJoinPool, each worker thread has its own work queue (WorkQueue), and tasks are pushed onto that queue. When a thread runs out of work, it steals tasks from other threads' queues and executes them, which makes it unlikely that any thread sits idle. For more on work stealing, see http://www.yasugi.ai.kyutech.ac.jp/2012/4/k6.html.
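Work stealing can be observed directly: when a task forks a subtask, that subtask sits on the forking worker's queue until either the worker gets back to it or an idle worker steals it, and the pool counts those steals. Here is a small self-contained sketch (SumTask and the numbers are mine, not from the article):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Toy RecursiveTask that sums the range [from, to] by splitting it in half.
class SumTask extends RecursiveTask<Long> {
    final long from, to;

    SumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= 1_000) {
            long s = 0;
            for (long i = from; i <= to; i++) s += i;
            return s;
        }
        long mid = (from + to) >>> 1;
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid + 1, to);
        right.fork();            // pushed onto this worker's queue; an idle worker may steal it
        long l = left.compute(); // computed directly on this worker
        return l + right.join();
    }
}

public class StealDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long sum = pool.invoke(new SumTask(1, 10_000_000));
        System.out.println(sum); // 50000005000000
        // Usually nonzero on a multi-core machine, but not guaranteed.
        System.out.println("steals: " + pool.getStealCount());
        pool.shutdown();
    }
}
```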

Perhaps the number of threads increases when a work queue's limit is exceeded; that would make sense, but I haven't read the code, so I don't know.

Details of the ForkJoinPool algorithm

To understand the ForkJoinPool source code, the following references are recommended.

Understanding the outline of the algorithm doesn't seem too hard, but it looks like there are optimizations, so understanding the details will take time.

Conclusion

In this article, I learned about ForkJoinPool, which is used by default for parallel computation in Java CompletableFuture and Stream. I was able to look at how to use it and its rough mechanism, but the differences from the docs and the algorithm details remain unclear.

After writing about CompletableFuture, I want to come back and dig deeper into ForkJoinPool.

Footnotes

  1. repo:openjdk21 CountedCompleter | Code search results

  2. Detailed Java SE 8 Vol.22 Concurrency Utilities Updates Part 4 | Nikkei xTECH

About Amazon Associates

This article contains Amazon Associates links. As an Amazon Associate, SuzumiyaAoba earns from qualifying purchases.