The Java Stream API part 5: collection reducers

Introduction

In the previous post we saw how to handle an ambiguous terminal reduction result of a Stream. There’s in fact another type of reducer function in Java 8 that we haven’t discussed so far: collectors, represented by the collect() function available for Stream objects. One overload of the collect function accepts an object that implements the Collector interface.

Implementing the Collector interface involves implementing 5 functions: a supplier, an accumulator, a combiner, a finisher and a characteristics provider. At this point I’m not sure how to implement all those methods. Luckily for us the Collectors class provides a wide range of static factory methods that return ready-made collectors which can be supplied to the collect function.
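That said, a hand-written collector doesn’t have to be a full class. Here’s a minimal sketch that uses the Collector.of factory method to supply the functions as lambdas. The class name AveragingCollectorSketch, the averaging() method and the long[] container are my own assumptions for illustration; this is not how the built-in collectors are implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class AveragingCollectorSketch {

    // Hypothetical collector that averages Integer values.
    // The mutable container is a long[2]: index 0 holds the sum, index 1 the count.
    static Collector<Integer, long[], Double> averaging() {
        return Collector.of(
                () -> new long[2],                                   // supplier: creates an empty container
                (acc, value) -> { acc[0] += value; acc[1]++; },      // accumulator: folds one element in
                (left, right) -> {                                   // combiner: merges two partial containers
                    left[0] += right[0];
                    left[1] += right[1];
                    return left;
                },
                acc -> acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]  // finisher: container to final result
                // characteristics: none are passed in this sketch
        );
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(50, 18, 25, 43, 35, 55, 52, 42, 20);
        System.out.println(ages.stream().collect(averaging())); // roughly 37.78
    }
}
```

The combiner only comes into play when partial results need to be merged, e.g. on a parallel stream.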

Purpose and first example

Collectors combine the Map and Reduce steps we’ve seen up to now in this series. Depending on the exact implementation you pick, the collect function can e.g. extract a certain numeric field from each custom object and calculate the average of that field in one step.

Let’s see that in action. We’ll revisit our Employee class:

import java.util.UUID;

public class Employee
{
    private UUID id;
    private String name;
    private int age;

    public Employee(UUID id, String name, int age)
    {
        this.id = id;
        this.name = name;
        this.age = age;
    }
        
    public UUID getId()
    {
        return id;
    }

    public void setId(UUID id)
    {
        this.id = id;
    }

    public String getName()
    {
        return name;
    }

    public void setName(String name)
    {
        this.name = name;
    }    
    
    public int getAge()
    {
        return age;
    }

    public void setAge(int age)
    {
        this.age = age;
    }
    
    public boolean isCool(EmployeeCoolnessJudger coolnessJudger)
    {
        return coolnessJudger.isCool(this);
    }
    
    public void saySomething(EmployeeSpeaker speaker)
    {
        speaker.speak();
    }
}

We’ve seen that some aggregation functions have ready-made methods in the Stream class: min, max, count and some others. However, Stream itself has nothing for calculating the average. What if I’d like to calculate the average age of my employees?

List<Employee> employees = new ArrayList<>();
employees.add(new Employee(UUID.randomUUID(), "Elvis", 50));
employees.add(new Employee(UUID.randomUUID(), "Marilyn", 18));
employees.add(new Employee(UUID.randomUUID(), "Freddie", 25));
employees.add(new Employee(UUID.randomUUID(), "Mario", 43));
employees.add(new Employee(UUID.randomUUID(), "John", 35));
employees.add(new Employee(UUID.randomUUID(), "Julia", 55));
employees.add(new Employee(UUID.randomUUID(), "Lotta", 52));
employees.add(new Employee(UUID.randomUUID(), "Eva", 42));
employees.add(new Employee(UUID.randomUUID(), "Anna", 20));

It may not be obvious at first but the collect function can perform that – and a lot more. The Collectors class includes a ready-made implementation of Collector: averagingInt which accepts a ToIntFunction of T. The ToIntFunction will return an integer from the T object. In our case we need the age values so we can calculate the average age as follows:

ToIntFunction<Employee> toInt = Employee::getAge;
Double averageAge = employees.stream().collect(Collectors.averagingInt(toInt));     

averageAge will be roughly 37.78, i.e. 340 / 9.
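As a side note, the primitive IntStream does have an average() method, so mapping to an IntStream first is an alternative. The sketch below compares the two approaches. The class name AverageAgeSketch and the trimmed-down Employee stand-in are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;
import java.util.stream.Collectors;

class AverageAgeSketch {

    // Trimmed-down stand-in for the Employee class above (id and setters omitted).
    static class Employee {
        private final String name;
        private final int age;
        Employee(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    static List<Employee> sampleEmployees() {
        return Arrays.asList(
                new Employee("Elvis", 50), new Employee("Marilyn", 18), new Employee("Freddie", 25),
                new Employee("Mario", 43), new Employee("John", 35), new Employee("Julia", 55),
                new Employee("Lotta", 52), new Employee("Eva", 42), new Employee("Anna", 20));
    }

    // Collector-based average: returns 0 when there are no elements.
    static double averageWithCollector(List<Employee> employees) {
        return employees.stream().collect(Collectors.averagingInt(Employee::getAge));
    }

    // IntStream-based average: returns an empty OptionalDouble when there are no elements.
    static OptionalDouble averageWithIntStream(List<Employee> employees) {
        return employees.stream().mapToInt(Employee::getAge).average();
    }

    public static void main(String[] args) {
        System.out.println(averageWithCollector(sampleEmployees())); // roughly 37.78
        System.out.println(averageWithIntStream(sampleEmployees()));
    }
}
```

The main difference shows up with an empty list: the collector returns 0 whereas the IntStream variant returns an empty OptionalDouble.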

Other examples

Collect all the names into a string list:

List<String> names = employees.stream().map(Employee::getName).collect(Collectors.toList());     

Compute sum of all ages in a different way:

int totalAge = employees.stream().collect(Collectors.summingInt(Employee::getAge));

Let’s change the age values a little before the next example:

List<Employee> employees = new ArrayList<>();
employees.add(new Employee(UUID.randomUUID(), "Elvis", 50));
employees.add(new Employee(UUID.randomUUID(), "Marilyn", 20));
employees.add(new Employee(UUID.randomUUID(), "Freddie", 20));
employees.add(new Employee(UUID.randomUUID(), "Mario", 30));
employees.add(new Employee(UUID.randomUUID(), "John", 30));
employees.add(new Employee(UUID.randomUUID(), "Julia", 50));
employees.add(new Employee(UUID.randomUUID(), "Lotta", 30));
employees.add(new Employee(UUID.randomUUID(), "Eva", 40));
employees.add(new Employee(UUID.randomUUID(), "Anna", 20));

We can group the employees by age into a Map keyed by the age as an Integer:

Map<Integer, List<Employee>> employeesByAge = employees.stream().collect(Collectors.groupingBy(Employee::getAge));  

Here you’ll see that the key 20 will have 3 employees, key 50 will have 2 employees etc.

You can also supply another Collector to the groupingBy function if you want to have some different type as the value in the Map. E.g. the following will do the same as above except that the value will show the number of employees within an age group:

Map<Integer, Long> employeesByAge = employees.stream().collect(Collectors.groupingBy(Employee::getAge, Collectors.counting()));
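To make the grouping results easier to inspect, here’s a small sketch that runs both groupingBy variants on the modified age values. It uses the three-argument overload of groupingBy with a TreeMap so that the keys print in sorted order. The class name GroupingByAgeSketch and the trimmed-down Employee stand-in are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingByAgeSketch {

    // Trimmed-down stand-in for the Employee class (id and setters omitted).
    static class Employee {
        private final String name;
        private final int age;
        Employee(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    static List<Employee> sampleEmployees() {
        return Arrays.asList(
                new Employee("Elvis", 50), new Employee("Marilyn", 20), new Employee("Freddie", 20),
                new Employee("Mario", 30), new Employee("John", 30), new Employee("Julia", 50),
                new Employee("Lotta", 30), new Employee("Eva", 40), new Employee("Anna", 20));
    }

    // Age -> number of employees of that age; the TreeMap keeps the keys sorted.
    static Map<Integer, Long> countByAge(List<Employee> employees) {
        return employees.stream()
                .collect(Collectors.groupingBy(Employee::getAge, TreeMap::new, Collectors.counting()));
    }

    // Age -> names of the employees of that age, in encounter order.
    static Map<Integer, List<String>> namesByAge(List<Employee> employees) {
        return employees.stream()
                .collect(Collectors.groupingBy(Employee::getAge, TreeMap::new,
                        Collectors.mapping(Employee::getName, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(countByAge(sampleEmployees())); // {20=3, 30=3, 40=1, 50=2}
        System.out.println(namesByAge(sampleEmployees()));
    }
}
```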

You can partition the collection based on some boolean condition. Here we build a Map by putting the employees into one of two groups: younger than 40, or 40 and older. The partitioningBy function will help solve this:

Map<Boolean, List<Employee>> agePartitioning = employees.stream().collect(Collectors.partitioningBy(emp -> emp.getAge()>= 40));

agePartitioning will have 6 employees under the key false, i.e. those younger than 40, and 3 under the key true, i.e. those who are 40 or older, which is the correct result.
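Like groupingBy, partitioningBy also has an overload that accepts a downstream collector. The sketch below counts the members of each partition instead of listing them. To keep it short it works on a plain list of the age values rather than on Employee objects; the class and method names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitioningSketch {

    // The modified age values from the example above.
    static final List<Integer> AGES = Arrays.asList(50, 20, 20, 30, 30, 50, 30, 40, 20);

    // true -> number of ages that are 40 or above, false -> number of ages below 40.
    static Map<Boolean, Long> countByAgeLimit(List<Integer> ages) {
        return ages.stream()
                .collect(Collectors.partitioningBy(age -> age >= 40, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<Boolean, Long> counts = countByAgeLimit(AGES);
        System.out.println("Younger than 40: " + counts.get(false)); // 6
        System.out.println("40 or older: " + counts.get(true));      // 3
    }
}
```

Unlike groupingBy, the resulting Map contains both keys even if one of the partitions is empty.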

You can create something like an ad-hoc toString() function:

String allEmployees = employees.stream().map(emp -> emp.getName().concat(", ").concat(Integer.toString(emp.getAge()))).collect(Collectors.joining(" | "));

The above function will go through each employee, create a “name + , + age” string of each of them and then join all individual strings by a pipe character. The result will look like this:

Elvis, 50 | Marilyn, 20 | Freddie, 20 | Mario, 30 | John, 30 | Julia, 50 | Lotta, 30 | Eva, 40 | Anna, 20

Notice that the collector was intelligent enough not to put the pipe character after the last element.
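The joining collector also has an overload that takes a prefix and a suffix in addition to the delimiter. Here’s a minimal sketch of both variants; the class name JoiningSketch and the trimmed-down Employee stand-in are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoiningSketch {

    // Trimmed-down stand-in for the Employee class (id and setters omitted).
    static class Employee {
        private final String name;
        private final int age;
        Employee(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    static List<Employee> sampleEmployees() {
        return Arrays.asList(
                new Employee("Elvis", 50), new Employee("Marilyn", 20), new Employee("Freddie", 20),
                new Employee("Mario", 30), new Employee("John", 30), new Employee("Julia", 50),
                new Employee("Lotta", 30), new Employee("Eva", 40), new Employee("Anna", 20));
    }

    // "name, age" strings separated by " | ".
    static String describe(List<Employee> employees) {
        return employees.stream()
                .map(emp -> emp.getName() + ", " + emp.getAge())
                .collect(Collectors.joining(" | "));
    }

    // Same as above but wrapped in square brackets through the prefix and suffix arguments.
    static String describeBracketed(List<Employee> employees) {
        return employees.stream()
                .map(emp -> emp.getName() + ", " + emp.getAge())
                .collect(Collectors.joining(" | ", "[", "]"));
    }

    public static void main(String[] args) {
        System.out.println(describe(sampleEmployees()));
        System.out.println(describeBracketed(sampleEmployees()));
    }
}
```

With an empty list the first variant returns an empty string and the second returns just the prefix and suffix, i.e. "[]".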

The Collectors class has a lot more ready-made collectors. Just type “Collectors.” in an IDE which supports code completion and you’ll be able to view the whole list. Chances are that if you need to perform a composite MapReduce operation on a collection then you’ll find something useful here.
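One of those other collectors is summarizingInt, which returns an IntSummaryStatistics object holding the count, sum, minimum, maximum and average in one go. A minimal sketch follows. It works on a plain list of the modified age values instead of Employee objects, and the class and method names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;

class SummarizingSketch {

    // The modified age values used in the grouping examples.
    static final List<Integer> AGES = Arrays.asList(50, 20, 20, 30, 30, 50, 30, 40, 20);

    // Collects count, sum, min, max and average in a single pass.
    static IntSummaryStatistics summarize(List<Integer> ages) {
        return ages.stream().collect(Collectors.summarizingInt(Integer::intValue));
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(AGES);
        System.out.println("count: " + stats.getCount());     // 9
        System.out.println("sum: " + stats.getSum());         // 290
        System.out.println("min: " + stats.getMin());         // 20
        System.out.println("max: " + stats.getMax());         // 50
        System.out.println("average: " + stats.getAverage()); // roughly 32.22
    }
}
```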

This post concludes our discussion on the new Stream API in Java 8.

View all posts related to Java here.

The Java Stream API part 3: the Reduce phase

Introduction

In the previous part of the Java Stream API course we looked at streams in more detail. We discussed why streams are really empty shells to describe our intentions but do not themselves contain any data. We saw the difference between terminal and intermediate operations and we looked at a couple of examples for both types. At the end of the post we discussed the first part of the MapReduce algorithm i.e. the map() and flatMap() functions.

We’ll now move on to the Reduce phase of the MapReduce algorithm.

Reduce

Now that we know how to do the mapping we can look at the “Reduce” part of MapReduce. In .NET there is a range of pre-defined Reduce operations, like the classic SQL ones such as Min, Max, Sum, Average. There are similar functions – reducers – in the Stream API.

The most generic method to represent the Reduce phase is the “reduce” method. We’ll return to our Employee collection to run the examples:

List<Employee> employees = new ArrayList<>();
employees.add(new Employee(UUID.randomUUID(), "Elvis", 50));
employees.add(new Employee(UUID.randomUUID(), "Marilyn", 18));
employees.add(new Employee(UUID.randomUUID(), "Freddie", 25));
employees.add(new Employee(UUID.randomUUID(), "Mario", 43));
employees.add(new Employee(UUID.randomUUID(), "John", 35));
employees.add(new Employee(UUID.randomUUID(), "Julia", 55));
employees.add(new Employee(UUID.randomUUID(), "Lotta", 52));
employees.add(new Employee(UUID.randomUUID(), "Eva", 42));
employees.add(new Employee(UUID.randomUUID(), "Anna", 20));

Say we want to calculate the sum of the ages in the collection. Not a very useful statistic but it’s fine for the demo. We can see the Map and Reduce phases in action:

Stream<Integer> employeeAges = employees.stream().map(emp -> emp.getAge());
int totalAge = employeeAges.reduce(0, (empAge1, empAge2) -> empAge1 + empAge2);

A quick tip, the lambda expression…:

(empAge1, empAge2) -> empAge1 + empAge2

…can be substituted with the static sum() method of Integer using the :: shorthand notation:

Integer::sum

The first line maps the Employee objects into integers through a lambda expression which selects the age property of each employee. Then the stream of integers is reduced by the “reduce” function. This particular overload of the reduce function accepts an identity for the reducer function and the reducer function itself.

Let’s look at the reducer function first. It is of type BinaryOperator from the java.util.function package which we discussed in this post. It is a specialised version of the BiFunction interface which accepts two parameters and returns a result. BinaryOperator assumes that both input parameters and the result are of the same type. In the above example we want to add the ages of the employees, therefore we pass in two age integers and simply add them. As the reduce function is terminal, we can read the result in “totalAge”. In its current form totalAge will be equal to 340 which is in fact the sum of the ages.
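The relationship between BinaryOperator and BiFunction can be shown in a few lines. The sketch below is only an illustration on a plain list of the age values; the class and method names are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

class BinaryOperatorSketch {

    static final List<Integer> AGES = Arrays.asList(50, 18, 25, 43, 35, 55, 52, 42, 20);

    // A BinaryOperator<Integer> takes two Integers and returns an Integer.
    static final BinaryOperator<Integer> ADD = Integer::sum;

    // Every BinaryOperator<T> is also a BiFunction<T, T, T>, so this assignment compiles.
    static final BiFunction<Integer, Integer, Integer> ADD_AS_BIFUNCTION = ADD;

    static int sum(List<Integer> ages) {
        return ages.stream().reduce(0, ADD);
    }

    public static void main(String[] args) {
        System.out.println(ADD_AS_BIFUNCTION.apply(50, 18)); // 68
        System.out.println(sum(AGES));                       // 340
    }
}
```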

The identity value is the initial input into the reducer. If you run the above code with an identity of 100 instead of 0 then totalAge will be 440. The identity parameter is inserted into the equation to calculate the first result, i.e. 0 + 50 = 50, which is passed into the second step, i.e. 50 + 18 = 68, which in turn is used as a parameter in the next step, and so on. Note that a stream created with stream() is sequential; the reduction steps only run in parallel if you ask for it with parallelStream() or parallel(). In that case the stream may be split into chunks, each chunk starts from the identity and the partial results are combined, so don’t assume anything about the ordering of the steps. That doesn’t matter here as addition is associative, but it does mean that the identity should be a true identity for the function, such as 0 for addition. A value like 100 only gives a predictable result on a sequential stream.
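The effect of the identity value can be checked with a short sketch. It runs the same sum sequentially and on a parallel stream obtained through parallelStream(). The class and method names are assumptions for illustration, and the example works on a plain list of the age values.

```java
import java.util.Arrays;
import java.util.List;

class ReduceIdentitySketch {

    static final List<Integer> AGES = Arrays.asList(50, 18, 25, 43, 35, 55, 52, 42, 20);

    // Sequential reduction: the identity is used exactly once, as the starting value.
    static int sequentialSum(int identity) {
        return AGES.stream().reduce(identity, Integer::sum);
    }

    // Parallel reduction: every chunk of the stream may start from the identity.
    static int parallelSum(int identity) {
        return AGES.parallelStream().reduce(identity, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sequentialSum(0));   // 340
        System.out.println(sequentialSum(100)); // 440
        System.out.println(parallelSum(0));     // 340, as 0 is a true identity for addition
        // 100 is not a true identity, so it may be added more than once here;
        // the exact result depends on how the stream is split.
        System.out.println(parallelSum(100));
    }
}
```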

To make this point clearer let’s suppose we want to multiply all ages, i.e. 50*18*25…. We’ll need to change the age values, otherwise an int will not be able to hold the total. Let’s go with some small numbers – and risk being accused of favouring child employment:

List<Employee> employees = new ArrayList<>();
employees.add(new Employee(UUID.randomUUID(), "Elvis", 1));
employees.add(new Employee(UUID.randomUUID(), "Marilyn", 2));
employees.add(new Employee(UUID.randomUUID(), "Freddie", 3));
employees.add(new Employee(UUID.randomUUID(), "Mario", 4));
employees.add(new Employee(UUID.randomUUID(), "John", 5));
employees.add(new Employee(UUID.randomUUID(), "Julia", 6));
employees.add(new Employee(UUID.randomUUID(), "Lotta", 7));
employees.add(new Employee(UUID.randomUUID(), "Eva", 8));
employees.add(new Employee(UUID.randomUUID(), "Anna", 9));

What do you think will be the result of the below calculation?

Stream<Integer> employeeAges = employees.stream().map(emp -> emp.getAge());
int totalAge = employeeAges.reduce(0, (empAge1, empAge2) -> empAge1 * empAge2);

Those who responded with “0” are correct. 0 is passed in as the first parameter in the first step along with the first age. 0 multiplied by any number is 0 so even the second step will yield 0 and so on. So for a multiplication you’ll need to provide 1:

Stream<Integer> employeeAges = employees.stream().map(emp -> emp.getAge());
int totalAge = employeeAges.reduce(1, (empAge1, empAge2) -> empAge1 * empAge2);

…where totalAge will hold the correct value of 362880.
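For the curious, the sketch below checks the numbers. It multiplies the small age values with an identity of 1 and also shows what happens with the original age values: the product is too large for an int but it does fit into a long. The class and method names are assumptions for illustration, and Math.multiplyExact is only used here to detect the int overflow.

```java
import java.util.Arrays;
import java.util.List;

class AgeProductSketch {

    static final List<Integer> SMALL_AGES = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    static final List<Integer> ORIGINAL_AGES = Arrays.asList(50, 18, 25, 43, 35, 55, 52, 42, 20);

    // Plain int multiplication with 1 as the identity; silently wraps around on overflow.
    static int product(List<Integer> ages) {
        return ages.stream().reduce(1, (a, b) -> a * b);
    }

    // The same reduction on a LongStream, which has room for much larger products.
    static long productAsLong(List<Integer> ages) {
        return ages.stream().mapToLong(Integer::longValue).reduce(1L, (a, b) -> a * b);
    }

    // Returns true if the product does not fit into an int.
    static boolean overflowsInt(List<Integer> ages) {
        try {
            ages.stream().reduce(1, (a, b) -> Math.multiplyExact(a.intValue(), b.intValue()));
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(product(SMALL_AGES));          // 362880
        System.out.println(overflowsInt(ORIGINAL_AGES));  // true
        System.out.println(productAsLong(ORIGINAL_AGES)); // 81351270000000
    }
}
```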

The identity value has another usage as well: if the source stream is empty, i.e. if “employees” has no Employee objects at all, then the “employeeAges” stream will be empty too. In that case the reduce function has nothing to work on so the identity value will be returned.

Example:

List<Employee> employees = new ArrayList<>();
Stream<Integer> employeeAges = employees.stream().map(emp -> emp.getAge());
int totalAge = employeeAges.reduce(10, (empAge1, empAge2) -> empAge1 + empAge2);

totalAge will be 10.

Also, if the source stream yields only one element then the result will be that element and the identity combined.

Example:

List<Employee> employees = new ArrayList<>();
employees.add(new Employee(UUID.randomUUID(), "Elvis", 50));
Stream<Integer> employeeAges = employees.stream().map(emp -> emp.getAge());
int totalAge = employeeAges.reduce(10, (empAge1, empAge2) -> empAge1 + empAge2);

totalAge will be 10 + 50 = 60.

There are other Reduce functions for streams that are pretty self-explanatory:

  • count()
  • allMatch(), noneMatch(), anyMatch()
  • min(), max()
  • findFirst(), findAny()

We will look at min, max, findFirst and findAny in the next post as they are slightly different from the others.
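In the meantime here’s a minimal sketch of count and the three match functions. It works on a plain list of the original age values, and the class and method names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;

class MatchSketch {

    static final List<Integer> AGES = Arrays.asList(50, 18, 25, 43, 35, 55, 52, 42, 20);

    // Number of elements in the stream.
    static long howMany(List<Integer> ages) {
        return ages.stream().count();
    }

    // true if every age is at least the given limit.
    static boolean allAtLeast(List<Integer> ages, int limit) {
        return ages.stream().allMatch(age -> age >= limit);
    }

    // true if at least one age is above the given limit.
    static boolean anyAbove(List<Integer> ages, int limit) {
        return ages.stream().anyMatch(age -> age > limit);
    }

    // true if no age is above the given limit.
    static boolean noneAbove(List<Integer> ages, int limit) {
        return ages.stream().noneMatch(age -> age > limit);
    }

    public static void main(String[] args) {
        System.out.println(howMany(AGES));        // 9
        System.out.println(allAtLeast(AGES, 18)); // true
        System.out.println(anyAbove(AGES, 50));   // true
        System.out.println(noneAbove(AGES, 60));  // true
    }
}
```

Each method calls stream() again, so every terminal operation gets its own fresh stream.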

One last note before we finish: if you try to run two terminal operations on the same stream then you’ll get an IllegalStateException. You can only execute one terminal operation on a stream and it is considered consumed after that. To prevent that you should avoid assigning the stream to a variable and instead call [collection].stream() every time you need a new stream.
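The following sketch demonstrates the problem and the workaround on a plain list of the age values. The class and method names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class StreamReuseSketch {

    static final List<Integer> AGES = Arrays.asList(50, 18, 25, 43, 35, 55, 52, 42, 20);

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTerminalOperationFails(List<Integer> ages) {
        Stream<Integer> stream = ages.stream();
        stream.count(); // first terminal operation: the stream is consumed after this
        try {
            stream.reduce(0, Integer::sum); // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // The workaround: request a fresh stream from the collection for each terminal operation.
    static long[] countAndSum(List<Integer> ages) {
        long count = ages.stream().count();
        long sum = ages.stream().reduce(0, Integer::sum);
        return new long[] { count, sum };
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOperationFails(AGES)); // true
        long[] result = countAndSum(AGES);
        System.out.println(result[0] + " ages, sum " + result[1]); // 9 ages, sum 340
    }
}
```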

In the next post we’ll take a look at cases when the reducer function may not return anything.

