
Stream Gatherers in practice Part 1

Łukasz Rola

15 Mar 2024 · 10 minutes read


Java 22 introduces an interesting preview feature with JEP 461: Stream Gatherers, a pivotal upgrade to the Stream API. This feature centers around a new gather method, enhancing the API's customization, flexibility, and efficiency. It's designed to:

  • Elevate the Stream API's adaptability, allowing for more tailored stream operations.
  • Enhance code reusability for stream-based tasks.
  • Simplify the understanding and implementation of intricate stream operations.

Crucially, these improvements come without significantly complicating the existing Stream API.

This exploration is divided into two parts: this article delves into the foundational concepts of Gatherers and their application in practical operations. The subsequent article will explore built-in gatherers and how they can be combined to construct intricate operations.

All code examples presented in this article are available for further exploration and experimentation in the accompanying repository.

Understanding the Gatherer Interface

The Gatherer interface is a cornerstone of JEP 461, enabling the customization of stream operations in Java 22. It consists of several key methods that facilitate complex data processing in a streamlined and efficient manner:

  • Initializer Method: This method is optional and is particularly relevant when an internal state is utilized in the integrator or finisher. In such cases, the Initializer Method is responsible for properly initializing this internal state before any data processing begins.
  • Integrator Method: Must always be implemented. This is the core method where the actual data processing logic is defined. It is essential in every Gatherer to dictate how each stream element is processed and incorporated into the current state.
  • Finisher Method: Can be implemented optionally. It is used for final processing and transformations after all elements have been integrated.
  • Combiner Method: Override is optional. This method is crucial for parallel stream operations as it defines how to combine results from different stream segments. It becomes relevant when the stream is processed in parallel and should be tailored to your specific parallel processing requirements.
  • andThen Method: Override is optional. The default implementation allows a gatherer to be composed with another gatherer, so that the second one processes the output of the first.

The Stream::gather method integrates seamlessly with the Gatherer interface, allowing for a more expressive and flexible approach to stream processing. By passing a Gatherer instance to Stream::gather, developers can craft custom operations tailored to specific needs, overcoming the limitations of predefined operations in the Stream API.
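
Before implementing anything elaborate, it helps to see the overall shape of the API. Below is a minimal sketch of my own (the class name MappingGatherer is illustrative, not part of the JDK): a stateless gatherer that transforms each element and forwards it downstream, passed to Stream::gather.

public class MappingGatherer<T, R> implements Gatherer<T, Void, R> {
    private final Function<T, R> mapper;

    public MappingGatherer(Function<T, R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Integrator<Void, T, R> integrator() {
        // Transform every element and push it to the next stage; the result of push
        // relays whether the downstream wants more elements.
        return Integrator.ofGreedy((_, item, downstream) ->
                downstream.push(mapper.apply(item)));
    }
}

// Usage: Stream::gather accepts any Gatherer instance.
Stream.of("a", "bb", "ccc")
        .gather(new MappingGatherer<String, Integer>(String::length))
        .forEach(System.out::println); // prints 1, 2, 3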

Implementation of custom Gatherers

In the following section, we'll explore the practical application of the gather method introduced in Java 22's Stream API. To demonstrate its versatility and power, we'll implement a series of custom gatherers that operate on a specially designed Money class. These examples will showcase how the gather method can simplify and enhance operations on complex data types, offering insights into both the utility and innovative potential of this new feature. As previously mentioned, Stream Gatherers are currently in the preview phase, so to enable this functionality, we must compile and run the code with the --enable-preview flag.

public record Money(BigDecimal amount, Currency currency) {
   public Money add(Money money) {
       return new Money(money.amount.add(this.amount), currency);
   }

   public Money multiply(BigDecimal multiplier) {
       return new Money(amount.multiply(multiplier), currency);
   }
}
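
The examples that follow reference PLN and EUR constants which are not defined in the snippets; presumably the repository declares them as java.util.Currency instances. A minimal sketch of such constants, purely as an assumption to make the examples compile:

import java.util.Currency;

// Hypothetical constants matching the PLN and EUR identifiers used below (an assumption, not from the article).
final class Currencies {
    static final Currency PLN = Currency.getInstance("PLN");
    static final Currency EUR = Currency.getInstance("EUR");
}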

Distinct by Gatherer

To highlight the utility of the gather method, we'll start with a DistinctByGatherer. This gatherer ensures stream elements are unique based on a selector function, a functionality not natively supported by the Stream API.

public class DistinctByGatherer<T, P> implements Gatherer<T, Set<P>, T> {

   private final Function<T, P> selector;

   public DistinctByGatherer(Function<T, P> selector) {
       this.selector = selector;
   }

   @Override
   public Supplier<Set<P>> initializer() {
       return HashSet::new;
   }

   @Override
   public Integrator<Set<P>, T, T> integrator() {
       return Integrator.ofGreedy((state, item, downstream) -> {
           P extracted = selector.apply(item);
           if(!state.contains(extracted)) {
               state.add(extracted);
               downstream.push(item);
           }

           return true;
       });
   }
}

In this implementation, the DistinctByGatherer uses:

  • An initializer to establish an internal state, specifically, a set that tracks encountered elements.
  • An integrator, which ensures the uniqueness of elements based on a specified selector function. Its lambda receives three parameters:
    • state: Maintains an internal set, initialized by the initializer() function, to track unique elements.
    • item: The current element from the input stream being processed.
    • downstream: The subsequent stage in the pipeline, to which the unique element is forwarded if it passes the uniqueness check.

To simplify usage, we introduce a factory class MyGatherers:

public class MyGatherers {
   public static <T, P> DistinctByGatherer<T, P> distinctBy(Function<T, P> extractor) {
       return new DistinctByGatherer<>(extractor);
   }
}

Using this gatherer with a Money class:

var money = List.of(
       new Money(BigDecimal.valueOf(12), PLN),
       new Money(BigDecimal.valueOf(11), EUR),
       new Money(BigDecimal.valueOf(15), PLN)
);

System.out.println("\nDistinct By Currency:");
money.stream()
       .gather(MyGatherers.distinctBy(Money::currency))
       .forEach(System.out::println);

Output:

Distinct By Currency:
Money[amount=12, currency=PLN]
Money[amount=11, currency=EUR]

The DistinctByGatherer serves as a sophisticated enhancement of the standard Stream::distinct method, offering greater flexibility and advanced filtering capabilities based on custom-defined criteria, beyond mere object equality.
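
For contrast, here is a quick sketch of what the built-in Stream::distinct does with the same list: it deduplicates on Object::equals, and since the record's equality includes the amount, both PLN entries survive.

// Stream::distinct compares full records (amount and currency), so nothing is removed here.
money.stream()
        .distinct()
        .forEach(System.out::println);
// Money[amount=12, currency=PLN]
// Money[amount=11, currency=EUR]
// Money[amount=15, currency=PLN]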

Reduce by Gatherer

Next, we'll explore the ReduceByGatherer, which aggregates elements in a stream based on a selector function. Unlike the DistinctByGatherer, which skips duplicates, this gatherer merges elements that share the same selector value.

public class ReduceByGatherer<T, P> implements Gatherer<T, Map<P, T>, T> {
   private final Function<T, P> selector;
   private final BiFunction<T, T, T> operation;

   public ReduceByGatherer(Function<T, P> extractor, BiFunction<T, T, T> reducer) {
       this.selector = extractor;
       this.operation = reducer;
   }

   @Override
   public Supplier<Map<P, T>> initializer() {
       return HashMap::new;
   }

   @Override
   public Integrator<Map<P, T>, T, T> integrator() {
       return Integrator.ofGreedy((state, item, _) -> {
           state.merge(selector.apply(item), item, operation);
           return true;
       });
   }

   @Override
   public BiConsumer<Map<P, T>, Downstream<? super T>> finisher() {
       return (state, downstream) -> state.values().forEach(downstream::push);
   }
}

Key aspects of ReduceByGatherer:

  • State Management: Employs a HashMap for state maintenance, enabling effective aggregation of stream elements.
  • Integrator Implementation: Merges each element into the map according to the specified selector and reducer functions. Unlike the previous gatherer, which pushed elements downstream as they arrived, the integrator here only accumulates state; nothing is emitted until the finisher runs.
  • Finisher Implementation: Takes on the role of finalizing the process, pushing the accumulated values to the downstream after all elements have been handled. This approach is crucial for reduction operations, where the collective processing of all elements is necessary before deriving a final result.
  • Complete Stream Processing: Unlike operations that support early termination (like takeWhile), ReduceByGatherer necessitates the full processing of all stream elements to get correct results.

To incorporate ReduceByGatherer into our utility class:

public class MyGatherers {
   public static <T, P> DistinctByGatherer<T, P> distinctBy(Function<T, P> extractor) {
       return new DistinctByGatherer<>(extractor);
   }

   public static <T, P> ReduceByGatherer<T, P> reduceBy(Function<T, P> extractor, BiFunction<T, T, T> reducer) {
       return new ReduceByGatherer<>(extractor, reducer);
   }
}

Example usage with the Money class:

var money = List.of(
       new Money(BigDecimal.valueOf(12), PLN),
       new Money(BigDecimal.valueOf(11), EUR),
       new Money(BigDecimal.valueOf(15), PLN)
);

System.out.println("Reduce By Currency");
money.stream()
       .gather(MyGatherers.reduceBy(Money::currency, Money::add))
       .forEach(System.out::println);

Output:

Reduce By Currency
Money[amount=27, currency=PLN]
Money[amount=11, currency=EUR]

This gatherer efficiently consolidates elements of a stream, making it ideal for compacting or aggregating data in a finite stream.
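
For comparison, a rough pre-gatherer sketch of the same aggregation using Collectors.toMap with a merge function; unlike the gatherer, this ends the pipeline with a terminal collect and then iterates the merged values:

// Group-and-merge with Collectors.toMap: Money::currency is the key, Money::add merges collisions.
money.stream()
        .collect(Collectors.toMap(Money::currency, m -> m, Money::add))
        .values()
        .forEach(System.out::println);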

Max by Gatherer

The MaxByGatherer is designed to find the maximum element in a stream based on a specified selector function. This implementation is more complex as it includes an additional class for state management and a combiner function, enhancing its capability for parallel stream operations.

class MaxByGatherer<T, B extends Comparable<B>> implements Gatherer<T, State<T>, T> {
   private final Function<T, B> extractor;

   MaxByGatherer(Function<T, B> extractor) {
       this.extractor = extractor;
   }

   @Override
   public Supplier<State<T>> initializer() {
       return State::new;
   }

   @Override
   public Integrator<State<T>, T, T> integrator() {
       return Integrator.ofGreedy((state, item, _) -> {
           if (state.maxElement == null) {
               state.maxElement = item;
               return true;
           }

           B currentItemValue = extractor.apply(item);
           B maxItemValue = extractor.apply(state.maxElement);

           if (currentItemValue.compareTo(maxItemValue) > 0) {
               state.maxElement = item;
           }

           return true;
       });
   }

   @Override
   public BinaryOperator<State<T>> combiner() {

       return (first, second) -> {
           if (first.maxElement == null && second.maxElement == null) {
               // Both segments are empty; keep the (empty) first state rather than
               // returning null, so the finisher always receives a valid State.
               return first;
           } else if (first.maxElement == null) {
               return second;
           } else if (second.maxElement == null) {
               return first;
           }

           B firstMaxValue = extractor.apply(first.maxElement);
           B secondMaxValue = extractor.apply(second.maxElement);

           if (firstMaxValue.compareTo(secondMaxValue) > 0) {
               return first;
           } else {
               return second;
           }
       };
   }

   @Override
   public BiConsumer<State<T>, Downstream<? super T>> finisher() {
       return (state, downstream) -> {
           // Guard against an empty stream: push only if a maximum was found.
           if (state.maxElement != null) {
               downstream.push(state.maxElement);
           }
       };
   }
}

class State<T> {
   T maxElement;
}

Key Features:

  • State Management: Uses a custom State class to track the maximum element.
  • Integrator: Determines the maximum element based on the values produced by the provided extractor function.
  • Combiner: Enables efficient processing in parallel streams by combining state from different segments.

This gatherer significantly enhances the Stream API's ability to handle complex computations like finding maximum values in parallel processing scenarios.

Extending MyGatherers for MaxByGatherer:

public class MyGatherers {
   public static <T, P> DistinctByGatherer<T, P> distinctBy(Function<T, P> extractor) {
       return new DistinctByGatherer<>(extractor);
   }

   public static <T, P> ReduceByGatherer<T, P> reduceBy(Function<T, P> extractor, BiFunction<T, T, T> reducer) {
       return new ReduceByGatherer<>(extractor, reducer);
   }

   public static <T, B extends Comparable<B>> MaxByGatherer<T, B> maxBy(Function<T, B> extractor) {
       return new MaxByGatherer<>(extractor);
   }
}

Example Usage:

var money = List.of(
       new Money(BigDecimal.valueOf(12), PLN),
       new Money(BigDecimal.valueOf(11), PLN),
       new Money(BigDecimal.valueOf(15), PLN)
);

System.out.println("\nMax By Amount");
money.stream()
       .parallel()
       .gather(MyGatherers.maxBy(Money::amount))
       .forEach(System.out::println);

It is worth noting that the combiner only comes into play when the stream is processed in parallel, which is why the example above uses a parallel stream.

Output:

Max By Amount
Money[amount=15, currency=PLN]

This implementation showcases how MaxByGatherer can efficiently determine the maximum value in a collection.
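
For reference, the existing Stream::max achieves a similar result, but yields an Optional rather than a stream element; a quick sketch:

// Standard alternative: Stream::max with a comparator built from the same extractor.
money.stream()
        .max(Comparator.comparing(Money::amount))
        .ifPresent(System.out::println);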

Map Not-Null Gatherer

Consider a scenario where we have a list of Money objects mixed with null values. Our objective is to filter out the nulls and apply a transformation to the remaining elements. While this can be achieved using a combination of Stream::filter and Stream::map, let's explore a more streamlined approach inspired by Kotlin's mapNotNull function.

public class MapNotNullGatherer<T, M> implements Gatherer<T, T, M> {
   private final Function<T, M> mapper;

   public MapNotNullGatherer(Function<T, M> mapper) {
       this.mapper = mapper;
   }

   @Override
   public Integrator<T, T, M> integrator() {
       return Integrator.ofGreedy((_, item, downstream) -> {
           if(item != null) {
               downstream.push(mapper.apply(item));
           }
           return true;
       });
   }
}

Key Highlights:

  • Simplicity: This gatherer only requires implementing the integrator method, filtering out nulls and applying the mapping function to non-null elements.
  • Efficiency: It combines filtering and mapping in a single pipeline stage, improving readability and avoiding an extra intermediate operation.

Extending MyGatherers to include MapNotNullGatherer:

public class MyGatherers {
   public static <T, P> DistinctByGatherer<T, P> distinctBy(Function<T, P> extractor) {
       return new DistinctByGatherer<>(extractor);
   }

   public static <T, P> ReduceByGatherer<T, P> reduceBy(Function<T, P> extractor, BiFunction<T, T, T> reducer) {
       return new ReduceByGatherer<>(extractor, reducer);
   }

   public static <T, B extends Comparable<B>> MaxByGatherer<T, B> maxBy(Function<T, B> extractor) {
       return new MaxByGatherer<>(extractor);
   }

   public static <T, M> MapNotNullGatherer<T, M> mapNotNull(Function<T, M> mapper) {
       return new MapNotNullGatherer<>(mapper);
   }
}

Usage Example:

var moneyWithNulls = Arrays.asList(
       new Money(BigDecimal.valueOf(12), PLN),
       null,
       new Money(BigDecimal.valueOf(11), EUR),
       new Money(BigDecimal.valueOf(15), PLN),
       null
);

System.out.println("Map not-null Gatherer");
moneyWithNulls.stream()
       .gather(MyGatherers.mapNotNull(m -> m.multiply(BigDecimal.TWO)))
       .forEach(System.out::println);

Output:

Map not-null Gatherer
Money[amount=24, currency=PLN]
Money[amount=22, currency=EUR]
Money[amount=30, currency=PLN]

This implementation highlights how custom gatherers can simplify and enhance stream operations, particularly in cases requiring both filtering and transformation.
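
For comparison, the conventional two-step version mentioned at the start of this section might look as follows (a sketch using only the standard Stream API):

// Conventional equivalent: filter out nulls first, then map the remaining elements.
moneyWithNulls.stream()
        .filter(Objects::nonNull)
        .map(m -> m.multiply(BigDecimal.TWO))
        .forEach(System.out::println);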

Find first Gatherer

The FindFirstGatherer is an innovative adaptation of the standard Stream::findFirst method, with two key distinctions:

  • Stream Output: Unlike Stream::findFirst which returns an Optional<T>, this gatherer returns a Stream<T>. This results in either an empty stream or a stream with a single element, depending on whether the predicate is fulfilled.
  • Predicate-Based Filtering: It operates based on a predicate, selectively processing elements until the first match is found.

public class FindFirstGatherer<T> implements Gatherer<T, T, T> {
   private final Predicate<T> predicate;

   public FindFirstGatherer(Predicate<T> predicate) {
       this.predicate = predicate;
   }

   @Override
   public Integrator<T, T, T> integrator() {
       // Integrator.of (not ofGreedy) is used because this integrator can short-circuit.
       return Integrator.of((_, item, downstream) -> {
           if (predicate.test(item)) {
               downstream.push(item);
               return false;
           } else {
               return true;
           }
       });
   }
}

The implementation stops processing further elements once an item fulfilling the predicate is found: in that case the integrator returns false. Because the integrator can short-circuit, it is created with Integrator.of rather than Integrator.ofGreedy, which is reserved for integrators that never short-circuit on their own.

Extending MyGatherers with FindFirstGatherer:

public class MyGatherers {
   public static <T, P> DistinctByGatherer<T, P> distinctBy(Function<T, P> extractor) {
       return new DistinctByGatherer<>(extractor);
   }

   public static <T, P> ReduceByGatherer<T, P> reduceBy(Function<T, P> extractor, BiFunction<T, T, T> reducer) {
       return new ReduceByGatherer<>(extractor, reducer);
   }

   public static <T, B extends Comparable<B>> MaxByGatherer<T, B> maxBy(Function<T, B> extractor) {
       return new MaxByGatherer<>(extractor);
   }

   public static <T, M> MapNotNullGatherer<T, M> mapNotNull(Function<T, M> mapper) {
       return new MapNotNullGatherer<>(mapper);
   }

   public static <T> FindFirstGatherer<T> findFirst(Predicate<T> predicate) {
       return new FindFirstGatherer<>(predicate);
   }
}

Usage Example:

var money = List.of(
       new Money(BigDecimal.valueOf(12), PLN),
       new Money(BigDecimal.valueOf(11), PLN),
       new Money(BigDecimal.valueOf(15), PLN)
);

System.out.println("Find first Gatherer");
money.stream()
       .gather(MyGatherers.findFirst(m -> m.currency().equals(PLN)))
       .forEach(System.out::println);

Output:

Find first Gatherer
Money[amount=12, currency=PLN]

This gatherer demonstrates an elegant solution for situations where only the first matching element is needed, streamlining the process in a more functional style.
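
For comparison, the standard filter-then-findFirst combination yields an Optional<Money> instead of a stream; a quick sketch:

// Standard equivalent: returns Optional<Money> rather than a Stream<Money>.
money.stream()
        .filter(m -> m.currency().equals(PLN))
        .findFirst()
        .ifPresent(System.out::println);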

Summary

In this article, we've taken a comprehensive look at JEP 461: Stream Gatherers, a preview feature introduced in Java 22. We began by unraveling its fundamental concepts and then crafted a variety of operations to demonstrate its utility in specific contexts. Through these explorations, it's clear that this new abstraction in the Stream API significantly enhances customizability, reusability, and the overall comprehension of stream operations, marking a positive evolution in Java's data processing capabilities.

In the next article, we will fully unleash the potential of Stream Gatherers by exploring built-in gatherers and the possibility of composing gatherers from other gatherers.

Let us know your thoughts about this enhancement and its potential to become a stable feature in future Java releases. In my view, it's an exciting development that makes working with streams more enjoyable and efficient.

Reviewed by: Rafał Maciak, Sebastian Rabiej
