Java data processing using modern concurrent programming

May the Flows be with you!

We are happy to announce that the Jox library now has a new feature called Flows, which brings the rich concurrency features of Reactive Streams while keeping the simplicity of synchronous Java programming!

Inspired by Ox, its Scala counterpart, and by the Kotlin implementation, Flows provide an easy, user-friendly API for data processing in a concurrent environment, leveraging Java Structured Concurrency and Virtual Threads.

In this article I will introduce Flows and its possibilities step by step, starting from simple synchronous processing and finishing with more advanced asynchronous computations.

What are Flows?

A single Flow represents a stream of values that can be processed by a defined pipeline, synchronously or asynchronously. Processing only begins when one of the run* methods is called, which makes Flows “lazy”.

Flows are very similar to reactive streams; however, their main goal is to provide a high-level API for defining synchronous and asynchronous data processing pipelines. Despite the conceptual difference, Flows can be treated as reactive streams: they can be transformed into, or created from, a specification- and TCK-compliant Publisher.

Flows are built on top of Jox's Channels and Jox's Structured Concurrency. If you want to know more about Structured Concurrency, you can check out this article.

How to start with Flows?

Adding the dependency

Maven:

<dependency>
    <groupId>com.softwaremill.jox</groupId>
    <artifactId>flows</artifactId>
    <version>0.4.0</version>
</dependency>

Gradle:

implementation 'com.softwaremill.jox:flows:0.4.0'

Simple Flows

Now that we have all the dependencies, let's see what flows have to offer.

The Flows class comes with a variety of factory methods that make it easy to create a flow. One of them is range, which creates a Flow generating values within the specified (inclusive) range.

Let's start with a simple finite flow that iterates from 1 to 4 with step 1 and prints each value to the console:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .runForeach(System.out::println);
    }
}

Output:

1
2
3
4

Flows are cold

This means that nothing happens until we call one of the run* methods. In the example above we call runForeach, which consumes each produced value and prints it to the console.

We can define the pipeline as we want, and trigger execution at the end, even multiple times.

Note: a Flow created from a source that can be read only once will work only the first time it is run, because a Flow does not "cache" generated values.

Example:

import com.softwaremill.jox.flows.Flow;
import com.softwaremill.jox.flows.Flows;

import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flow<Integer> flow = Flows.fromIterable(List.of(1, 2, 3));
        flow.runForeach(System.out::println);
        flow.runForeach(System.out::println);
    }
}

Output:

1
2
3
1
2
3
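The note about read-once sources can be illustrated without Jox at all. The following is a plain-JDK sketch (SingleUseSourceSketch and consumeTwice are names made up for this illustration, and nothing here is Flows API). It consumes a re-readable source and a read-once source twice: a List hands out a fresh iterator on every pass, whereas a shared Iterator is exhausted after the first one, which is the situation the note describes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SingleUseSourceSketch {

    // Drains the source twice and returns everything that was seen.
    static List<Integer> consumeTwice(Iterable<Integer> source) {
        List<Integer> seen = new ArrayList<>();
        for (int pass = 0; pass < 2; pass++) {
            for (Integer value : source) {
                seen.add(value);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // A List creates a new iterator for every pass, so both passes see all values.
        List<Integer> reusable = List.of(1, 2, 3);
        System.out.println(consumeTwice(reusable)); // [1, 2, 3, 1, 2, 3]

        // Handing out one shared Iterator makes the source read-once: the second pass is empty.
        Iterator<Integer> shared = List.of(1, 2, 3).iterator();
        System.out.println(consumeTwice(() -> shared)); // [1, 2, 3]
    }
}
```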

Infinite Flows

The Flows API allows the creation of infinite flows. One option is the Flows.tick(Duration interval, T value) method, which emits the given value once every interval.

Example:

import com.softwaremill.jox.flows.Flows;

import java.time.Duration;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.tick(Duration.ofMillis(100), "SML Rules!")
                .runForeach(System.out::println);
    }
}

Output:

SML Rules!
SML Rules!
SML Rules!
…

The above Flow generates "SML Rules!" every 100 ms. Values are generated as long as the main thread is not interrupted. To limit the number of items that are processed further, use the take(n) method, which passes only the first n elements downstream.

Custom Flows

Aside from the predefined factory methods, you can use Flows.usingEmit() to create a Flow that produces as many items as you need.

The method provides an emit instance that lets you generate the elements passed downstream.

Example:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.usingEmit(emit -> {
            emit.apply(1);
            emit.apply(2);
        }).runForeach(System.out::println);
    }
}

Output:

1
2

Processing the values

Producing values is one thing, but sometimes we need to manipulate them. Flows allow you to do that in many ways. As an example, let's use map to multiply our values by 10:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .map(i -> i * 10)
                .runForeach(System.out::println);
    }
}

Output:

10
20
30
40

map allows you to convert an incoming value into another value, possibly of a completely different type. Flows support multiple map* methods, which you can explore here.

Values can also be filtered using the filter method:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .map(i -> i * 10)
                .filter(i -> i < 30)
                .runForeach(System.out::println);
    }
}

Output:

10
20

At this point, you probably see some similarities to Java Streams, and you are right. Some of the methods are very similar, others differ, some are missing, and some you won't find in Java Streams. Keep in mind that Flows are designed to provide a simple API for concurrent data processing, not to replace Java Streams.
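To make the similarity concrete, here is the same map and filter pipeline written with plain Java Streams. This is only a side-by-side sketch for comparison (StreamsComparisonSketch and mapAndFilter are made-up names), not a suggestion that the two APIs are interchangeable.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class StreamsComparisonSketch {

    // Same steps as the Flows pipeline: range 1..4 (inclusive), multiply by 10, keep values below 30.
    static List<Integer> mapAndFilter() {
        return IntStream.rangeClosed(1, 4)
                .map(i -> i * 10)
                .filter(i -> i < 30)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapAndFilter()); // [10, 20]
    }
}
```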

Order of computations

In the above scenarios, each produced value was processed entirely before the next value was produced.
Let's demonstrate this using the tap method, which lets us "take a look" at the current value before it is processed further (just like Java Stream's peek).

In the first tap method, we will print the value, and in the second, we will throw a RuntimeException when the value is greater than 2.

Example:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 5, 1)
                .tap(System.out::println)
                .tap(i -> {
                    if (i > 2) {
                        throw new RuntimeException("Boom!");
                    }
                })
                .runDrain(); // consume the values and discard the result
    }
}

Output:

1
2
3
Exception in thread "main" java.lang.RuntimeException: Boom!

As you can see, values 4 and 5 were not printed, because an exception was thrown while processing value 3. Neither of them ever reached the first tap method. This shows that, by default, each value passes through the whole pipeline before the next one is produced.
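Since tap was compared to Java Stream's peek, the same one-by-one behavior can be reproduced with a sequential stream. The sketch below uses only the JDK (OneByOneSketch and seenBeforeFailure are made-up names) and collects the values the first stage saw instead of printing them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class OneByOneSketch {

    // Returns the values the first stage saw before the second stage failed.
    static List<Integer> seenBeforeFailure() {
        List<Integer> seen = new ArrayList<>();
        try {
            IntStream.rangeClosed(1, 5)
                    .peek(seen::add)
                    .peek(i -> {
                        if (i > 2) {
                            throw new RuntimeException("Boom!");
                        }
                    })
                    .forEach(i -> { });
        } catch (RuntimeException e) {
            // Expected: the failure on value 3 stops the whole pipeline.
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(seenBeforeFailure()); // [1, 2, 3]
    }
}
```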

Advanced processing

After covering basic synchronous operations, it’s time to get up to speed with asynchronous processing.

Buffers

The easiest way to introduce asynchronous processing is to use the buffer method. It lets the upstream (the methods used before buffer) produce values and store them in a buffer, from which the downstream (the methods used after buffer) consumes them, so that the upstream and the downstream run in separate threads. The upstream keeps processing values until the buffer is full, and resumes only when the downstream frees space in the buffer. Exceptions that happen in the upstream are propagated to the downstream via the channel.

Let’s demonstrate that all values are printed before they are written to the buffer, even though the downstream throws an exception for values greater than 1:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .tap(i -> System.out.printf("%s: Hello from first tap: %d\n", Thread.currentThread(), i))
                .buffer(10)
                .tap(i -> {
                    System.out.printf("%s: Hello from second tap: %d\n", Thread.currentThread(), i);
                    if (i > 1) {
                        throw new RuntimeException("boom!");
                    }
                })
                .runDrain();
    }
}

Output:

VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 3
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 4
Thread[#1,main,5,main]: Hello from second tap: 1
Thread[#1,main,5,main]: Hello from second tap: 2
Exception in thread "main" com.softwaremill.jox.structured.JoxScopeExecutionException: java.lang.RuntimeException: boom!

As you can see, all values were produced before reaching the buffer, but only the first 2 values were processed after it. The exception is also different: the original RuntimeException is wrapped in JoxScopeExecutionException. This topic is covered in a later section.

You can see that upstream was run and processed by a VirtualThread, whereas the rest was processed by the main thread. This means that we introduced async processing without much effort! Don't worry, the created thread will be terminated once the Flow is finished successfully or an exception is thrown.

It’s also worth mentioning that this result may vary depending on the order in which the threads execute, meaning that the exception can be thrown before all upstream values are buffered.

Let's now look at what happens if we reduce the size of the buffer to 1 and increase the range to 10:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 10, 1)
                .tap(i -> System.out.printf("%s: Hello from first tap: %d\n", Thread.currentThread(), i))
                .buffer(1)
                .tap(i -> {
                    System.out.printf("%s: Hello from second tap: %d\n", Thread.currentThread(), i);
                    if (i > 1) {
                        throw new RuntimeException("boom!");
                    }
                })
                .runDrain();
    }
}

Output:

VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 3
Thread[#1,main,5,main]: Hello from second tap: 1
Thread[#1,main,5,main]: Hello from second tap: 2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 4
Exception in thread "main" com.softwaremill.jox.structured.JoxScopeExecutionException: java.lang.RuntimeException: boom!

You may notice that Hello from first tap: 4 is printed before the exception is thrown. This might be counterintuitive, so let’s explain it.

We need to take a closer look at the buffer method. Under the hood it uses a channel whose capacity is taken from the argument. This means that thread suspension (waiting) is fully managed by the channel. In our case, we can have a maximum of one value waiting to be read.

If we want to write more values, we need to wait until the buffered value is consumed from the channel. The wait itself happens just before the value is written to the buffer, so the value has already been processed and is merely waiting to be sent.

This is exactly what happens in our scenario. You can read more about channels here.

The processing probably looked like this (the order of some operations might differ without changing the result):

  1. Virtual Thread produces value 1 and pushes it to the buffer
  2. Virtual Thread produces value 2 -> value is processed, and waits for space in the buffer
  3. Main thread reads value 1
  4. Virtual Thread pushes value 2 to the buffer
  5. Virtual Thread produces value 3 -> value is processed, and waits for space in the buffer
  6. Main thread prints value 1
  7. Main thread reads and prints value 2
  8. Virtual Thread pushes value 3 to the buffer
  9. Virtual Thread produces value 4 -> value is processed, and waits for space in the buffer
  10. Exception is thrown
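The "processed, then waits for space" effect from the steps above can be reproduced with a plain JDK bounded queue. The following is a minimal sketch under a few assumptions: an ArrayBlockingQueue stands in for the Jox channel, a platform thread stands in for the virtual thread, and BoundedBufferSketch and processedBeforeFirstRead are names made up for this illustration. The producer records a value as processed before it tries to put it into the queue, so by the time it blocks it has processed one value more than the buffer can hold.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

final class BoundedBufferSketch {

    // Returns the values the producer had processed at the moment it first blocked
    // on a full buffer (or all of them, if the buffer never filled up).
    static List<Integer> processedBeforeFirstRead(int capacity, int count) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> processed = new CopyOnWriteArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    processed.add(i); // the "first tap": runs before waiting for space
                    buffer.put(i);    // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Nothing is consumed yet: wait until the producer is parked inside put() or has finished.
            while (producer.isAlive() && producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            List<Integer> snapshot = List.copyOf(processed);
            // Drain the buffer so the producer can finish.
            for (int i = 0; i < count; i++) {
                buffer.take();
            }
            producer.join();
            return snapshot;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Capacity 1: value 1 sits in the buffer, value 2 is processed and waiting to be sent.
        System.out.println(processedBeforeFirstRead(1, 10)); // [1, 2]
    }
}
```

With a capacity of 1 the producer is always at most two values ahead of the consumer, which matches the "first tap: 4" line appearing before the exception in the output above.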

Parallel mapping

Another useful method provided by Flows is mapPar(int parallelism, mappingFunction), which applies the given mappingFunction in up to parallelism virtual threads at the same time. Let’s take a look at an example:

import com.softwaremill.jox.flows.Flows;

import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        List<Integer> results = Flows.range(1, 10, 1)
                .mapPar(5, i -> {
                    System.out.printf("Executing %d in %s\n", i, Thread.currentThread());
                    return i * 2;
                })
                .runToList();
        System.out.println(results);
    }
}

Output:

Executing 1 in VirtualThread[#32]/runnable@ForkJoinPool-1-worker-2
Executing 2 in VirtualThread[#37]/runnable@ForkJoinPool-1-worker-5
Executing 3 in VirtualThread[#38]/runnable@ForkJoinPool-1-worker-2
Executing 4 in VirtualThread[#39]/runnable@ForkJoinPool-1-worker-7
Executing 5 in VirtualThread[#40]/runnable@ForkJoinPool-1-worker-2
Executing 6 in VirtualThread[#42]/runnable@ForkJoinPool-1-worker-7
Executing 7 in VirtualThread[#43]/runnable@ForkJoinPool-1-worker-2
Executing 8 in VirtualThread[#44]/runnable@ForkJoinPool-1-worker-3
Executing 10 in VirtualThread[#46]/runnable@ForkJoinPool-1-worker-7
Executing 9 in VirtualThread[#45]/runnable@ForkJoinPool-1-worker-4
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

You can see that each value was processed by a different virtual thread and that values were not necessarily processed in sequence; however, the result list preserves the order. If you don’t care about the order, you can use the similar mapParUnordered method.

The output of the mapPar method is buffered, and the buffer size can be controlled with the Flow#CHANNEL_BUFFER_SIZE ScopedValue.
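To get a feeling for how results can stay ordered while the work itself runs out of order, here is a rough JDK-only sketch. It is not how Jox implements mapPar: it assumes a fixed thread pool instead of virtual threads, submits all items eagerly instead of applying backpressure, and OrderedParallelMapSketch and mapParallelOrdered are made-up names. The point is only that collecting the futures in submission order yields ordered results, whatever order the tasks finish in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class OrderedParallelMapSketch {

    // Applies mapper on up to `parallelism` threads and returns the results in input order.
    static <T, R> List<R> mapParallelOrdered(int parallelism, List<T> input, Function<T, R> mapper) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T item : input) {
                futures.add(pool.submit(() -> mapper.apply(item)));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // waits in submission order, so the order is preserved
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> doubled = mapParallelOrdered(5, List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), i -> i * 2);
        System.out.println(doubled); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```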

Grouping

Flows also allow you to group values into sub-flows by a calculated key, process the sub-flows in separate threads, and merge their results into a single flow. All values of a given sub-flow are processed by the same thread.

The method signature is groupBy(int parallelism, groupingFunction, childFlowTransform): parallelism controls the maximum number of threads executing child flows, groupingFunction calculates the grouping key, and childFlowTransform processes the child flow. Let’s take a look at an example:

import com.softwaremill.jox.flows.Flow;
import com.softwaremill.jox.flows.Flows;

import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flow.ChildFlowTransformer<Integer, Integer, String> childFlowTransformer = mod -> {
            return f -> f.map(i -> {
                System.out.printf("%d: %s\n", i, Thread.currentThread());
                return mod == 0 ? "even" : "odd";
            });
        };
        List<String> result = Flows.range(1, 10, 1)
                .groupBy(5, i -> i % 2, childFlowTransformer)
                .runToList();
        System.out.println(result);
    }
}

The groupingFunction is a simple modulo 2 operation, so all even numbers are directed to one child flow and all odd numbers to the other.

childFlowTransformer is a lambda that takes a grouping key and returns another lambda, which takes the flow of values matched to that key and returns a modified flow. In our case, each value is mapped to the string odd or even.

Output:

1: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-1
2: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3
3: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
5: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
7: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
4: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
9: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
6: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
8: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
10: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
[odd, even, odd, odd, odd, even, odd, even, even, even]

Looking at the output, you can see that all odd values were processed by one thread and all even values by another. Overall, 2 threads were working. You may also notice that the order is not preserved in this case.
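The "one thread per key" observation can be mimicked with plain JDK executors. The sketch below is only an analogy and not Jox's implementation: it assumes one single-thread executor per key, ignores the parallelism limit, and GroupByThreadSketch and threadsPerKey are made-up names. It records which thread handled the values of each key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class GroupByThreadSketch {

    // Routes every value to a worker chosen by its key and records which threads handled each key.
    static Map<Integer, Set<String>> threadsPerKey(List<Integer> values, Function<Integer, Integer> keyOf) {
        Map<Integer, ExecutorService> workers = new HashMap<>();
        Map<Integer, Set<String>> threadsByKey = new ConcurrentHashMap<>();
        List<Future<?>> pending = new ArrayList<>();
        try {
            for (Integer value : values) {
                Integer key = keyOf.apply(value);
                // One single-thread executor per key plays the role of a child flow.
                ExecutorService worker = workers.computeIfAbsent(key, k -> Executors.newSingleThreadExecutor());
                pending.add(worker.submit(() -> {
                    threadsByKey.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
                            .add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> task : pending) {
                task.get(); // wait until every value has been handled
            }
            return threadsByKey;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            workers.values().forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) {
        // Thread names vary between runs, but each key maps to exactly one thread.
        System.out.println(threadsPerKey(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), i -> i % 2));
    }
}
```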

Usage of Scoped Values

Scoped Values are a Java feature that first appeared as a preview in Java 21. Their main purpose is to provide a programming model for sharing data both within a thread and with child threads. You can read more about them in JEP 446.

To avoid passing additional parameters, some of the Flow methods use the Flow#CHANNEL_BUFFER_SIZE Scoped Value when internally creating a channel. Whenever this is the case, it is mentioned in the Javadoc of the given method.

One example is buffer called without a parameter:

ScopedValue.runWhere(Flow.CHANNEL_BUFFER_SIZE, 3, () -> {
    Flows.fromValues(1, 2, 3)
        .buffer();
});

This is the same as:

Flows.fromValues(1, 2, 3)
    .buffer(3);

If the Flow#CHANNEL_BUFFER_SIZE value is not set, Channel#DEFAULT_BUFFER_SIZE is used.

Completion listeners

Flows support 3 completion listeners:

  1. onComplete - runs the provided method when the Flow completes successfully or when an error occurs
  2. onDone - runs the provided method when the Flow completes successfully
  3. onError - runs the provided method when an error is thrown

If multiple listeners are used, then they are called in the order they are defined in the Flow.

Listeners can be defined anywhere in the pipeline and they will be triggered; however, it is advisable to place them at the end of the pipeline, as some methods change this behavior.

Example:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .onComplete(() -> System.out.println("Hello from first on complete"))
                .onError(e -> System.out.println("Hello from first error"))
                .tap(i -> {
                    if (i > 1) throw new RuntimeException();
                })
                .onDone(() -> System.out.println("Hello from on done"))
                .onError(e -> System.out.println("Hello from second error"))
                .onComplete(() -> System.out.println("Hello from second on complete"))
                .runForeach(System.out::println);
    }
}

Output:

1
Hello from first on complete
Hello from first error
Hello from second error
Hello from second on complete

This behaviour might change if buffer, mapPar, or any other method that uses a Channel under the hood is used. Exceptions are caught in those methods and propagated downstream via the channel. As the upstream is processed by a different thread, all values might be “generated” before an exception is thrown in the downstream.

Example:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .onComplete(() -> System.out.println("Hello from first on complete"))
                .onError(e -> System.out.println("Hello from first error"))
                .onDone(() -> System.out.println("Hello from first on done"))
                .mapPar(4, i -> {
                    if (i > 1) throw new RuntimeException();
                    return i;
                })
                .onDone(() -> System.out.println("Hello from second on done"))
                .onError(e -> System.out.println("Hello from second error"))
                .onComplete(() -> System.out.println("Hello from second on complete"))
                .runForeach(System.out::println);
    }
}

Output:

Hello from first on complete
Hello from first on done
Hello from second error
Hello from second on complete

As you can see, the first onDone was triggered despite an error being thrown in the mapPar method. This happened only because all values were produced before the exception was thrown; had the exception been thrown earlier, the first onError would have been triggered instead. Completion listeners after mapPar were triggered in the correct order.

Using a larger range, as in the following example, shows the error being propagated correctly.

Error propagated correctly

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 100, 1)
                .onComplete(() -> System.out.println("Hello from first on complete"))
                .onError(e -> System.out.println("Hello from first error"))
                .onDone(() -> System.out.println("Hello from first on done"))
                .mapPar(4, i -> {
                    if (i == 1) throw new RuntimeException();
                    return i;
                })
                .onDone(() -> System.out.println("Hello from second on done"))
                .onError(e -> System.out.println("Hello from second error"))
                .onComplete(() -> System.out.println("Hello from second on complete"))
                .runForeach(System.out::println);
    }
}

Output:

Hello from first on complete
Hello from first error
Hello from second error
Hello from second on complete

Exception handling

To handle exceptions correctly, we should wrap the whole execution of the Flow in a try/catch block. Usually, exceptions that happen while processing the Flow are thrown directly and can be caught as they appear. However, whenever methods that use Scope or Channel are called, exceptions are wrapped in JoxScopeExecutionException and ChannelErrorException respectively. If a method uses both Scope and Channel, the original exception is wrapped in both.

This is why, in the earlier buffer example, the original exception was wrapped in JoxScopeExecutionException.
If you are not sure whether a given method wraps the original exception, please refer to the Javadoc of the method, where it is mentioned explicitly.

Example:

import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .mapPar(4, i -> {
                    if (1 == 1) throw new RuntimeException();
                    return i;
                })
                .runForeach(System.out::println);
    }
}

Output:

Exception in thread "main" com.softwaremill.jox.structured.JoxScopeExecutionException: com.softwaremill.jox.ChannelErrorException: java.lang.RuntimeException
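When the original exception is buried under one or two wrappers, it can be handy to dig out the innermost cause. Below is a minimal JDK-only sketch of such a helper. rootCause is a hypothetical name, not part of Jox, and the JDK's IllegalStateException and ExecutionException merely stand in for JoxScopeExecutionException and ChannelErrorException. It assumes the wrappers expose the original exception through getCause(), which the stack trace above suggests.

```java
import java.util.concurrent.ExecutionException;

final class RootCauseSketch {

    // Follows getCause() links until the innermost exception is reached.
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // JDK exceptions stand in for the Jox wrappers, which are not on the classpath here.
        Throwable wrapped = new IllegalStateException(
                new ExecutionException(new RuntimeException("boom!")));
        System.out.println(rootCause(wrapped).getMessage()); // boom!
    }
}
```

In a catch block around a Flow run, the same helper could be applied to the caught exception before deciding how to react to it.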

Conclusion

Flows provide a nice and simple API that allows you to process data with modern concurrency by leveraging Structured Concurrency. It can be used on top of existing logic that would benefit from concurrent execution, or when developing new features or projects.

As Flows is the library of the future, it uses Java preview features (Structured Concurrency and Scoped Values), so keep in mind that it requires an additional JVM parameter (--enable-preview) to run. It also means that new versions might introduce backward-incompatible changes.

If you are excited about the project you can share it with your friends or colleagues, or even post on social media (sic!)

If you would like to help with development or contribute, you can follow our GitHub Repository!
Remember, leaving a star or even liking an issue is already a great help!

Reviewed by: Łukasz Lenart, Adam Warski
