Flows - simple Java asynchronous data processing in action
In the previous article, Java data processing using modern concurrent programming, I introduced Flows, a new feature of the Jox library. In this article, I would like to show an example project that solves a real business use case using Flows. This means that detailed descriptions of methods and concepts won't be covered here; if you feel you need to catch up, read the mentioned article first.
The code used in this article can be found in this repository.
Task description
Let's assume we've got the following task:
- create a new REST API endpoint that generates a csv trading report and returns it - make sure the user always gets a full report
- the report is a summary of items bought by John Doe in a given day - each row should contain the total amount for one item on a given day
- the report has the following header:
  - day - day of the summary in the format dd-MM-yyyy
  - product - the name of the product
  - amount - total amount of bought items in a given day
To make sure that we always send the whole report, we'll first save it to some temporary file, then send it to the user, and then remove the file at the end.
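The approach above can be sketched in plain JDK code (the class and method names here are illustrative, not part of the project):

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempFileReportSketch {

    // Generate the full report into a temp file first, stream it to the
    // response only once it is complete, and always delete the file afterwards.
    public static void writeReport(OutputStream response) throws Exception {
        Path tmp = Files.createTempFile("report", ".csv");
        try {
            // Stand-in for the real report generation step
            Files.writeString(tmp, "day,product,amount\n01-01-2025,apples,7\n");
            // Copy only after the file is fully written, so the user
            // either gets the whole report or an error - never half of it
            Files.copy(tmp, response);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws Exception {
        var out = new ByteArrayOutputStream();
        writeReport(out);
        System.out.println(out.toString().startsWith("day,product,amount")); // true
    }
}
```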
Dependencies and configuration
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.softwaremill.jox:flows:0.4.0'
    implementation 'org.slf4j:slf4j-api:2.0.16'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
To work with Jox, we need to enable Java preview features by adding the --enable-preview argument for the compiler and the test runner:
tasks.withType(JavaCompile).configureEach {
    ...
    options.compilerArgs += "--enable-preview"
    ...
}
tasks.named('test') {
    ...
    jvmArgs += "--enable-preview"
    ...
}
Implementation
Let's divide our task into a few classes.
We'll use ReportGenerationController to expose the REST API GET endpoint. ReportGenerationController calls DemoReportGenerator to get the final report bytes. DemoReportGenerator calls DemoReportLinesProvider to obtain randomly generated lines - you can replace it with your own implementation.
To test the whole stack, we will create a JUnit 5 test using the @SpringBootTest annotation and TestRestTemplate.
The folder structure looks like the following:
src
├── main
│   └── java
│       └── com
│           └── softwaremill
│               └── flowsdemo
│                   ├── DemoReportGenerator.java
│                   ├── DemoReportLinesProvider.java
│                   ├── FlowsDemoApplication.java
│                   └── ReportGenerationController.java
└── test
    └── java
        └── com
            └── softwaremill
                └── flowsdemo
                    └── DemoReportGenerationTest.java
HTTP API
Let's see what's inside ReportGenerationController:
package com.softwaremill.flowsdemo;

import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
public class ReportGenerationController {

    @GetMapping("/report")
    public ResponseEntity<StreamingResponseBody> downloadFile() {
        var reportGenerator = new DemoReportGenerator();
        StreamingResponseBody stream = outputStream -> {
            try {
                reportGenerator.generateReport(outputStream);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        ContentDisposition contentDisposition = ContentDisposition.builder("attachment")
                .filename("file.csv")
                .build();
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_TYPE, "text/csv")
                .header(HttpHeaders.CONTENT_DISPOSITION, contentDisposition.toString())
                .body(stream);
    }
}
There are multiple ways to send files over a REST API, but in our scenario we'll use StreamingResponseBody. The interface provides an OutputStream of the response body, where we can write content directly. To give the browser proper information, we also set the Content-Type and Content-Disposition headers.
Let's pass the outputStream to the report generator, where we can fill it with data.
Report Generation
The generateReport method:
void generateReport(OutputStream outputStream) throws Exception {
    Path tmpReportPath = Files.createTempFile("report", UUID.randomUUID().toString());
    ScopedValue.callWhere(Flow.CHANNEL_BUFFER_SIZE, 36, () -> {
        Flow<ByteChunk> reportLines = DemoReportLinesProvider.produceReportLines(NUMBER_OF_ROWS);
        runReportLinesToTmpFile(reportLines, tmpReportPath);
        runContentFromTmpFileToOutputStream(outputStream, tmpReportPath);
        return null;
    });
}
It takes the outputStream as an argument, which we'll use later when reading from the temp file. But first, we need to create the file and obtain the path where it was created.
Once we know where to write, we can start generating the report.
We set a ScopedValue for Flow.CHANNEL_BUFFER_SIZE, as it will be used later by the buffer method.
After that, we call DemoReportLinesProvider.produceReportLines(NUMBER_OF_ROWS) to generate a given number of lines and return them via the provided flow.
The method producing the lines looks like this:
static Flow<ByteChunk> produceReportLines(int numberOfRows) {
    return Flows.range(1, numberOfRows, 1)
            .map(DemoReportLinesProvider::reportLine)
            .prepend(Flows.fromValues(ByteChunk.fromArray(HEADER_LINE.getBytes(StandardCharsets.UTF_8))))
            .onDone(() -> LOGGER.info("Produced all report lines"))
            .buffer();
}

private static ByteChunk reportLine(Integer i) {
    var day = Instant.now()
            .minus(i / PRODUCTS.size(), ChronoUnit.DAYS) // make sure all products are present for one day
            .atZone(ZoneId.systemDefault());
    String product = PRODUCTS.get(i % PRODUCTS.size()); // get products in round-robin order
    int amount = RANDOM.nextInt(100);
    String line = LINE_TEMPLATE.formatted(DATE_TIME_FORMATTER.format(day), product, amount);
    return ByteChunk.fromArray(line.getBytes(StandardCharsets.UTF_8));
}
We create a Flow that produces numberOfRows report lines.
Then we prepend a single-element Flow responsible for generating the header row. The prepend method allows us to concatenate two Flows: the Flow passed as an argument produces its elements first, and only after all of them are generated is the calling Flow used.
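As a plain-JDK analogy (not the Jox API itself), prepend behaves like stream concatenation with the prepended elements emitted first:

```java
import java.util.List;
import java.util.stream.Stream;

public class PrependAnalogy {
    // Concatenates the "prepended" elements before the main elements,
    // mirroring how flow.prepend(other) emits other's elements first
    public static List<String> prepend(List<String> prepended, List<String> main) {
        return Stream.concat(prepended.stream(), main.stream()).toList();
    }

    public static void main(String[] args) {
        // The header line lands before the data rows, as in the report
        System.out.println(prepend(List.of("header"), List.of("row1", "row2"))); // [header, row1, row2]
    }
}
```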
Once all lines are produced, we want to log an INFO message, and we use the onDone method for that.
To introduce asynchronous processing, we leverage the buffer method.
The Flow has a generic type of ByteChunk, a Jox wrapper for byte[] that provides additional methods for operating on bytes.
Writing content to a temp file
The method responsible for writing to the file:
private static void runReportLinesToTmpFile(Flow<ByteChunk> reportLines, Path tmpReportPath) throws Exception {
    try {
        reportLines
                .toByteFlow()
                .runToFile(tmpReportPath);
    } catch (Exception e) {
        LOGGER.error("Error while processing!", e);
        Files.deleteIfExists(tmpReportPath);
        throw e;
    }
}
In this step, we want to consume the items produced by the given Flow and save them to the temp file.
To be able to perform I/O operations, we need to "convert" the Flow into a dedicated subclass called ByteFlow. Thanks to this subclass, we can ensure that methods like runToFile are called on a Flow containing the proper types - ByteChunk or byte[].
To convert from Flow<ByteChunk> or Flow<byte[]> to ByteFlow, you can simply call the toByteFlow() method. If you need to convert from any other type, you can use toByteFlow(ByteChunkMapper<T> f).
There are also static factory methods in Flows that create a ByteFlow directly, such as fromByteChunks.
To write a ByteFlow to a file, we simply call the runToFile(Path) method.
The whole Flow is surrounded with a try/catch block to handle exceptions gracefully.
Writing from the file to the response
As the last step, we need to read the content from the temp file and write it to the response.
private static void runContentFromTmpFileToOutputStream(OutputStream outputStream, Path tmpReportPath) throws Exception {
    try {
        Flows.fromFile(tmpReportPath, 128)
                .runToOutputStream(outputStream);
    } catch (Exception e) {
        LOGGER.error("Exception while writing from file to response!", e);
        throw e;
    } finally {
        Files.deleteIfExists(tmpReportPath);
        LOGGER.info("Deleted tmp file!");
    }
}
We can create a Flow directly from a file by passing a Path and a chunkSize as arguments. chunkSize is the number of bytes emitted as one ByteChunk of the ByteFlow. Now all we need to do is write all the bytes to the outputStream.
In this configuration, read and write operations are handled by the same thread. Each ByteChunk that is read is written directly to the outputStream, so we never load the whole report into memory.
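What fromFile plus runToOutputStream achieve can be approximated in plain JDK code; copying chunk by chunk keeps memory usage bounded by the chunk size (the names below are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChunkedCopy {
    // Copies the file in fixed-size chunks, analogous to emitting one
    // ByteChunk per read; only `chunkSize` bytes are held in memory at once
    public static long copyInChunks(Path source, OutputStream out, int chunkSize) throws Exception {
        long total = 0;
        try (InputStream in = Files.newInputStream(source)) {
            byte[] buf = new byte[chunkSize];
            int read;
            while ((read = in.read(buf)) != -1) {
                out.write(buf, 0, read); // each read/write pair is a separate I/O operation
                total += read;
            }
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("demo", ".csv");
        Files.writeString(tmp, "day,product,amount\n");
        var out = new ByteArrayOutputStream();
        long copied = copyInChunks(tmp, out, 128);
        Files.deleteIfExists(tmp);
        System.out.println(copied == out.size()); // true
    }
}
```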
This also means that tuning the chunkSize parameter is crucial for performance, as reading and writing each chunk is an I/O operation.
We can also split the work of reading and writing between threads by simply using the buffer method, which gives more flexibility in performance tuning.
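Conceptually, buffer decouples the producer and the consumer through a bounded queue. A rough plain-JDK sketch of that idea (this is not Jox's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferSketch {
    private static final String DONE = "__done__"; // sentinel marking end of stream

    // The producer fills a bounded queue on one (virtual) thread while the
    // consumer drains it on another - roughly what buffer() enables between
    // the reading and writing sides of a Flow
    public static List<String> runPipeline(List<String> input, int bufferSize) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(bufferSize);
        Thread producer = Thread.ofVirtual().start(() -> {
            try {
                for (String item : input) queue.put(item); // blocks when the buffer is full
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        List<String> consumed = new ArrayList<>();
        String item;
        while (!(item = queue.take()).equals(DONE)) {
            consumed.add(item);
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runPipeline(List.of("a", "b", "c"), 2)); // [a, b, c]
    }
}
```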
Testing
Let's create a simple test to check that our solution works. As we are operating on the HTTP layer, it will be an integration test that starts up the Spring Boot application and performs an HTTP call.
package com.softwaremill.flowsdemo;

import com.softwaremill.jox.flows.Flows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;

import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) // start up web server on random port
public class DemoReportGenerationTest {

    @LocalServerPort // inject randomly chosen port
    private int port;

    @Autowired
    private TestRestTemplate restTemplate;

    @Test
    void shouldGenerateReport() throws Exception {
        // given
        String url = "http://localhost:%d/report".formatted(port);

        // when
        byte[] report = this.restTemplate.getForObject(url, byte[].class);

        // then
        List<String> lines = Flows.fromByteArrays(report)
                .linesUtf8()
                .filter(s -> !s.isBlank()) // filter out last empty line
                .runToList();
        assertThat(lines.getFirst()).isEqualTo("day,product,amount");
        assertThat(lines).hasSize(1_001); // 1000 items + 1 header line
    }
}
The test performs an HTTP call via TestRestTemplate. The response is saved as a byte[], passed to a Flow, and split into lines using linesUtf8(). At the end, all elements are collected into a List<String>.
Now we can verify that the first element of the List<String> is the header row, and that the number of elements matches the number of produced lines.
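The line-splitting step can also be mimicked with plain JDK code if you want to verify the parsing logic without Jox (the helper below is illustrative, not part of the project):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class LinesFromBytes {
    // Splits a UTF-8 byte payload into lines and drops blank ones -
    // equivalent in spirit to linesUtf8().filter(s -> !s.isBlank())
    public static List<String> lines(byte[] payload) {
        var reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(payload), StandardCharsets.UTF_8));
        return reader.lines().filter(s -> !s.isBlank()).toList();
    }

    public static void main(String[] args) {
        byte[] report = "day,product,amount\n01-01-2025,apples,7\n\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(lines(report).getFirst()); // day,product,amount
    }
}
```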
Summary
Thanks to Flows and its nice, simple API, we were able to solve the given task with minimal effort while introducing asynchronous and concurrent processing.
As Flows is a library of the future, it uses the Java preview features Structured Concurrency and Scoped Values, so keep in mind that it requires an additional JVM parameter to run. It also means that there might be some backward-incompatible changes introduced in new versions.
If you are excited about the project, you can share it with your friends or colleagues, or even post about it on social media (sic!)
If you would like to help with development or contribute, you can follow our GitHub repository!
Remember: leaving a star or even liking an issue is already a great help!
Reviewed by: Łukasz Lenart, Adam Warski