Waiting for background tasks to finish using the CompletableFuture class in Java
September 22, 2017
Introduction
In a previous post we saw how to wait for a number of background tasks to finish using the CountDownLatch class. The starting point for the discussion was the following situation:
Imagine that you execute a number of long running methods. Also, let’s say that the very last time consuming process depends on the previous processes, let’s call them prerequisites. The dependence is “sequential” meaning that the final stage should only run if the prerequisites have all completed and returned. The first implementation may very well be sequential where the long running methods are called one after the other and each of them blocks the main thread.
However, if the prerequisites can be executed independently then there’s a much better solution: we can execute them in parallel instead. Independence in this case means that prerequisite A doesn’t need any return value from prerequisite B. If it did, parallel execution of A and B would not be an option.
In this post we’ll look at an alternative solution using the CompletableFuture class. It is far more versatile than CountDownLatch, which is essentially a simple counter that lets threads wait until it reaches zero. CompletableFuture offers a wide range of possibilities to organise your asynchronous tasks with a fluent API. Here we’ll start off easy with a simple application of this class.
Let’s first repeat what kind of interfaces and implementations we work with in the demo.
The service interface
We’ll put the long running processes behind an interface:
public interface MessagePrinterService
{
    public void print(String message);
}
The prerequisites will be represented by the following 4 implementations:
// imports needed by ScrambledMessagePrinterService
import java.util.ArrayList;
import java.util.Collections;

public class AnnoyedMessagePrinterService implements MessagePrinterService
{
    @Override
    public void print(String message)
    {
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        System.out.println("What now??? ".concat(message));
    }
}

public class BlockCapitalsMessagePrinterService implements MessagePrinterService
{
    @Override
    public void print(String message)
    {
        try
        {
            Thread.sleep(4000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        System.out.println(message.toUpperCase());
    }
}

public class ReversedMessagePrinterService implements MessagePrinterService
{
    @Override
    public void print(String message)
    {
        try
        {
            Thread.sleep(3000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        System.out.println(new StringBuilder(message).reverse().toString());
    }
}

public class ScrambledMessagePrinterService implements MessagePrinterService
{
    @Override
    public void print(String message)
    {
        try
        {
            Thread.sleep(2000);
        } catch (InterruptedException ex)
        {
            //ignore
        }

        // copy the characters into a list so that they can be shuffled
        ArrayList<Character> chars = new ArrayList<>(message.length());
        for (char c : message.toCharArray())
        {
            chars.add(c);
        }
        Collections.shuffle(chars);

        // turn the shuffled list back into a string
        char[] shuffled = new char[chars.size()];
        for (int i = 0; i < shuffled.length; i++)
        {
            shuffled[i] = chars.get(i);
        }
        System.out.println(new String(shuffled));
    }
}
We also have a fifth implementation that will simply print the supplied message without any changes. This implementation will be the final one to be called in our demo code later on:
public class UnchangedMessagePrinterService implements MessagePrinterService
{
    @Override
    public void print(String message)
    {
        try
        {
            Thread.sleep(1000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        System.out.println(message);
    }
}
A sequential solution would simply call each printer service to print the message one after the other with the UnchangedMessagePrinterService coming last. The total execution time will be around 15 seconds. It is the sum of all Thread.sleep wait times in the 5 message printers.
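As a rough sketch of that sequential baseline, the snippet below calls five printers one after the other on the calling thread and measures the elapsed time. The class name SequentialPrinterDemo and the helpers delayed and runSequentially are made up for this illustration and are not part of the demo project. The delays are the 5, 4, 3, 2 and 1 seconds from above scaled down by a factor of 100 so that the sketch finishes quickly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SequentialPrinterDemo {

    // Same shape as the MessagePrinterService interface from the post.
    interface MessagePrinterService {
        void print(String message);
    }

    // Hypothetical helper: builds a printer that sleeps and then records its output in a list.
    static MessagePrinterService delayed(long millis, String prefix, List<String> sink) {
        return message -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            sink.add(prefix + message);
        };
    }

    // Calls every printer on the calling thread, one after the other,
    // and returns the elapsed time in milliseconds.
    static long runSequentially(List<MessagePrinterService> printers, String message) {
        long start = System.nanoTime();
        for (MessagePrinterService printer : printers) {
            printer.print(message);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        // Assumed delays: the post's sleep times divided by 100.
        List<MessagePrinterService> printers = Arrays.asList(
                delayed(50, "annoyed: ", output),
                delayed(40, "capitals: ", output),
                delayed(30, "reversed: ", output),
                delayed(20, "scrambled: ", output),
                delayed(10, "unchanged: ", output));
        long elapsed = runSequentially(printers, "hello");
        System.out.println(output);
        System.out.println("elapsed ms (roughly the sum of all delays): " + elapsed);
    }
}
```

Since every call blocks until the previous one has returned, the elapsed time grows with the sum of the individual delays.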
The threaded solution
We can immediately see that the prerequisite message printers can be called in parallel. There’s nothing in e.g. ReversedMessagePrinterService that depends on AnnoyedMessagePrinterService.
The CompletableFuture class is similar to Future but it offers a lot more functions and extensions to arrange our asynchronous tasks. A completable future can be void, i.e. return nothing, or it can have a return value. CompletableFuture has a number of static methods that help with the construction of tasks. In the below example we’ll see the following functions in action:
- runAsync: accepts a Runnable, i.e. an object with a run method that’s called when the task starts. It also optionally accepts an Executor such as a thread pool
- allOf: accepts any number of completable futures and returns a new one that completes when all of them have completed. This is where the prerequisite tasks are waited upon while they run in parallel
- thenRunAsync: an example of the fluent API behind CompletableFuture. This function also accepts a Runnable and this is where we’ll put the last continuation task, i.e. the UnchangedMessagePrinterService message printer
- exceptionally: a function that deals with an exception thrown in the preceding completable futures
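To illustrate the difference between a void completable future and one with a return value, here is a minimal sketch. It uses supplyAsync, which is not used elsewhere in this post: it is the value-returning counterpart of runAsync. The class and method names are made up for the illustration, and the tasks run on the default executor since no thread pool is passed in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class VoidVersusValueDemo {

    // Runs a void task and reports whether its side effect happened.
    static boolean runVoidTask() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> ran.set(true));
        task.join(); // blocks until the Runnable has finished; a Void future yields null
        return ran.get();
    }

    // Combines two value-returning tasks once both have completed.
    static String combineValues(String message) {
        CompletableFuture<String> upper = CompletableFuture.supplyAsync(() -> message.toUpperCase());
        CompletableFuture<Integer> length = CompletableFuture.supplyAsync(() -> message.length());
        // allOf itself is a CompletableFuture<Void>, so the values are read from the
        // original futures, which are guaranteed to be complete at this point.
        return CompletableFuture.allOf(upper, length)
                .thenApply(ignored -> upper.join() + " (" + length.join() + ")")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(runVoidTask());          // prints true
        System.out.println(combineValues("hello")); // prints HELLO (5)
    }
}
```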
The below example calls the various message printers using Lambda expressions. If you’re new to lambdas in Java then start here.
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

private static void tryVoidCompletableFutureTasks()
{
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    MessagePrinterService annoyed = new AnnoyedMessagePrinterService();
    MessagePrinterService blockCapitals = new BlockCapitalsMessagePrinterService();
    MessagePrinterService reversed = new ReversedMessagePrinterService();
    MessagePrinterService scrambled = new ScrambledMessagePrinterService();
    MessagePrinterService unchanged = new UnchangedMessagePrinterService();
    String message = "My latest invention is going to save the world!";

    // each prerequisite starts running on the thread pool as soon as runAsync is called
    CompletableFuture<Void> annoyedTask = CompletableFuture.runAsync(() -> annoyed.print(message), cachedThreadPool);
    CompletableFuture<Void> blockCapitalsTask = CompletableFuture.runAsync(() -> blockCapitals.print(message), cachedThreadPool);
    CompletableFuture<Void> reversedTask = CompletableFuture.runAsync(() -> reversed.print(message), cachedThreadPool);
    CompletableFuture<Void> scrambledTask = CompletableFuture.runAsync(() -> scrambled.print(message), cachedThreadPool);

    Instant start = Instant.now();
    // the final printer only runs after all four prerequisites have completed
    CompletableFuture<Void> allTasks = CompletableFuture.allOf(annoyedTask, blockCapitalsTask, reversedTask, scrambledTask)
            .thenRunAsync(() ->
            {
                unchanged.print(message);
                Instant finish = Instant.now();
                Duration duration = Duration.between(start, finish);
                long seconds = duration.getSeconds();
                System.out.println(seconds);
            }, cachedThreadPool)
            .exceptionally((ex) ->
            {
                System.out.println(ex.getMessage());
                return null;
            });

    // wait for the whole chain to finish, then release the pool's threads;
    // without the shutdown the idle pool threads keep the JVM alive for a while
    allTasks.join();
    cachedThreadPool.shutdown();
}
Note how the allOf, thenRunAsync and exceptionally functions can be called in succession in a fluent way. The exceptionally function also returns a void completable future. The function we pass to it supplies a replacement result in case one of the preceding stages has thrown an exception. Since the result type is Void, null is the only value we can return, which is fine if we only want to print the exception message.
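The following sketch shows what happens when one of the prerequisites fails. The class ExceptionallyDemo, its Outcome holder and the "printer jammed" failure are invented for this illustration. With a failing prerequisite the continuation is skipped and the exceptionally handler runs instead. The exception normally arrives wrapped in a CompletionException, so the sketch unwraps the cause before reading its message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionallyDemo {

    // Hypothetical result holder, only used by this sketch.
    static final class Outcome {
        final boolean continuationRan;
        final String failureMessage;

        Outcome(boolean continuationRan, String failureMessage) {
            this.continuationRan = continuationRan;
            this.failureMessage = failureMessage;
        }
    }

    static Outcome run(boolean failPrerequisite) {
        AtomicBoolean continuationRan = new AtomicBoolean(false);
        AtomicReference<String> failure = new AtomicReference<>();

        CompletableFuture<Void> ok = CompletableFuture.runAsync(() -> { });
        CompletableFuture<Void> risky = CompletableFuture.runAsync(() -> {
            if (failPrerequisite) {
                throw new IllegalStateException("printer jammed");
            }
        });

        CompletableFuture.allOf(ok, risky)
                .thenRunAsync(() -> continuationRan.set(true)) // skipped if a prerequisite failed
                .exceptionally(ex -> {
                    // unwrap the CompletionException if there is one
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    failure.set(cause.getMessage());
                    return null; // the only value a CompletableFuture<Void> can carry
                })
                .join(); // does not throw: exceptionally has turned the failure into a normal result

        return new Outcome(continuationRan.get(), failure.get());
    }

    public static void main(String[] args) {
        Outcome success = run(false);
        System.out.println(success.continuationRan + " / " + success.failureMessage); // prints true / null
        Outcome failed = run(true);
        System.out.println(failed.continuationRan + " / " + failed.failureMessage);   // prints false / printer jammed
    }
}
```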
Running the function will provide an output similar to the following:
vadve sn one h gsgoMino totey tts ili irlew!tna
!dlrow eht evas ot gniog si noitnevni tsetal yM
MY LATEST INVENTION IS GOING TO SAVE THE WORLD!
What now??? My latest invention is going to save the world!
My latest invention is going to save the world!
6
…where 6 is the number of seconds it took to complete all tasks. That’s a great improvement compared to the 15 with the sequential solution.
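The two figures follow from simple arithmetic on the Thread.sleep values, which the sketch below spells out. It assumes that the sleep times dominate the run time and that the thread pool can run all four prerequisites at the same time. The class and method names are made up for the illustration.

```java
import java.util.Arrays;

class ExpectedDurations {

    // Sequential: every task waits for the previous one, so the delays add up.
    static long sequentialSeconds(long[] prerequisites, long finalTask) {
        return Arrays.stream(prerequisites).sum() + finalTask;
    }

    // Parallel: the prerequisites overlap, so only the slowest one counts,
    // followed by the final task that has to wait for all of them.
    static long parallelSeconds(long[] prerequisites, long finalTask) {
        return Arrays.stream(prerequisites).max().orElse(0) + finalTask;
    }

    public static void main(String[] args) {
        long[] prerequisites = {5, 4, 3, 2}; // annoyed, block capitals, reversed, scrambled
        long finalTask = 1;                  // unchanged
        System.out.println(sequentialSeconds(prerequisites, finalTask)); // prints 15
        System.out.println(parallelSeconds(prerequisites, finalTask));   // prints 6
    }
}
```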