Notes
Introduction
I originally planned to write about CompletableFuture, but while researching ForkJoinPool as a prerequisite, it grew into a standalone article.
This is a memo about writing programs using ForkJoinPool, which is used under the hood when you write parallel processing with CompletableFuture or Stream.
What is ForkJoinPool?
ForkJoinPool is an ExecutorService for executing ForkJoinTask (as the Javadoc says).
In ForkJoinPool, you process parallel computations using classes that implement ForkJoinTask.
The standard library provides three such classes:
- RecursiveAction
- RecursiveTask
- CountedCompleter
CountedCompleter is the least familiar of the three, but it appears to be used internally in the JDK to implement parallel streams.
In this article, I focus on RecursiveAction and RecursiveTask.
The difference is whether you can obtain a result value after the parallel computation.
If the main purpose is side effects, use RecursiveAction.
If the purpose is to produce a new value, use RecursiveTask.
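As a quick illustration of the difference (this example is mine, not from the Javadoc): a RecursiveTask returns a value from compute(), so something like a range sum fits it naturally, whereas the same computation as a RecursiveAction would have to write its result somewhere as a side effect.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// A minimal RecursiveTask: sums a range of an int array by splitting it in half.
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final int[] values;
    private final int lo, hi;

    public SumTask(int[] values, int lo, int hi) {
        this.values = values;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: sum sequentially.
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(values, lo, mid);
        SumTask right = new SumTask(values, mid, hi);
        right.fork();                          // run the right half asynchronously
        return left.compute() + right.join();  // compute left here, then wait for right
    }

    public static void main(String[] args) {
        int[] values = new int[10_000];
        Arrays.fill(values, 1);
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));
        System.out.println(sum); // 10000
    }
}
```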
How to use ForkJoinPool
Let's look at how to use RecursiveAction and RecursiveTask.
Example using RecursiveAction
As an example, let's look at the sort shown in the Javadoc.
The Javadoc code omits indentation, blank lines, and braces for if and for, so I formatted it,
added the @Override annotation to the required compute method,
added imports, and renamed the class from SortTask to SortAction.
```java
import java.util.Arrays;
import java.util.concurrent.RecursiveAction;

public class SortAction extends RecursiveAction {

    final long[] array;
    final int lo, hi;

    SortAction(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    SortAction(long[] array) {
        this(array, 0, array.length);
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            sortSequentially(lo, hi);
        } else {
            int mid = (lo + hi) >>> 1; // same as dividing by 2

            invokeAll(new SortAction(array, lo, mid), new SortAction(array, mid, hi));

            merge(lo, mid, hi);
        }
    }

    // implementation details follow:
    static final int THRESHOLD = 1000;

    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }

    void merge(int lo, int mid, int hi) {
        // copy out the front half so the back half can be merged in place
        long[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
            array[j] = (k == hi || buf[i] < array[k])
                    ? buf[i++]
                    : array[k++];
        }
    }
}
```
Although not explicitly stated in the Javadoc, this sorts by combining merge sort with Java's standard sort.
If the range has fewer than 1000 elements, it uses the standard library sort; otherwise it splits the range in two and recursively invokes itself (the split is by index; the array is not copied).
Because the overhead of splitting and merging outweighs the benefit on small inputs, falling back to the sequential sort below a threshold is a cheap, effective way to avoid performance degradation.
For primitive arrays such as long[], Arrays#sort() is implemented with dual-pivot quicksort.
The two split ranges are passed to the SortAction constructor, and the tasks are forked with invokeAll.
After both tasks complete, merge merges the sorted data into the target array in the same way as merge sort.
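Running the task could look like the following sketch. To keep the snippet compilable on its own, it repeats the Javadoc SortAction from the listing above in condensed form as a nested class; the demo itself (array size, seed, use of the common pool) is my own.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class SortActionDemo {
    // Condensed copy of the Javadoc SortAction from the listing above.
    static class SortAction extends RecursiveAction {
        static final int THRESHOLD = 1000;
        final long[] array;
        final int lo, hi;

        SortAction(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        SortAction(long[] array) {
            this(array, 0, array.length);
        }

        @Override
        protected void compute() {
            if (hi - lo < THRESHOLD) {
                Arrays.sort(array, lo, hi);
            } else {
                int mid = (lo + hi) >>> 1;
                invokeAll(new SortAction(array, lo, mid), new SortAction(array, mid, hi));
                merge(lo, mid, hi);
            }
        }

        void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
                array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
            }
        }
    }

    public static void main(String[] args) {
        long[] array = new Random(42).longs(100_000).toArray();

        // invoke() submits the root task and blocks until it finishes;
        // the array is sorted in place.
        ForkJoinPool.commonPool().invoke(new SortAction(array));

        // Cross-check against the standard library sort.
        long[] expected = array.clone();
        Arrays.sort(expected);
        System.out.println(Arrays.equals(array, expected)); // true
    }
}
```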
RecursiveTask
As a simple example of a parallelizable RecursiveTask, you might think of computing the nth Fibonacci number,
but there is already a Qiita article using Fibonacci,
so here I implement merge sort with RecursiveTask.
It overlaps with the RecursiveAction example, but this version is non-destructive.
It also does not switch algorithms based on input size.
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class MergeSortTask<T extends Comparable<? super T>> extends RecursiveTask<List<T>> {

    /** List to sort. */
    private final List<T> xs;

    private final Comparator<T> comparator;

    public MergeSortTask(List<T> xs, Comparator<T> comparator) {
        this.xs = xs;
        this.comparator = comparator;
    }

    @Override
    protected List<T> compute() {
        // If the list is empty or has one element, it is already sorted.
        if (xs.size() <= 1) {
            return xs;
        }

        // Split the list into left and right halves.
        final var mid = Math.floorDiv(xs.size(), 2);
        final var left = new MergeSortTask<>(xs.subList(0, mid), comparator);
        final var right = new MergeSortTask<>(xs.subList(mid, xs.size()), comparator);

        // Start asynchronous execution of the right half.
        right.fork();

        // Compute the left half on the current thread, then wait for the right.
        final var sortedLeft = left.compute();
        final var sortedRight = right.join();

        // Merge the two sorted halves.
        final var sorted = new ArrayList<T>();
        int l = 0, r = 0;
        while (l < sortedLeft.size() && r < sortedRight.size()) {
            final var x = sortedLeft.get(l);
            final var y = sortedRight.get(r);

            if (comparator.compare(x, y) <= 0) {
                sorted.add(x);
                l++;
            } else {
                sorted.add(y);
                r++;
            }
        }

        // Append whichever half still has elements left.
        while (l < sortedLeft.size()) {
            sorted.add(sortedLeft.get(l++));
        }

        while (r < sortedRight.size()) {
            sorted.add(sortedRight.get(r++));
        }

        return Collections.unmodifiableList(sorted);
    }
}
```
The algorithm is standard merge sort, so nothing special there.
The only notable difference is that unlike the Javadoc RecursiveAction example, it supports arbitrary comparable types (Comparable), so the generics are slightly more complex.
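For comparison, the same non-destructive sort can be expressed in a couple of lines with the Stream API that the next section benchmarks; the sequential variant runs on the calling thread, while the parallel variant runs on ForkJoinPool.commonPool() under the hood (this small demo and its data are mine):

```java
import java.util.Comparator;
import java.util.List;

public class StreamSortDemo {
    public static void main(String[] args) {
        List<Integer> xs = List.of(3, 1, 4, 1, 5, 9, 2, 6);

        // Non-destructive sequential sort via the Stream API.
        List<Integer> sorted = xs.stream().sorted(Comparator.naturalOrder()).toList();

        // Parallel variant; work is scheduled on the common ForkJoinPool.
        List<Integer> parallelSorted = xs.parallelStream().sorted(Comparator.naturalOrder()).toList();

        System.out.println(sorted);                        // [1, 1, 2, 3, 4, 5, 6, 9]
        System.out.println(sorted.equals(parallelSorted)); // true
    }
}
```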
Performance
Let's compare execution times for each implementation. Ideally we should compare memory usage as well, but here I focus only on time.
In addition to SortAction and MergeSortTask, I compare Stream#sorted() and Stream#parallelStream()#sorted().
Before even running the comparison, I expected the order (fastest first) to be SortAction > Stream#parallelStream()#sorted() > MergeSortTask > Stream#sorted(),
but here are the results for arrays/lists with 100,000,000 elements.
| Implementation | Time (s) |
|---|---|
| Stream#sorted() | 36.263888375 |
| MergeSortTask | 13.619412583 |
| SortAction | 1.5119055 |
| Stream#parallelStream()#sorted() | 4.816473459 |
The time to generate the data is not included.
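The article does not show the measurement harness; a minimal sketch of the kind of timing involved (the helper names, sizes, and seed here are my own, and a proper benchmark would use JMH with warm-up) might look like:

```java
import java.util.Arrays;
import java.util.Random;

public class TimingDemo {
    // Run the given action once and return the elapsed wall-clock time in ms.
    static long timeMillis(Runnable action) {
        long start = System.nanoTime();
        action.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    static boolean isSorted(long[] xs) {
        for (int i = 1; i < xs.length; i++) {
            if (xs[i - 1] > xs[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        long[] data = new Random(1).longs(1_000_000).toArray();

        // Clone first so data generation/copying is excluded from the measurement.
        long[] copy = data.clone();
        long elapsed = timeMillis(() -> Arrays.sort(copy));
        System.out.println("Arrays.sort took " + elapsed + " ms");
        System.out.println(isSorted(copy)); // true
    }
}
```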
From this, we can see that copying data is extremely expensive, and parallelization is useful for speed.
If you need a non-destructive sort, copying the array first and passing the copy to SortAction would likely be fastest, though the total time would probably end up close to parallelStream's.
The measurements were done on:
- Mac mini 2024
- Chip: Apple M4 Pro
- CPU: 14 cores
- GPU: 16 cores
- Memory: 64 GB
- SSD: 1 TB
- macOS: Sequoia 15.1
As of 2024/12/08 this is a high-end machine, so it would take longer on a typical PC. Even if there is room for optimization, 1.5 seconds for sorting 100 million elements feels slow. Maybe it would be faster in C. The measurement method is rough, so it may be better in reality.
As mentioned later, I did not adjust parallelism in this measurement.
So, tuning the thread count based on java - Setting Ideal size of Thread Pool - Stack Overflow might improve performance.
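For example, a pool with an explicit parallelism level can be created like this (the value 4 is arbitrary, chosen only for illustration):

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        // A dedicated pool with an explicit parallelism level.
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.getParallelism()); // 4

        // Tasks submitted here run on this pool's workers, not the common pool.
        int result = pool.submit(() -> 21 * 2).get();
        System.out.println(result); // 42

        pool.shutdown();
    }
}
```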
ForkJoinPool parallelism
I did not do any tuning when using ForkJoinPool this time.
In the constructor, parameters that affect performance include:
- int parallelism
- int corePoolSize
- int maximumPoolSize
- int minimumRunnable
- long keepAliveTime
```java
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode,
                    int corePoolSize,
                    int maximumPoolSize,
                    int minimumRunnable,
                    Predicate<? super ForkJoinPool> saturate,
                    long keepAliveTime,
                    TimeUnit unit) {
    checkPermission();
    int p = parallelism;
    if (p <= 0 || p > MAX_CAP || p > maximumPoolSize || keepAliveTime <= 0L)
        throw new IllegalArgumentException();
    if (factory == null || unit == null)
        throw new NullPointerException();
    this.parallelism = p;
    this.factory = factory;
    this.ueh = handler;
    this.saturate = saturate;
    this.config = asyncMode ? FIFO : 0;
    this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
    int corep = Math.clamp(corePoolSize, p, MAX_CAP);
    int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
    int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
    this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |
        ((long)corep << 32);
    int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
    this.registrationLock = new ReentrantLock();
    this.queues = new WorkQueue[size];
    String pid = Integer.toString(getAndAddPoolIds(1) + 1);
    String name = "ForkJoinPool-" + pid;
    this.workerNamePrefix = name + "-worker-";
    this.container = SharedThreadContainer.create(name);
}
```
How ForkJoinPool works
The following is written while I am still learning ForkJoinPool, so it may contain errors.
Please read it as my current understanding.
When you create a ForkJoinPool with the no-argument constructor, it uses the number of processors as parallelism (parallelism level).
parallelism corresponds to the number of threads created by the pool.
The documentation says that, by default, up to 256 spare threads may be created to maintain the target parallelism,
but in my experiments I never observed more threads running than the parallelism level.
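The defaults can be checked directly. Note that `new ForkJoinPool()` and the common pool differ: the common pool's default parallelism is one less than the processor count (but at least 1), presumably leaving a core for the submitting thread (the assertion on the common pool assumes its system property is not overridden):

```java
import java.util.concurrent.ForkJoinPool;

public class DefaultParallelismDemo {
    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();

        // The no-arg constructor uses the number of available processors.
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.getParallelism() == cpus); // true
        pool.shutdown();

        // The common pool defaults to one less (unless configured otherwise
        // via java.util.concurrent.ForkJoinPool.common.parallelism).
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
    }
}
```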
In ForkJoinPool, each worker thread has its own work queue (WorkQueue), and tasks are pushed onto that queue.
When a thread runs out of work, it steals tasks from other threads' queues and executes them.
This makes it less likely that any thread sits idle.
For work stealing, see http://www.yasugi.ai.kyutech.ac.jp/2012/4/k6.html.
Maybe the number of threads increases when a WorkQueue limit is exceeded. If so, it would make sense.
But I haven't read the code, so I don't know.
Details of the ForkJoinPool algorithm
To understand the ForkJoinPool source code, the following references are recommended.
- Herlihy and Shavit's book "The Art of Multiprocessor Programming", chapter 16
- "Dynamic Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
- Link is dead 😄
- "Idempotent work stealing" by Michael, Saraswat, and Vechev, PPoPP 2009
- You can find PDFs by searching the title, but they don't look like author uploads, so I won't link them.
- "Correct and Efficient Work-Stealing for Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
Understanding the outline of the algorithm doesn't seem too hard, but it looks like there are optimizations, so understanding the details will take time.
Conclusion
In this article, I learned about ForkJoinPool, which is used by default for parallel computation in Java CompletableFuture and Stream.
I was able to look at how to use it and its rough mechanism, but the differences from the docs and the algorithm details remain unclear.
After writing about CompletableFuture, I want to come back and dig deeper into ForkJoinPool.

