Contents

Stream Gatherers in practice Part 2

Łukasz Rola

11 Apr 2024.8 minutes read

Stream Gatherers in practice Part 2 webp image

This second article continues our exploration of Stream Gatherers, introduced in Java 22 with JEP 461. After detailing the Gatherer interface and custom operations in the first part, we now shift focus to built-in gatherers, examining their capabilities. Furthermore, we'll delve into constructing complex operations by combining these built-in gatherers, demonstrating the enhanced functionality and versatility they bring to Java's Stream API. This exploration aims to illustrate the practical applications and benefits of these powerful tools in stream processing.

As with the first article, all code examples discussed are accessible in the GitHub repository.

Built in Gatherers overview

This section delves into built-in gatherers like fold, mapConcurrent, scan, windowFixed, and windowSliding, offering a brief overview followed by practical demonstrations.

Fold

Gatherer combines elements into a single result as the stream processes, similar to Stream::reduce. However, unlike reduce, which wraps the outcome in an Optional, Fold directly returns a stream with the final aggregated value.

Let’s consider a simple example of factorial calculation:

System.out.println("Fold example:");
Stream.of(1, 2, 3, 4, 5)
       .gather(Gatherers.fold(() -> 1, (a, b) -> a * b))
       .forEach(System.out::println);

As we can see, the factory method of fold consumes two arguments:

  • initial: Supplier that provides the initial value for the folding operation
  • folder: BiFunction utilized for the sequential combination of elements

Output:

Fold example:
120

Map Concurrent

The "Map Concurrent" operation allows for concurrent execution of functions across stream elements, capped by a maximum concurrency level and leveraging virtual threads, ensuring stream order is preserved.

Example usage for doubling each element:

System.out.println("Map concurrent example:");
Stream.of(1, 2, 3, 4, 5)
       .gather(Gatherers.mapConcurrent(4, a -> a *2))
       .forEach(System.out::println);

This method takes two parameters:

  • maxConcurrency: sets the desired level of concurrency.
  • mapper: Function applied to each element

The output demonstrates efficient parallel processing, doubling each number while maintaining the original order:

Map concurrent example:
2
4
6
8
10

Scan

The Gatherer performs a Prefix Scan, which incrementally accumulates results using specified functions. It starts with an initial value from a Supplier, and for each stream element, a BiFunction is applied to the current and next element. The accumulated result is then passed downstream, allowing for the dynamic construction of a sequence of progressively accumulated values.

Let's revisit the scan method to compute factorials once more:

System.out.println("Scan example:");
Stream.of(1, 2, 3, 4, 5)
       .gather(Gatherers.scan(()->1, (a,b) -> a*b))
       .forEach(System.out::println);

Unlike the fold method, with the scan method, the stream displays each step of the calculation rather than just the final outcome:

Scan example:
1
2
6
24
120

Window Fixed

Organizes stream elements into sequential, ordered groups of a predetermined size. No windows are formed from an empty stream. The concluding window might hold fewer elements if the stream's total doesn't perfectly divide by the window size.

Let's examine an example where we divide stream elements into windows, each containing two elements:

System.out.println("Window fixed example:");
Stream.of(1, 2, 3, 4, 5)
       .gather(Gatherers.windowFixed(2))
       .forEach(System.out::println);

The output is:

Window fixed example:
[1, 2]
[3, 4]
[5]

The observation here is that the final grouping contains only one element due to the stream having an odd number of elements, which does not evenly divide by the window size of two.

Window Sliding

The "Window Sliding" Gatherer segments elements into dynamically overlapping windows based on a specified size. Each new window shifts by one element from the previous, maintaining a continuous flow of data. This operation is distinct in generating multiple, overlapping windows, ensuring some elements appear in consecutive windows for a comprehensive view of the stream's sequence.

Let's apply the windowSliding operation to the same stream as before:

System.out.println("Window sliding example:");
Stream.of(1, 2, 3, 4, 5)
       .gather(Gatherers.windowSliding(2))
       .forEach(System.out::println);

This time, we receive the following output:

Window sliding example:
[1, 2]
[2, 3]
[3, 4]
[4, 5]

This time, we observe an increased number of windows, containing two elements. Notably, some elements appear in two consecutive windows.

Build in Gatherers in practice

Now that we're familiar with how built-in gatherers operate, let's apply them to tackle more practical challenges, including ensuring safe animal distribution, simulating games between players, and tracking bank account balance history.

Safe animal distribution

In addressing the challenge of safe animal distribution, we consider a sequence containing three types of animals:

  • wolf,
  • sheep,
  • sheepdog.

The crucial requirement is to avoid any consecutive triple of animals where a wolf and a sheep are present without the protective presence of a sheepdog, as this poses a risk to the sheep. Our objective is to implement a validation logic that checks if an animal stream fulfills this requirement. To begin, we define these animal types using an enum, setting the foundation for our logic implementation.

public enum Animal {
   SHEEP, SHEEP_DOG, WOLF
}

Moving forward, we will design a Gatherer that encapsulates this validation logic:

public class AnimalStreamUtils {

   public static Gatherer<Animal, ?, Boolean> isValidSequence() {
       Predicate<List<Animal>> validTriplePredicate = triple -> !(
               triple.contains(SHEEP) && triple.contains(WOLF) && !triple.contains(SHEEP_DOG)
       );

       var tripleWindowGatherer = Gatherers.<Animal>windowSliding(3);
       var areAllTriplesValidGatherer = Gatherers.<List<Animal>, Boolean>fold(
               () -> true,
               (result, triple) -> result && validTriplePredicate.test(triple)
       );

       return tripleWindowGatherer.andThen(areAllTriplesValidGatherer);
   }
}

Important points:

  • AnimalStreamUtils: Introduces a utility class with a factory method for streamlined Gatherer integration, enhancing ease of use.
  • validTriplePredicate: Evaluates triples to ensure they're safe, preventing scenarios where a wolf could endanger a sheep.
  • tripleWindowGatherer: Generates sequential animal triples for analysis, ensuring comprehensive sequence evaluation.
  • areAllTriplesValidGatherer: Verifies the safety of all animal triples against the established criteria.
  • Gatherer:andThen: Method is used to combine gatherers into one: \
    stream.gather(a.andThen(b)) is equivalent to stream.gather(a).gather(b).

After developing the Gatherer, let's proceed to test its application.

var validSequence = List.of(SHEEP, SHEEP_DOG, WOLF, WOLF);
System.out.println("Valid animal sequence result:");
validSequence.stream()
       .gather(AnimalStreamUtils.isValidSequence())
       .forEach(System.out::println);

var invalidSequence = List.of(SHEEP_DOG, SHEEP, WOLF, WOLF);
System.out.println("\nInvalid animal sequence result:");
invalidSequence.stream()
       .gather(AnimalStreamUtils.isValidSequence())
       .forEach(System.out::println);

After execution, we receive the expected output:

Valid animal sequence result:
true
Invalid animal sequence result:
false

Games simulation

Moving on to another challenge, we aim to create pairs of players and simulate games between them. A game can end in a win for either player or a draw.

We begin with a basic Player model, which only includes a name to identify each player in our game simulations:

public record Player(String name) {
}

Next, we introduce a Game interface with two distinct implementations: CompletedGame, representing a game that has concluded successfully, and InvalidGame, used in scenarios where a sufficient number of players was not available.

public interface Game {
   Game INVALID_GAME = new InvalidGame();
   static Game completedGame(Player player1, Player player2, GameResult result) {
       return new CompletedGame(player1, player2, result);
   }
}

record CompletedGame(Player player1, Player player2, GameResult result) implements Game {
}

class InvalidGame implements Game {
   @Override
   public String toString() {
       return "INVALID GAME";
   }
}

The GameResult is defined as an enum with outcomes for either the player winning or the game ending in a draw. It includes a method to generate a random result for each game.

public enum GameResult {
   PLAYER1_WON, PLAYER2_WON, DRAW;

   private static final Random RANDOM = new Random();
   public static GameResult randomResult() {
       return switch (RANDOM.nextInt(3)) {
           case 0 -> PLAYER1_WON;
           case 1 -> PLAYER2_WON;
           default -> DRAW;
       };
   }
}

Following that, we proceed to develop a Gatherer designed for pairing players and orchestrating their games, effectively managing the matchmaking process.

public class GameGatherers {

   public static Gatherer<Player, ?, Game> performGamesGatherer() {
       Gatherer<Player,?, List<Player>> pairPlayersGatherer = Gatherers.windowFixed(2);
       Gatherer<List<Player>,?, Game> simulateGamesGatherer = Gatherers.mapConcurrent(4, players -> {
           if(players.size() != 2) {
               return Game.INVALID_GAME;
           }
           return Game.completedGame(players.get(0), players.get(1), GameResult.randomResult());
       });

       return pairPlayersGatherer.andThen(simulateGamesGatherer);
   }
}

Important points:

  • pairPlayersGatherer: Pairs players ensuring each participates in only one game, using windowFixed to manage matchups.
  • simulateGamesGatherer: It simulates matches between pairs of players. If there's not enough players for a pair, INVALID_GAME is returned. Otherwise, a game is created with a randomly determined result.
  • andThen: Finally, we merge the two gatherers to achieve the expected goal.

Now, let's put the Gatherer into action:

var listOfPlayers = List.of(
       new Player("John"),
       new Player("Marry"),
       new Player("George"),
       new Player("Ann"),
       new Player("Pete"),
       new Player("Stuart"),
       new Player("Adam")
);

System.out.println("\nGames result:");
listOfPlayers.stream()
       .gather(GameGatherers.performGamesGatherer())
       .forEach(System.out::println);

The output is generated as follows:

Games result:
CompletedGame[player1=Player[name=John], player2=Player[name=Marry], result=PLAYER2_WON]
CompletedGame[player1=Player[name=George], player2=Player[name=Ann], result=DRAW]
CompletedGame[player1=Player[name=Pete], player2=Player[name=Stuart], result=DRAW]
INVALID GAME

We see that the last game is marked as invalid due to an uneven number of players, illustrating how our setup handles scenarios where complete pairing isn't feasible.

Bank account balance history

For our final task, we revisit the Money record initially discussed in our introductory Stream Gatherers article.

public record Money(BigDecimal amount, Currency currency) {
   public Money add(Money money) {
       return new Money(money.amount.add(this.amount), currency);
   }

   public Money multiply(BigDecimal multiplier) {
       return new Money(amount.multiply(multiplier), currency);
   }
}

Considering Money as a representation of both bank account balance and transactions, our aim is to trace the balance history across a series of transactions. The solution involves crafting a Gatherer, outlined below, that efficiently accumulates transaction impacts on the initial balance.

public class MoneyUtils {

   public static Gatherer<Money, ?, Money> computeBalanceHistory(Money balance){
       return Gatherers.scan(() -> balance, Money::add);
   }
}

Essential aspects include:

  • The initial balance, supplied to the computeBalanceHistory method, serves as the starting point for the calculation.
  • The Gatherers::scan function is used to update the balance with each transaction in the stream, reflecting financial changes.

Now, let's put the Gatherer into practice:

var transactions = List.of(
       new Money(BigDecimal.valueOf(-10), PLN),
       new Money(BigDecimal.valueOf(20), PLN),
       new Money(BigDecimal.valueOf(50), PLN)
);

var balance = new Money(BigDecimal.valueOf(270), PLN);

System.out.println("\n Balance history:");
transactions.stream()
       .gather(MoneyUtils.computeBalanceHistory(balance))
       .forEach(System.out::println);

The results are as follows:

Balance history:
Money[amount=260, currency=PLN]
Money[amount=280, currency=PLN]
Money[amount=330, currency=PLN]

While this solution may appear brief, it can be useful in various use cases.

Summary

In this followup article about Stream Gatherers, we delved into built-in functionalities like fold, mapConcurrent, scan, windowFixed, and windowSliding. We began with an overview and then applied these gatherers to solve more practical problems, demonstrating their usability. The process often required the strategic combination of multiple gatherers using the andThen method to reach the desired outcomes, showcasing the versatility and utility of these powerful tools in stream processing.

We see that gatherers are made to create customized solutions, but their built-in features also present valuable opportunities for stream operations, simplifying and enhancing the way streams are handled.

We're curious about your thoughts on these built-in gatherers. Do you find them useful? Are there any functionalities you feel are missing?

Reviewed by: Sebastian Rabiej

Blog Comments powered by Disqus.