Getting a result from a parallel task in Java using CompletableFuture
September 14, 2017
In an earlier post we saw how to start several processes on different threads using the CompletableFuture class. The example concentrated on methods with no return value. We let CompletableFuture finish the tasks in parallel before continuing with another process.
In this post we’ll see how to use CompletableFuture for functions with a return value. We’ll reuse several elements we saw in the post that concentrated on the Future class.
The demo objects
We’ll simulate a number of long-running calculations that all return an integer but are independent of each other.
We have the following interface to implement various heavy calculations:
public interface CalculationService {
    public int calculate(int first, int second);
}
…and we have the following implementations:
public class AdditionService implements CalculationService {
    @Override
    public int calculate(int first, int second) {
        try {
            Thread.sleep(1000); // simulate a slow calculation
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return first + second;
    }
}

public class SubtractionService implements CalculationService {
    @Override
    public int calculate(int first, int second) {
        try {
            Thread.sleep(2000); // simulate a slow calculation
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return first - second;
    }
}

public class MultiplicationService implements CalculationService {
    @Override
    public int calculate(int first, int second) {
        try {
            Thread.sleep(3000); // simulate a slow calculation
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return first * second;
    }
}

public class DivisionService implements CalculationService {
    @Override
    public int calculate(int first, int second) {
        try {
            Thread.sleep(4000); // simulate a slow calculation
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return first / second; // integer division; throws ArithmeticException if second is 0
    }
}
The Thread.sleep calls simulate long-running calculations: each service takes between one and four seconds to produce its result.
In the referenced post above we used the runAsync method of CompletableFuture to start the processes. The runAsync method was specifically designed for void processes where we were not expecting any return value. The function to use for processes that do have a return value is called supplyAsync. The supplyAsync method accepts an object that implements the Supplier<T> interface. A Supplier is similar to a Callable, which is used in conjunction with the Future interface. The type parameter is the return type, Integer in our case. A Supplier declares a single method called get, which takes no arguments and contains the code that returns the end result.
Here’s our supplier to return the calculation result:
import java.util.function.Supplier;

public class CalculationServiceSupplier implements Supplier<Integer> {
    private final CalculationService calculationService;
    private final int firstOperand;
    private final int secondOperand;

    public CalculationServiceSupplier(CalculationService calculationService,
            int firstOperand, int secondOperand) {
        this.calculationService = calculationService;
        this.firstOperand = firstOperand;
        this.secondOperand = secondOperand;
    }

    @Override
    public Integer get() {
        return calculationService.calculate(firstOperand, secondOperand);
    }
}
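For a short task, a lambda can stand in for a dedicated Supplier class. The following is only a minimal sketch: the class SupplyAsyncSketch and the helper addAsync are hypothetical names that do not appear elsewhere in this post, and no executor is passed, so the task runs on the default asynchronous executor (normally the common ForkJoinPool).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class SupplyAsyncSketch {

    // Hypothetical helper: computes the sum on another thread and returns a future for it.
    static CompletableFuture<Integer> addAsync(int first, int second) {
        // A lambda with no arguments and an Integer result satisfies Supplier<Integer>.
        Supplier<Integer> supplier = () -> first + second;
        // No executor argument: the default asynchronous executor is used.
        return CompletableFuture.supplyAsync(supplier);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> future = addAsync(10, 5);
        // join() blocks until the supplier has returned its value.
        System.out.println(future.join()); // prints 15
    }
}
```

A separate class such as CalculationServiceSupplier is still handy when the same supplier logic is reused with different services and operands.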
The demo code
In the example below we build a completable future for each of the calculation tasks, i.e. there will be four in total. We use the supplyAsync function of CompletableFuture as we hinted at above. Then for each process we’ll see the whenComplete function in action. whenComplete is attached to a completable future and is executed when the future has completed. It accepts a BiConsumer with two parameters: the result of the process and a throwable. If the parallel action succeeds, the result holds its return value and the throwable is null. If the parallel action throws an exception, the throwable is populated and the result is null.
Here’s a possible implementation:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

private void tryCompletableFutureTasks() {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    CalculationService adder = new AdditionService();
    CalculationService subtractor = new SubtractionService();
    CalculationService multiplier = new MultiplicationService();
    CalculationService divider = new DivisionService();
    int firstOperand = 10;
    int secondOperand = 5;

    // Each supplyAsync call submits its task to the thread pool immediately.
    CompletableFuture<Integer> additionTask = CompletableFuture.supplyAsync(
            new CalculationServiceSupplier(adder, firstOperand, secondOperand), cachedThreadPool);
    CompletableFuture<Integer> subtractionTask = CompletableFuture.supplyAsync(
            new CalculationServiceSupplier(subtractor, firstOperand, secondOperand), cachedThreadPool);
    CompletableFuture<Integer> multiplicationTask = CompletableFuture.supplyAsync(
            new CalculationServiceSupplier(multiplier, firstOperand, secondOperand), cachedThreadPool);
    CompletableFuture<Integer> divisionTask = CompletableFuture.supplyAsync(
            new CalculationServiceSupplier(divider, firstOperand, secondOperand), cachedThreadPool);

    List<CompletableFuture<Integer>> allTasks = new ArrayList<>();
    allTasks.add(additionTask);
    allTasks.add(subtractionTask);
    allTasks.add(multiplicationTask);
    allTasks.add(divisionTask);

    for (CompletableFuture<Integer> task : allTasks) {
        task.whenComplete((result, exception) -> {
            if (exception == null) {
                System.out.println(result);
            } else {
                // The task has already completed exceptionally at this point,
                // so calling completeExceptionally again would have no effect.
                System.out.println(exception.getMessage());
            }
        });
    }

    // Accept no new tasks; the ones already submitted still run to completion.
    cachedThreadPool.shutdown();
}
Run the function and the calculation results (15, 5, 50 and 2 for the operands above) will appear in the console one by one as their respective parallel tasks return them.
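The demo operands never trigger the exception branch of whenComplete. The sketch below isolates that branch by dividing by zero. It is an illustration under assumptions rather than part of the demo: WhenCompleteSketch and describe are hypothetical names, and the callback unwraps the cause because the throwable handed to whenComplete may be a CompletionException wrapping the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

public class WhenCompleteSketch {

    // Hypothetical helper: divides asynchronously and describes the outcome as text.
    static String describe(int first, int second) {
        AtomicReference<String> outcome = new AtomicReference<>();
        CompletableFuture<Integer> observed = CompletableFuture
                .supplyAsync(() -> first / second)
                .whenComplete((result, exception) -> {
                    if (exception == null) {
                        outcome.set("result: " + result);
                    } else {
                        // Unwrap a possible CompletionException to reach the original cause.
                        Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
                        outcome.set("failed: " + cause.getClass().getSimpleName());
                    }
                });
        try {
            observed.join(); // wait until the callback has run
        } catch (CompletionException ex) {
            // The failure was already recorded by the callback above.
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(10, 5)); // prints "result: 2"
        System.out.println(describe(10, 0)); // prints "failed: ArithmeticException"
    }
}
```

Note that whenComplete only observes the outcome. The future it returns still carries the original result or failure, which is why join rethrows in the failing case.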
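An alternative solution is to wait for all tasks to complete using the CompletableFuture.allOf method we saw in the post referenced in the first sentence of this post. Note that allOf only returns a new future and does not block by itself, so we call join on it to wait. Then we ask each completable future to return its result using the get function:
// join() blocks until every task is done; it throws a CompletionException
// if any of the tasks completed exceptionally.
CompletableFuture.allOf(additionTask, subtractionTask, multiplicationTask, divisionTask).join();

allTasks.forEach(task -> {
    try {
        int result = task.get(); // returns at once, since the task has already completed
        System.out.println(result);
    } catch (InterruptedException | ExecutionException ex) {
        System.err.println(ex.getMessage());
    }
});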
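If the results are needed as a collection rather than printed one at a time, allOf can be chained with thenApply. The sketch below is one possible way to do this, not the approach used in the demo above: AllOfSketch, collect and demo are hypothetical names, and plain lambdas replace the calculation services to keep the block self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {

    // Hypothetical helper: waits for every future and gathers the results in list order.
    static List<Integer> collect(List<CompletableFuture<Integer>> tasks) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
        // Once 'all' completes, every join() below returns without blocking.
        return all.thenApply(ignored -> tasks.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    // Hypothetical demo with the same operands as in the post (10 and 5).
    static List<Integer> demo() {
        int first = 10;
        int second = 5;
        List<CompletableFuture<Integer>> tasks = Arrays.asList(
                CompletableFuture.supplyAsync(() -> first + second),
                CompletableFuture.supplyAsync(() -> first - second),
                CompletableFuture.supplyAsync(() -> first * second),
                CompletableFuture.supplyAsync(() -> first / second));
        return collect(tasks);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [15, 5, 50, 2]
    }
}
```

The order of the resulting list follows the order of the input list, not the order in which the tasks happened to finish.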