Getting a result from a parallel task in Java using CompletableFuture

In this post we saw how to start several processes on different threads using the CompletableFuture class. The example concentrated on methods with no return value. We let CompletableFuture finish the tasks in parallel before continuing with another process.

In this post we’ll see a usage of CompletableFuture for functions with a return value. We’ll reuse several elements we saw in the post that concentrated on the Future class.

The demo objects

We’ll simulate a number of long running calculation processes that all return an integer but are independent of each other.

We have the following interface to implement various heavy calculations:

public interface CalculationService
{
    public int calculate(int first, int second);
}

…and we have the following implementations:

public class AdditionService implements CalculationService
{

    @Override
    public int calculate(int first, int second)
    {
        try
        {
            Thread.sleep(1000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        return first + second;
    }
    
}

public class SubtractionService implements CalculationService
{

    @Override
    public int calculate(int first, int second)
    {
        try
        {
            Thread.sleep(2000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        return first - second;
    }    
}

public class MultiplicationService implements CalculationService
{
    @Override
    public int calculate(int first, int second)
    {
        try
        {
            Thread.sleep(3000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        return first * second;
    }
}

public class DivisionService implements CalculationService
{
    @Override
    public int calculate(int first, int second)
    {
        try
        {
            Thread.sleep(4000);
        } catch (InterruptedException ex)
        {
            //ignore
        }
        return first / second;
    }
}

The sleeping threads are meant to simulate that a long running calculation produces the results.

In the referenced post above we used the runAsync method of CompletableFuture to start the processes. The runAsync method was specifically designed for void processes where we were not expecting any return value. The function to use for processes that do have a return value is called supplyAsync. SupplyAsync accepts an object that implements the Supplier of T interface. A Supplier is similar to a Callable which is used in conjunction of the Future interface. The type parameter will be of the return type, integer in our case. A Supplier implements a method called get where the actual code is implemented to return the end result.

Here’s our supplier to return the calculation result:

import java.util.function.Supplier;

public class CalculationServiceSupplier implements Supplier<Integer>
{
    private final CalculationService calculationService;
    private final int firstOperand;
    private final int secondOperand;

    public CalculationServiceSupplier(CalculationService calculationService, int firstOperand, int secondOperand)
    {
        this.calculationService = calculationService;
        this.firstOperand = firstOperand;
        this.secondOperand = secondOperand;
    }
    
    @Override
    public Integer get()
    {
        return calculationService.calculate(firstOperand, secondOperand);
    }
    
}

The demo code

In the below example we build a completable future for each of the calculation tasks, i.e. there will be 4 in total. We use the supplyAsync function of CompletableFuture as we hinted at above. Then for each process we’ll see the whenComplete function in action. whenComplete is attached to a completable future and is executed when the future has completed. It accepts a consumer with two parameters: the result of the process and a throwable object. The result will be the result returned by the parallel action and the throwable, i.e. the exception is populated in case there’s an exception in the parallel process.

Here’s a possible implementation:

private void tryCompletableFutureTasks()
{
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    CalculationService adder = new AdditionService();
    CalculationService subtractor = new SubtractionService();
    CalculationService multiplier = new MultiplicationService();
    CalculationService divider = new DivisionService();
    int firstOperand = 10;
    int secondOperand = 5;
    CompletableFuture<Integer> additionTask = CompletableFuture.supplyAsync(new CalculationServiceSupplier(adder, firstOperand, secondOperand), cachedThreadPool);
    CompletableFuture<Integer> subtractionTask = CompletableFuture.supplyAsync(new CalculationServiceSupplier(subtractor, firstOperand, secondOperand), cachedThreadPool);
    CompletableFuture<Integer> multiplicationTask = CompletableFuture.supplyAsync(new CalculationServiceSupplier(multiplier, firstOperand, secondOperand), cachedThreadPool);
    CompletableFuture<Integer> divisionTask = CompletableFuture.supplyAsync(new CalculationServiceSupplier(divider, firstOperand, secondOperand), cachedThreadPool);
    List<CompletableFuture<Integer>> allTasks = new ArrayList<>();
    allTasks.add(additionTask);
    allTasks.add(subtractionTask);
    allTasks.add(multiplicationTask);
    allTasks.add(divisionTask);

    for (CompletableFuture<Integer> task : allTasks)
    {
        task.whenComplete((result, exception)
                -> 
                {
                    if (exception == null)
                    {
                        System.out.println(result);
                    } else
                    {
                        task.completeExceptionally(exception);
                        System.out.println(exception.getMessage());
                    }
        });
    }   
}

Run the function and the calculation results will appear in the debug window one by one as they are returned by their respective parallel functions.

An alternative solution is to wait for all tasks to complete using the CompletableFuture.allOf method we saw in the post referenced in the first sentence of this post. Then we ask each completable future to return their results using the get function:

CompletableFuture.allOf(additionTask, subtractionTask, multiplicationTask, divisionTask);
    allTasks.stream().forEach((task) ->
    {
        try
        {
            int result = task.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException ex)
        {
            System.err.println(ex.getMessage());
        }
    });

View all posts related to Java here.

Advertisement

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

Bite-size insight on Cyber Security for the not too technical.

%d bloggers like this: