Creating custom Java 8 Stream collectors

The streams feature (which makes heavy use of Functional Interfaces) is arguably the strongest feature in Java 8 (and above). The introduction of lambdas in Java in the same release to target Functional Interfaces also meant that creating chained operations in a functional style has never been easier in Java.

That being said, there are plenty of examples in the official docs that show how streams can be “collected” (effectively reduced/folded depending on your preference of the terminology) into Collection types – List, Set, or even Map. Now, I must make it absolutely clear at this stage that I love Java streams, and barring the absence (or rather, deprecation) of the zip iterator, it’s almost comprehensive. However, there are no examples in the docs to show how we might a series of intermediate operations into a custom type. Of course the helper class, Collectors has several helper methods such as groupingBy, partitioningBy, filtering, and reducing, but they either return a Map, or expect a reducible expression which may not always be the case as explained next.

Recently, I did a project in which I needed to process a stream of integers (the lack of zip forced me to take quite the peripatetic approach to finally make things work. Perhaps more on that in a later blogpost) acting as indices into a simple wrapper around a List of integers, and then ultimately collect the updated values of the list into a new instance of the custom type. It was quite an interesting experience that sparked interest in exploring how much more we could push the collect mechanism. (If you are interested in checking out the code for the mentioned example, you can find it here – Functional Nim.

For some more examples of custom Collector implementations, you can check out my Github page.

Use Case

For the purposes of this blog, to keep things simple, let us consider a a hypothetical example. Suppose we have a Point class with the following structure:

package com.z0ltan.custom.collectors.types;

public class Point {
	private int x;
	private int y;

	public Point(final int x, final int y) {
		this.x = x;
		this.y = y;
	}

	public int x() {
		return this.x;
	}

	public int y() {
		return this.y;
	}

	@Override
	public int hashCode() {
		return (this.x + this.y) % 31;
	}

	@Override
	public boolean equals(Object other) {
		if (other == null || !(other instanceof Point)) {
			return false;
		}

		Point p = (Point) other;

		return p.x == this.x && p.y == this.y;
	}

	@Override
	public String toString() {
		return "Point { x = " + x + ", y = " + y + " }";
	}
}

and we have a List of such points. Now suppose we wish to collate the information into a custom Points object which has the following structure:

package com.z0ltan.custom.collectors.types;

import java.util.Set;

public class Points {
	private Set<Integer> xs;
	private Set<Integer> ys;

	public Points(final Set<Integer> xs, final Set<Integer> ys) {
		this.xs = xs;
		this.ys = ys;
	}

	@Override
	public int hashCode() {
		return (this.xs.hashCode() + this.ys.hashCode()) % 31;
	}

	@Override
	public boolean equals(Object other) {
		if (other == null || !(other instanceof Points)) {
			return false;
		}

		Points p = (Points) other;

		return p.xs.equals(this.xs) && p.ys.equals(this.ys);
	}

	@Override
	public String toString() {
		return "Points { xs = " + xs + ", ys = " + ys + " }";
	}
}

As can be seen, we wish to retain only the unique x and y coordinate values into our final object.

A simple and logical way would be to collect the items from this stream (collect being a “terminal operation” defined in the Stream interface) into our Points object using a custom collector. Before we can do that, let us first understand what is involved in implementing a custom collector.

How to implement a custom collector

In brief, there are two forms of operations involved in Java streams – non-terminal or intermediate operations, which produce streams of their own, and terminal operations, which effectively stop the pipeline resulting in a final result of some sort.

As mentioned before, in most cases, the helper class, Collectors, provides enough functionality to cater to almost any requirement imaginable. However, in cases such as these, where we want to collect data into a custom type, we might be better off defining our own custom collector.

To do that, let us examine the signature of the collect method in the Stream interface. In fact, we will find that there are two versions of this method available to us:

 R collect​(Collector collector)

and

 R collect​(Supplier supplier,
              BiConsumer accumulator,
              BiConsumer combiner)

So which one do we use? Well, we can actually use either one for our purposes. In fact, the former is preferable if we have some involved logic, and we wish to encapsulate all of that in a nice class. However, functionally speaking, the latter is exactly the same. This can be further clarified by examining the Collector interface (showing only the abstract methods):

public interface Collector<T, A, R> {
        Supplier<A>	supplier​()
        BiConsumer<A,T>	accumulator​();	
        BinaryOperator<A>	combiner​()	
        Function<A,R>	finisher​()
        Set<Collector.Characteristics>	characteristics​()	
 }

As can be seen, if we do implement the Collector interface, we will have to implement essentially the same methods as that used by the second version of the collect method. In addition, we have a couple of extra methods which are not only interesting, but quite vital if we wish to implement the interface:

  • finisher: This is the main method that we need to implement as per our collection logic. This is the actual part where the accumulated values of the stream are massaged together into the final return value. The type parameters give a big hint in this regard – the return type,
    R is the same as that returned by the overall collect
    method.
  • characteristics: This is where we need to be careful. The enum has three variants – CONCURRENT, IDENTITY_FINISH, and UNORDERED. The bottomline is this – always use CONCURRENT for your custom types if the final value depends on the ordering of the values in the stream, or use UNORDERED if they do not. In the case of collecting values into a custom non-collection type, I don’t see any scenario where you would want to use IDENTITY_FINISH (unless you are a big fan of unsolicited ClassCastExceptionS).

    In short, this variant indicates that the finisher function is essentially an identity function, meaning that it can be skipped, and the currently accumulated value returned as the overall result (which is precisely what we wish to avoid).

One final comment to understand the collect method once and for all – what all those terms mean!

  • Supplier: This is the mechanism by which the input values are supplied to the collect method.
  • Accumulator: This is where the elements of the stream are combined with a running accumulator (which may be of a different type from the elements themselves), “reduced”, or “folded” in Functional terms.
  • Combiner: Similar to the accumulator, but the elements being combined together are of the same type. In most cases, this type would be a collection type, and finally,
  • Finisher: This is the meat of the whole collector. This is where the actual custom logic goes into to take the values produced by the combiner into the final result of the given return type.

Now that we’ve analysed the signature of the collect method, we must be in a position to realise that we can actually create custom collectors in multiple ways:

  • Using the static of methods in the Collector interface by supplying the correct supplier, accumulator, combiner, finisher, and Collector characteristics,
  • By creating a class that implements the Collector interface itself, and thereby providing implementations of the same supplier, accumulator, combiner, finisher, and Collector characteristics,
  • By simply creating any anonymous class conforming to the Collector interface, and providing the same inputs as in the previous two cases, or
  • Using any combination of the above.

To keep matters simple, let us create a custom class that implements the Collector interface. This will not only make things easier to understand, but also allow us to maintain code cleanliness.

Now let’s proceed with the implementation of the given use case to solidify these concepts.

Implementation and Demo

Let’s create a simple Maven project called custom_stream_collectors:

Macushla:Blog z0ltan$ mvn archetype:generate -DgroupId=com.z0ltan.custom.collectors -DartifactId=custom-collector -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------

               <elided>

[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 30.967 s
[INFO] Finished at: 2017-07-11T20:57:45+05:30
[INFO] Final Memory: 18M/62M
[INFO] ------------------------------------------------------------------------

After customising the project to our heart’s desire, let’s fill in our custom collector class:

package com.z0ltan.custom.collectors.collectors;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import com.z0ltan.custom.collectors.types.Point;
import com.z0ltan.custom.collectors.types.Points;

public class PointToPointsCollector implements Collector<Point, List<Point>, Points> {
	@Override
	public Supplier<List<Point>> supplier() {
		return ArrayList::new;
	}

	@Override
	public BiConsumer<List<Point>, Point> accumulator() {
		return List::add;
	}

	@Override
	public BinaryOperator<List<Point>> combiner() {
		return (acc, ps) -> {
			acc.addAll(ps);
			return acc;
		};
	}

	@Override
	public Function<List<Point>, Points> finisher() {
		return (points) -> {
			final Set<Integer> xs = new HashSet<>();
			final Set<Integer> ys = new HashSet<>();
			
			for (Point p : points) {
				xs.add(p.x());
				ys.add(p.y());
			}
			
			return new Points(xs, ys);
		};
	}

	@Override
	public Set<java.util.stream.Collector.Characteristics> characteristics() {
		return Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED));
	}
}

We use ArrayList::new (method references are another excellent feature in Java 8 and beyond) for our Supplier since we start off with a blank slate, and for the Accumulator, we use List::add since the last section made it clear that the accumulator’s only job is to keep collecting items into running value of another type (a List in this case).

Then we have the Combiner which is implemented by the little lambda expression:

   (acc, ps) -> { acc.addAll(ps); return acc; }

As mentioned in the previous section, the combiner simply flattens the collections together into a single collection. In case of confusion, always look to the type signature for clarity.

And finally, we have the Finisher:

return (points) -> {
			final Set<Integer> xs = new HashSet<>();
			final Set<Integer> ys = new HashSet<>();
			
			for (Point p : points) {
				xs.add(p.x());
				ys.add(p.y());
			}
			
			return new Points(xs, ys);
		};

At this point of the stream pipeline, the points variable holds the list of accumulated Point objects. All we do then is to create an instance of the Points class by using the data available in the points variable. The whole point (if you will forgive the pun) is that this method will have logic peculiar to your specific use case, so the implementation will vary tremendously (which is more than can be said about the others – supplier, accumulator, and combiner).

And finally, here is our main class:

package com.z0ltan.custom.collectors;

import java.util.Arrays;
import java.util.List;

import com.z0ltan.custom.collectors.collectors.PointToPointsCollector;
import com.z0ltan.custom.collectors.types.Point;
import com.z0ltan.custom.collectors.types.Points;

public class Main {
	public static void main(String[] args) {
		final List<Point> points = Arrays.asList(new Point(1, 2), new Point(1, 2), new Point(3, 4), new Point(4, 3),
				new Point(2, 5), new Point(2, 5));

		// the result of our custom collector
		final Points pointsData = points.stream().collect(new PointToPointsCollector());

		System.out.printf("\npoints = %s\n", points);
		System.out.printf("\npoints data = %s\n", pointsData);
	}
}

Well, let’s run it and see the output!

Macushla:custom-collector z0ltan$ mvn package && java -jar target/custom-collector-1.0-SNAPSHOT.jar
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building custom-collector 1.0-SNAPSHOT
        <elided>
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.627 s
[INFO] Finished at: 2017-07-11T21:31:19+05:30
[INFO] Final Memory: 16M/55M
[INFO] ------------------------------------------------------------------------

points = [Point { x = 1, y = 2 }, Point { x = 1, y = 2 }, Point { x = 3, y = 4 }, Point { x = 4, y = 3 }, Point { x = 2, y = 5 }, Point { x = 2, y = 5 }]

points data = Points { xs = [1, 2, 3, 4], ys = [2, 3, 4, 5] }

Success!

Creating custom Java 8 Stream collectors