Sharing numeric values across threads using Java 8 LongAdder
September 18, 2017
In an earlier post we saw how to share primitive values across threads using the various atomic objects in the java.util.concurrent.atomic package. The example code demonstrated the AtomicInteger object, which is the thread-safe variant of a “normal” integer. Mathematical operations such as adding a value to an integer are carried out atomically for that object. This means that the low-level steps involved (read the current value, add to it, write the result back) are carried out as one unit, so no other thread can interfere halfway through. The same package includes atomic versions of other primitive values such as AtomicBoolean and AtomicLong.
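As a quick reminder of what that atomicity buys us, here is a minimal sketch. The class and method names are made up for this illustration and do not come from the earlier post. Two threads increment the same AtomicInteger and no update is lost:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example code.
class AtomicIncrementDemo {

    // Two threads each call incrementAndGet() 'perThread' times on one shared counter.
    static int countWithTwoThreads(int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                counter.incrementAndGet(); // read, add and write happen as one atomic step
            }
        };
        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(100_000)); // prints 200000
    }
}
```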
In this post we’ll take a quick look at an addition in Java 8 relevant to sharing integers, longs and doubles.
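Java 8 added the LongAdder and DoubleAdder classes to the java.util.concurrent.atomic package. They are better suited than AtomicLong and its siblings when many threads frequently update the same variable: under low contention the two approaches behave similarly, but under high contention the adders are expected to deliver noticeably higher throughput, at the cost of using more memory.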
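The following sketch puts the two side by side. It is an illustration only: the class name is invented, and a parallel stream stands in for "many threads", which is an assumption of this example rather than something the post prescribes. Both counters end up with the same total. The difference is in how they behave under contention, which this sketch does not measure:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Hypothetical demo class: the same workload against AtomicLong and LongAdder.
class AdderVersusAtomicDemo {

    // Every update competes for the single value held by the AtomicLong.
    static long countWithAtomicLong(int updates) {
        AtomicLong counter = new AtomicLong();
        IntStream.range(0, updates).parallel().forEach(i -> counter.incrementAndGet());
        return counter.get();
    }

    // LongAdder may spread contended updates over several internal variables
    // and adds them up when sum() is called.
    static long countWithLongAdder(int updates) {
        LongAdder counter = new LongAdder();
        IntStream.range(0, updates).parallel().forEach(i -> counter.increment());
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomicLong(1_000_000)); // prints 1000000
        System.out.println(countWithLongAdder(1_000_000));  // prints 1000000
    }
}
```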
LongAdder has methods similar to those in AtomicInteger to increment, decrement or read the current value, with one difference: increment(), decrement() and add() return void rather than the updated value. The sum() method returns the current value as a long and is equivalent to longValue(). The intValue(), floatValue() and doubleValue() methods return the sum converted to the corresponding primitive type.
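A small single-threaded sketch of these methods follows. The class name is invented for the illustration, and sumThenReset() is shown as one extra LongAdder method beyond those mentioned above:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class walking through the main LongAdder methods.
class LongAdderMethodsDemo {

    static long[] readings() {
        LongAdder adder = new LongAdder();
        adder.increment(); // sum is now 1
        adder.add(10);     // sum is now 11
        adder.decrement(); // sum is now 10

        long viaSum = adder.sum();               // 10
        long viaLongValue = adder.longValue();   // 10, same as sum()
        long viaIntValue = adder.intValue();     // 10, sum narrowed to int
        long beforeReset = adder.sumThenReset(); // returns 10 and resets to zero
        long afterReset = adder.sum();           // 0

        return new long[] {viaSum, viaLongValue, viaIntValue, beforeReset, afterReset};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(readings())); // prints [10, 10, 10, 10, 0]
    }
}
```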
Here’s the code of the SharedPrimitiveTesterService class from the previous post:
    import java.util.concurrent.atomic.AtomicInteger;

    public class SharedPrimitiveTesterService {

        private final AtomicInteger sharedCounter = new AtomicInteger(0);

        public int increment() {
            return sharedCounter.incrementAndGet();
        }

        public int decrement() {
            return sharedCounter.decrementAndGet();
        }

        public int getValue() {
            return sharedCounter.get();
        }
    }
Let’s factor this out to an interface:
    public interface PrimitiveTesterService {

        int increment();

        int decrement();

        int getValue();
    }
…and modify the SharedPrimitiveTesterService declaration:
public class SharedPrimitiveTesterService implements PrimitiveTesterService
Here comes the class based around LongAdder which fulfils the same interface:
    import java.util.concurrent.atomic.LongAdder;

    public class SharedPrimitiveTesterServiceWithAdder implements PrimitiveTesterService {

        private final LongAdder sharedCounter = new LongAdder();

        @Override
        public int increment() {
            // LongAdder.increment() returns void, so the value is read in a second step.
            // With concurrent callers the result may already include other threads' updates.
            sharedCounter.increment();
            return sharedCounter.intValue();
        }

        @Override
        public int decrement() {
            // Same two-step pattern as increment().
            sharedCounter.decrement();
            return sharedCounter.intValue();
        }

        @Override
        public int getValue() {
            return sharedCounter.intValue();
        }
    }
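One thing to be aware of in the adder-based class: increment() followed by intValue() is two separate steps, whereas AtomicInteger.incrementAndGet() is a single one. The final total is still correct, but the value returned to an individual caller may differ. The sketch below is only an illustration with invented names. It counts how many distinct return values concurrent callers see:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Hypothetical demo class comparing the values handed back to concurrent callers.
class ReturnValueDemo {

    // incrementAndGet() is one atomic step, so every call gets its own value.
    static int distinctResultsWithAtomic(int calls) {
        AtomicInteger counter = new AtomicInteger();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, calls).parallel()
                .forEach(i -> seen.add(counter.incrementAndGet()));
        return seen.size();
    }

    // increment() then intValue() is two steps: other threads may update the
    // counter in between, so several callers can read the same value.
    static int distinctResultsWithAdder(int calls) {
        LongAdder counter = new LongAdder();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, calls).parallel().forEach(i -> {
            counter.increment();
            seen.add(counter.intValue());
        });
        return seen.size();
    }

    public static void main(String[] args) {
        int calls = 100_000;
        System.out.println(distinctResultsWithAtomic(calls)); // always equals calls
        System.out.println(distinctResultsWithAdder(calls));  // at most calls, often fewer
    }
}
```

For the test in this post that makes no difference, because only the final value is inspected. It would matter if callers relied on each return value being unique, for example to hand out IDs.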
Next we change the type of the shared service in IncrementTask and DecrementTask so that we can pass in any concrete type for testing:
    import java.util.concurrent.Callable; // needed by both task classes

    public class DecrementTask implements Callable<Integer> {

        private final PrimitiveTesterService sharedObjectService;
        private final int numberOfTimes;

        public DecrementTask(PrimitiveTesterService sharedObjectService, int numberOfTimes) {
            this.sharedObjectService = sharedObjectService;
            this.numberOfTimes = numberOfTimes;
        }
        //rest of code ignored
    }

    public class IncrementTask implements Callable<Integer> {

        private final PrimitiveTesterService sharedObjectService;
        private final int numberOfTimes;

        public IncrementTask(PrimitiveTesterService sharedObjectService, int numberOfTimes) {
            this.sharedObjectService = sharedObjectService;
            this.numberOfTimes = numberOfTimes;
        }
        //rest of code ignored
    }
Finally here’s our revised test code. We can easily switch between the SharedPrimitiveTesterServiceWithAdder and SharedPrimitiveTesterService implementations:
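    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    private static void trySharedPrimitives() {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        // Swap in new SharedPrimitiveTesterService() to test the AtomicInteger version
        PrimitiveTesterService sharedObjectService = new SharedPrimitiveTesterServiceWithAdder();

        Callable<Integer> incrementTask = new IncrementTask(sharedObjectService, 1000000);
        Callable<Integer> decrementTask = new DecrementTask(sharedObjectService, 400000);
        List<Callable<Integer>> calcTasks = new ArrayList<>();
        calcTasks.add(decrementTask);
        calcTasks.add(incrementTask);

        try {
            // invokeAll blocks until both tasks have completed
            List<Future<Integer>> futures = newCachedThreadPool.invokeAll(calcTasks);
            for (Future<Integer> future : futures) {
                future.get();
            }
            int res = sharedObjectService.getValue();
            System.out.println(res);
        } catch (InterruptedException | ExecutionException ex) {
            System.out.println(ex.getMessage());
        } finally {
            // Release the pool's threads so that the JVM can exit promptly
            newCachedThreadPool.shutdown();
        }
    }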
The “res” variable will be 600,000 as expected: 1,000,000 increments minus 400,000 decrements.
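If you want to try the idea without the surrounding classes, here is a condensed, self-contained sketch. It makes one assumption: since the bodies of IncrementTask and DecrementTask are not shown above, the hypothetical repeat() helper simply runs an action numberOfTimes times, which may differ from what the real tasks do. The class and helper names are invented for this illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical condensed version of the test, using LongAdder directly.
class AdderEndToEndDemo {

    // Assumed task body: run the given action 'numberOfTimes' times.
    static Callable<Integer> repeat(Runnable action, int numberOfTimes) {
        return () -> {
            for (int i = 0; i < numberOfTimes; i++) {
                action.run();
            }
            return numberOfTimes;
        };
    }

    static int run(int increments, int decrements) {
        LongAdder sharedCounter = new LongAdder();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(repeat(sharedCounter::decrement, decrements));
            tasks.add(repeat(sharedCounter::increment, increments));
            // invokeAll returns once both tasks have completed
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                future.get();
            }
            return sharedCounter.intValue();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1_000_000, 400_000)); // prints 600000
    }
}
```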