Java data processing using modern concurrent programming
May the Flows be with you!
We are happy to announce that the Jox library now has a new feature called Flows, bringing the rich concurrency features of Reactive Streams while keeping the simplicity of synchronous Java programming!
Inspired by its Scala counterpart, the Ox library, and by the Kotlin implementation, Flows provide an easy, user-friendly API for data processing in a concurrent environment, leveraging Java Structured Concurrency and Virtual Threads.
In this article I will introduce Flows and their possibilities step by step, starting from simple synchronous processing and finishing with more advanced asynchronous computations.
What are Flows?
A single Flow represents a stream of values that can be processed by the defined pipeline synchronously or asynchronously. Processing of a Flow only begins when one of the run* methods is called, which makes Flows “lazy”.
Flows are very similar to reactive streams; however, their main goal is to provide a high-level API for defining synchronous and asynchronous data processing pipelines. Despite the conceptual difference, Flows can be treated as reactive streams: they can be transformed into, or created from, a specification- and TCK-compliant Publisher.
Flows are built on top of Jox's Channels and Jox's Structured Concurrency. If you want to know more about Structured Concurrency, you can check out this article.
How to start with Flows?
Adding the dependency
Maven:
<dependency>
    <groupId>com.softwaremill.jox</groupId>
    <artifactId>flows</artifactId>
    <version>0.4.0</version>
</dependency>
Gradle:
implementation 'com.softwaremill.jox:flows:0.4.0'
Simple Flows
Now that we have all the dependencies, let's see what flows have to offer.
The Flows class comes with a variety of factory methods that make it easy to create a flow. One of them is range, which creates a Flow generating values within the specified (inclusive) range.
Let's start with a simple finite flow that iterates from 1 to 4 with step 1 and prints each value to the console:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .runForeach(System.out::println);
    }
}
Output:
1
2
3
4
Flows are cold
This means that nothing happens unless we call one of the run* methods. In the above example we call runForeach, which consumes each produced value and prints it to the console.
We can define the pipeline as we want, and trigger execution at the end, even multiple times.
Note: Flows created from sources that can be read only once will work only the first time, because a Flow does not "cache" generated values.
Example:
import com.softwaremill.jox.flows.Flow;
import com.softwaremill.jox.flows.Flows;
import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flow<Integer> flow = Flows.fromIterable(List.of(1, 2, 3));
        flow.runForeach(System.out::println);
        flow.runForeach(System.out::println);
    }
}
Output:
1
2
3
1
2
3
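The note about one-shot sources can be illustrated without Jox at all. The following is a minimal plain-JDK sketch, not Jox's implementation: the hypothetical `run` helper stands in for a `run*` method, and the source is modelled as a `Supplier<Iterator<T>>`, which is an assumption made purely for illustration. A source that hands out a fresh iterator on every run can be consumed repeatedly, while a source backed by one shared iterator is exhausted after the first run.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class ColdSourceSketch {
    // Hypothetical stand-in for a run* method: the source is only read when this is called.
    static <T> List<T> run(Supplier<Iterator<T>> source) {
        List<T> collected = new ArrayList<>();
        source.get().forEachRemaining(collected::add);
        return collected;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3);

        // Re-readable source: every run asks the list for a new iterator.
        Supplier<Iterator<Integer>> reusable = data::iterator;
        System.out.println(run(reusable)); // [1, 2, 3]
        System.out.println(run(reusable)); // [1, 2, 3]

        // One-shot source: every run receives the same, already advanced iterator.
        Iterator<Integer> shared = data.iterator();
        Supplier<Iterator<Integer>> oneShot = () -> shared;
        System.out.println(run(oneShot)); // [1, 2, 3]
        System.out.println(run(oneShot)); // []
    }
}
```

Nothing is cached between runs in this sketch, so the second run of the one-shot source has nothing left to read.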
Infinite Flows
The Flows API allows the creation of infinite flows. One possibility is the Flows.tick(Duration interval, T value) method, which generates value every interval.
Example:
import com.softwaremill.jox.flows.Flows;
import java.time.Duration;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.tick(Duration.ofMillis(100), "SML Rules!")
                .runForeach(System.out::println);
    }
}
Output:
SML Rules!
SML Rules!
SML Rules!
…
The above Flow generates "SML Rules!" every 100 ms. Values will be generated as long as the main thread is not interrupted; however, you can limit the number of items that are processed further by using the take(n) method, which passes only the first n elements downstream.
Custom Flows
Aside from the predefined factory methods, you can use Flows.usingEmit() to create a Flow that produces any number of items you need.
The method provides an emit instance that allows you to generate elements that are passed downstream.
Example:
import com.softwaremill.jox.flows.Flow;
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.usingEmit(emit -> {
            emit.apply(1);
            emit.apply(2);
        }).runForeach(System.out::println);
    }
}
Output:
1
2
Processing the values
Producing values is one thing, but sometimes we need to manipulate them. Flows allow you to do that in many ways. As an example, let's use map and multiply our values by 10:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .map(i -> i * 10)
                .runForeach(System.out::println);
    }
}
Output:
10
20
30
40
map allows you to convert an incoming value into another value, possibly of a completely different type. Flows support multiple map* methods, which you can explore here.
Values can also be filtered by using the filter method:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .map(i -> i * 10)
                .filter(i -> i < 30)
                .runForeach(System.out::println);
    }
}
Output:
10
20
At this point, you probably see some similarities to Java Streams, and that is true. Some of the methods are very similar, others are not, some are missing, some you won't find in Java Streams. Keep in mind that Flows are designed to provide a simple API for concurrent data processing, not to replace Java Streams.
Order of computations
In the above scenarios each produced value was processed entirely before the next value was produced.
Let's demonstrate this using the tap method, which allows us to "take a look" at the current value before it is processed further (just like Java Stream's peek).
In the first tap, we will print the value, and in the second, we will throw a RuntimeException when the value is greater than 2.
Example:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 5, 1)
                .tap(System.out::println)
                .tap(i -> {
                    if (i > 2) {
                        throw new RuntimeException("Boom!");
                    }
                })
                .runDrain(); // consume the values and discard the result
    }
}
Output:
1
2
3
Exception in thread "main" java.lang.RuntimeException: Boom!
As you can see, values 4 and 5 were not printed, because an exception was thrown when processing value 3. This means that neither value reached the first tap after the exception was thrown. This behavior shows that, by default, values are processed by the whole pipeline one by one.
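Since tap is compared to Java Stream's peek, the same one-by-one ordering can be observed with the JDK alone. The following sketch uses java.util.stream rather than Jox, so it only illustrates the ordering idea; the class and method names are hypothetical. Two chained peek calls record when they see each value, and the resulting log shows every value passing both stages before the next value is produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class OrderSketch {
    // Records the order in which two chained stages observe the values from..to (inclusive).
    static List<String> trace(int from, int to) {
        List<String> log = new ArrayList<>();
        IntStream.rangeClosed(from, to)
                .peek(i -> log.add("first:" + i))
                .peek(i -> log.add("second:" + i))
                .forEach(i -> { }); // terminal operation, triggers the sequential pipeline
        return log;
    }

    public static void main(String[] args) {
        // [first:1, second:1, first:2, second:2, first:3, second:3]
        System.out.println(trace(1, 3));
    }
}
```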
Advanced processing
After covering basic synchronous operations, it’s time to get up to speed with asynchronous processing.
Buffers
The easiest way to introduce asynchronous processing is to use the buffer method. It allows the upstream (methods used before buffer) to produce values and save them to a buffer, from which the downstream (methods used after buffer) consumes them in a separate thread. Values in the upstream are processed until the buffer is full, and processing resumes only when the downstream frees space in the buffer. Exceptions that happen in the upstream are propagated to the downstream via the channel.
Let’s demonstrate that all values are printed before being written to the buffer, even though the downstream throws an exception for values greater than 1.
Let's see how it works:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 4, 1)
                .tap(i -> System.out.printf("%s: Hello from first tap: %d\n", Thread.currentThread(), i))
                .buffer(10)
                .tap(i -> {
                    System.out.printf("%s: Hello from second tap: %d\n", Thread.currentThread(), i);
                    if (i > 1) {
                        throw new RuntimeException("boom!");
                    }
                })
                .runDrain();
    }
}
Output:
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 3
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 4
Thread[#1,main,5,main]: Hello from second tap: 1
Thread[#1,main,5,main]: Hello from second tap: 2
Exception in thread "main" com.softwaremill.jox.structured.JoxScopeExecutionException: java.lang.RuntimeException: boom!
As you can see, all values were produced before reaching the buffer; however, only the first 2 values were processed after that. The exception is also different: the original RuntimeException is wrapped in JoxScopeExecutionException. This topic is covered in a later section.
You can see that the upstream was run by a VirtualThread, whereas the rest was processed by the main thread. This means that we introduced async processing without much effort! Don't worry, the created thread will be terminated once the Flow finishes successfully or an exception is thrown.
It’s also worth mentioning that this result might vary depending on the order in which the threads execute, meaning that the exception can be thrown before all upstream values are buffered.
Let's now take a look at what happens if we reduce the size of the buffer to 1 and increase the range to 10:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 10, 1)
                .tap(i -> System.out.printf("%s: Hello from first tap: %d\n", Thread.currentThread(), i))
                .buffer(1)
                .tap(i -> {
                    System.out.printf("%s: Hello from second tap: %d\n", Thread.currentThread(), i);
                    if (i > 1) {
                        throw new RuntimeException("boom!");
                    }
                })
                .runDrain();
    }
}
Output:
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 3
Thread[#1,main,5,main]: Hello from second tap: 1
Thread[#1,main,5,main]: Hello from second tap: 2
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1: Hello from first tap: 4
Exception in thread "main" com.softwaremill.jox.structured.JoxScopeExecutionException: java.lang.RuntimeException: boom!
You may notice that Hello from first tap: 4 is printed before the exception is thrown. This might be counterintuitive, so let’s explain it.
We need to take a closer look at the buffer method. Under the hood it uses a channel with the capacity taken from the argument. This means that thread suspension (waiting) is fully managed by the channel. In our case, at most one value can be waiting to be read.
To write more values, we need to wait until the buffered value is consumed from the channel. The wait itself happens just before the value is written to the buffer, so the value has already been processed and is only waiting to be sent.
This is exactly what happens in our scenario. You can read more about channels here.
The processing probably looked like this (order of some operations might be different and the result would be the same):
- Virtual Thread produces value 1 and pushes it to the buffer
- Virtual Thread produces value 2 -> value is processed, and waits for space in the buffer
- Main thread reads value 1
- Virtual Thread pushes value 2 to the buffer
- Virtual Thread produces value 3 -> value is processed, and waits for space in the buffer
- Main thread prints value 1
- Main thread reads and prints value 2
- Virtual Thread pushes value 3 to the buffer
- Virtual Thread produces value 4 -> value is processed, and waits for space in the buffer
- Exception is thrown
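The walkthrough above hinges on one detail: a value is processed first, and only then does the producer wait for space in the buffer. Below is a minimal plain-JDK sketch of that detail. It assumes an ArrayBlockingQueue as a stand-in for the Jox channel and a platform thread instead of a virtual thread; both are substitutions made for illustration, not Jox's implementation. The hypothetical helper processedBeforeBlocking starts a producer with no consumer attached and reports how many values have passed the "first tap" by the time the producer is parked.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedBufferSketch {
    // Returns how many values the producer has processed once it blocks (or finishes).
    static int processedBeforeBlocking(int capacity, int total) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= total; i++) {
                    processed.incrementAndGet(); // stands in for the first tap
                    buffer.put(i);               // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        // Spin until the producer is parked inside put() or has run to completion.
        while (producer.getState() != Thread.State.WAITING
                && producer.getState() != Thread.State.TERMINATED) {
            Thread.onSpinWait();
        }
        int result = processed.get();
        producer.interrupt(); // release the parked producer so it can exit
        return result;
    }

    public static void main(String[] args) {
        System.out.println(processedBeforeBlocking(1, 10)); // 2: one value buffered, one processed and waiting
        System.out.println(processedBeforeBlocking(10, 4)); // 4: everything fits, nothing waits
    }
}
```

With capacity 1 and no consumer, the producer gets one value ahead of the buffer, which corresponds to the extra "first tap" line seen in the output above.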
Parallel mapping
Another useful method provided by Flows is mapPar(int parallelism, mappingFunction), which applies a given mappingFunction in up to parallelism virtual threads at the same time. Let’s take a look at an example:
import com.softwaremill.jox.flows.Flows;
import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        List<Integer> results = Flows.range(1, 10, 1)
                .mapPar(5, i -> {
                    System.out.printf("Executing %d in %s\n", i, Thread.currentThread());
                    return i * 2;
                })
                .runToList();
        System.out.println(results);
    }
}
Output:
Executing 1 in VirtualThread[#32]/runnable@ForkJoinPool-1-worker-2
Executing 2 in VirtualThread[#37]/runnable@ForkJoinPool-1-worker-5
Executing 3 in VirtualThread[#38]/runnable@ForkJoinPool-1-worker-2
Executing 4 in VirtualThread[#39]/runnable@ForkJoinPool-1-worker-7
Executing 5 in VirtualThread[#40]/runnable@ForkJoinPool-1-worker-2
Executing 6 in VirtualThread[#42]/runnable@ForkJoinPool-1-worker-7
Executing 7 in VirtualThread[#43]/runnable@ForkJoinPool-1-worker-2
Executing 8 in VirtualThread[#44]/runnable@ForkJoinPool-1-worker-3
Executing 10 in VirtualThread[#46]/runnable@ForkJoinPool-1-worker-7
Executing 9 in VirtualThread[#45]/runnable@ForkJoinPool-1-worker-4
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
You can see that each value was processed by a different virtual thread and that values were not necessarily processed in sequence; however, the list of results preserves the order. If you don’t care about the order, you can use the similar mapParUnordered method.
The output of the mapPar method is buffered, and the buffer size can be controlled by the Flow#CHANNEL_BUFFER_SIZE ScopedValue.
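The combination of bounded parallelism and preserved order can be sketched with the JDK alone. This is not how Jox implements mapPar: the hypothetical helper below uses a fixed pool of platform threads instead of virtual threads, and it waits for all results instead of streaming them downstream. It only shows why results can come back in input order even when the work itself finishes out of order: futures are collected in the order the tasks were submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class MapParSketch {
    // Applies f to every input using at most `parallelism` threads; results keep input order.
    static <T, R> List<R> mapPar(int parallelism, List<T> input, Function<T, R> f) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T item : input) {
                tasks.add(() -> f.apply(item));
            }
            List<R> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the submitted tasks.
            for (Future<R> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> doubled = mapPar(5, List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), i -> i * 2);
        System.out.println(doubled); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```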
Grouping
Flows also allow grouping values into sub-flows by a calculated key, processing the sub-flows in separate threads, and merging their results into a single flow. All values of a given sub-flow are processed by the same thread.
The method signature looks as follows: groupBy(int parallelism, groupingFunction, childFlowTransform). parallelism controls the maximum number of threads executing child flows, groupingFunction is used to calculate the grouping key, and childFlowTransform is used to process the child flow. Let’s take a look at an example:
import com.softwaremill.jox.flows.Flow;
import com.softwaremill.jox.flows.Flows;
import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flow.ChildFlowTransformer<Integer, Integer, String> childFlowTransformer = mod -> {
            return f -> f.map(i -> {
                System.out.printf("%d: %s\n", i, Thread.currentThread());
                return mod == 0 ? "even" : "odd";
            });
        };
        List<String> result = Flows.range(1, 10, 1)
                .groupBy(5, i -> i % 2, childFlowTransformer)
                .runToList();
        System.out.println(result);
    }
}
The groupingFunction is a simple modulo 2 operation, so all even numbers are directed to one child flow and all odd numbers to another.
childFlowTransformer is a lambda that takes a grouping key as an argument and returns another lambda, which takes the flow of values matched to that key and returns a modified flow. In our case, we map each value to the string odd or even.
Output:
1: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-1
2: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3
3: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
5: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
7: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
4: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
9: VirtualThread[#33]/runnable@ForkJoinPool-1-worker-5
6: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
8: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
10: VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1
[odd, even, odd, odd, odd, even, odd, even, even, even]
Looking at the output, you can notice that all odd values were processed by one thread and all even values by a different one. Overall, there were 2 threads working. You may also notice that order is not preserved in this case.
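The "one thread per group" observation can be reproduced with a small plain-JDK sketch. It is only an illustration under simplifying assumptions, not Jox's groupBy: the hypothetical helper below gives every key its own single-threaded executor, ignores the parallelism limit, and does not merge any results. It returns, for each key, the set of thread names that handled values with that key.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class GroupBySketch {
    // Routes each value to a single-threaded executor chosen by its key.
    static <T, K> Map<K, Set<String>> threadsPerKey(List<T> input, Function<T, K> keyOf) {
        Map<K, ExecutorService> workers = new ConcurrentHashMap<>();
        Map<K, Set<String>> threadsSeen = new ConcurrentHashMap<>();
        for (T item : input) {
            K key = keyOf.apply(item);
            ExecutorService worker =
                    workers.computeIfAbsent(key, k -> Executors.newSingleThreadExecutor());
            // "Processing" here only records which thread handled a value of this group.
            worker.execute(() -> threadsSeen
                    .computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
                    .add(Thread.currentThread().getName()));
        }
        for (ExecutorService worker : workers.values()) {
            worker.shutdown(); // tasks that were already submitted still run
        }
        try {
            for (ExecutorService worker : workers.values()) {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadsSeen;
    }

    public static void main(String[] args) {
        // Prints one thread name per key; the exact names depend on the JVM.
        System.out.println(threadsPerKey(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), i -> i % 2));
    }
}
```

Each key ends up with exactly one thread name, and the two keys use different threads, mirroring the output above.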
Usage of Scoped Values
Scoped Values are a Java feature that first appeared as a preview in Java 21. Their main purpose is to provide a programming model for sharing data both within a thread and with child threads. You can read more about them in JEP 446.
To avoid passing additional parameters, some of the Flow methods use the Flow#CHANNEL_BUFFER_SIZE Scoped Value when internally creating a channel. Whenever this is the case, it is mentioned in the JavaDoc of the given method.
One example is the buffer method without a parameter:
ScopedValue.runWhere(Flow.CHANNEL_BUFFER_SIZE, 3, () -> {
    Flows.fromValues(1, 2, 3)
            .buffer();
});
is the same as:
Flows.fromValues(1, 2, 3)
        .buffer(3);
If the Flow#CHANNEL_BUFFER_SIZE value is not set, Channel#DEFAULT_BUFFER_SIZE is used.
Completion listeners
Flows support 3 completion listeners:
- onComplete - runs the provided method when the Flow completes successfully or when an error occurs
- onDone - runs the provided method when the Flow completes successfully
- onError - runs the provided method when an error is thrown
If multiple listeners are used, they are called in the order in which they are defined in the Flow.
Listeners can be defined anywhere in the pipeline and they will be triggered; however, it is advised to use them at the end of the pipeline, as some methods change this behavior.
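The firing rules above can be modelled in a few lines of plain Java. This is a simplified sketch and not Jox code: the Kind enum, the Listener record and the fired helper are hypothetical names, and the model ignores where in the pipeline a failure happens. It only captures which listener kinds fire for a successful or failed run, in definition order.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerSketch {
    enum Kind { ON_COMPLETE, ON_DONE, ON_ERROR }

    record Listener(Kind kind, String label) { }

    // Runs the body, then returns the labels of the listeners that fire, in definition order.
    static List<String> fired(List<Listener> listeners, Runnable body) {
        boolean failed = false;
        try {
            body.run();
        } catch (RuntimeException e) {
            failed = true;
        }
        List<String> result = new ArrayList<>();
        for (Listener listener : listeners) {
            boolean fires = switch (listener.kind()) {
                case ON_COMPLETE -> true;   // success or error
                case ON_DONE -> !failed;    // success only
                case ON_ERROR -> failed;    // error only
            };
            if (fires) {
                result.add(listener.label());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Listener> listeners = List.of(
                new Listener(Kind.ON_COMPLETE, "first on complete"),
                new Listener(Kind.ON_ERROR, "first error"),
                new Listener(Kind.ON_DONE, "on done"),
                new Listener(Kind.ON_ERROR, "second error"),
                new Listener(Kind.ON_COMPLETE, "second on complete"));
        // [first on complete, first error, second error, second on complete]
        System.out.println(fired(listeners, () -> { throw new RuntimeException(); }));
        // [first on complete, on done, second on complete]
        System.out.println(fired(listeners, () -> { }));
    }
}
```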
Example:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .onComplete(() -> System.out.println("Hello from first on complete"))
                .onError(e -> System.out.println("Hello from first error"))
                .tap(i -> {
                    if (i > 1) throw new RuntimeException();
                })
                .onDone(() -> System.out.println("Hello from on done"))
                .onError(e -> System.out.println("Hello from second error"))
                .onComplete(() -> System.out.println("Hello from second on complete"))
                .runForeach(System.out::println);
    }
}
Output:
1
Hello from first on complete
Hello from first error
Hello from second error
Hello from second on complete
This behaviour might change if buffer, mapPar, or any other method that uses a Channel under the hood is used. Exceptions are caught in those methods and propagated to the downstream via the channel. As the upstream is processed by a different thread, all values might be “generated” before an exception is thrown in the downstream.
Example:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .onComplete(() -> System.out.println("Hello from first on complete"))
                .onError(e -> System.out.println("Hello from first error"))
                .onDone(() -> System.out.println("Hello from first on done"))
                .mapPar(4, i -> {
                    if (i > 1) throw new RuntimeException();
                    return i;
                })
                .onDone(() -> System.out.println("Hello from second on done"))
                .onError(e -> System.out.println("Hello from second error"))
                .onComplete(() -> System.out.println("Hello from second on complete"))
                .runForeach(System.out::println);
    }
}
Output:
Hello from first on complete
Hello from first on done
Hello from second error
Hello from second on complete
As you can see, the first onDone was triggered despite an error being thrown in the mapPar method. This happened only because all values were produced before the exception was thrown; had the exception been thrown earlier, the onError method would have been triggered instead. Completion listeners after mapPar were triggered in the correct order.
Using a bigger range shows correct error propagation.
Error propagated correctly:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.range(1, 100, 1)
                .onComplete(() -> System.out.println("Hello from first on complete"))
                .onError(e -> System.out.println("Hello from first error"))
                .onDone(() -> System.out.println("Hello from first on done"))
                .mapPar(4, i -> {
                    if (i == 1) throw new RuntimeException();
                    return i;
                })
                .onDone(() -> System.out.println("Hello from second on done"))
                .onError(e -> System.out.println("Hello from second error"))
                .onComplete(() -> System.out.println("Hello from second on complete"))
                .runForeach(System.out::println);
    }
}
Output:
Hello from first on complete
Hello from first error
Hello from second error
Hello from second on complete
Exception handling
To handle exceptions correctly, we should wrap the whole execution of the Flow in a try/catch block. Usually, exceptions that happen while processing the Flow are thrown directly and can be caught as they appear. However, whenever methods that use Scope or Channel are called, exceptions are wrapped in JoxScopeExecutionException and ChannelErrorException respectively. If a method uses both Scope and Channel, the original exception will be wrapped in both.
This is why, in the earlier buffer example, introducing buffer caused the original exception to be wrapped in JoxScopeExecutionException.
If you are not sure whether a given method wraps the original exception, please refer to the method's JavaDoc, where it is mentioned explicitly.
Example:
import com.softwaremill.jox.flows.Flows;

public class Demo {
    public static void main(String[] args) throws Exception {
        Flows.fromValues(1, 2, 3)
                .mapPar(4, i -> {
                    if (1 == 1) throw new RuntimeException();
                    return i;
                })
                .runForeach(System.out::println);
    }
}
Output:
Exception in thread "main" com.softwaremill.jox.structured.JoxScopeExecutionException: com.softwaremill.jox.ChannelErrorException: java.lang.RuntimeException
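When the original exception is buried under one or two wrappers, as in the output above, code that catches it may want to look at the innermost cause. One possible approach, sketched here with the JDK only, is to walk the getCause chain. The rootCause helper is a hypothetical utility, not part of Jox, and the two IllegalStateException instances merely stand in for JoxScopeExecutionException and ChannelErrorException so that the example runs without the library.

```java
class RootCauseSketch {
    // Follows getCause() until the innermost exception is reached.
    static Throwable rootCause(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        RuntimeException original = new RuntimeException("boom!");
        // Stand-ins for the two wrapper exceptions described above.
        Exception channelWrapper = new IllegalStateException("channel wrapper", original);
        Exception scopeWrapper = new IllegalStateException("scope wrapper", channelWrapper);
        try {
            throw scopeWrapper;
        } catch (Exception e) {
            System.out.println(rootCause(e).getMessage()); // boom!
        }
    }
}
```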
Conclusion
Flows provide a nice and simple API that allows you to process data with modern concurrency by leveraging Structured Concurrency. They can be used on top of existing logic that can benefit from concurrent execution, or when developing new features or projects.
As Flows is the library of the future, it uses Java Preview Features, namely Structured Concurrency and Scoped Values, so keep in mind that it requires an additional JVM parameter (--enable-preview) to run. It also means that backward-incompatible changes might be introduced with new versions.
If you are excited about the project you can share it with your friends or colleagues, or even post on social media (sic!)
If you would like to help with development or contribute you can follow our GitHub Repository!
Remember: leaving a star or even liking an issue is already a great help!
Reviewed by: Łukasz Lenart, Adam Warski