
Flows - simple Java asynchronous data processing in action


In the previous article, Java data processing using modern concurrent programming, I introduced Flows, a new feature of the Jox library. In this article, I would like to show an example project that solves a real business use case using Flows. This means that detailed descriptions of the methods and concepts won't be covered here. If you feel like you need to catch up, read the mentioned article first.

The code used in the article can be found in this repository.

Task description

Let's assume we've got the following task:

  • create a new REST API endpoint that will generate a CSV trading report and return it
  • make sure the user always gets a full report
  • the report is a summary of items bought by John Doe on a given day
  • each row should contain the total amount for one item on a given day
  • the report has the following header:
    • day - the day of the summary, in the format dd-MM-yyyy
    • product - the name of the product
    • amount - the total amount of items bought on a given day

To make sure that we always send the whole report, we'll first save it to a temporary file, then send it to the user, and remove the file at the end.

Dependencies and configuration

The project uses Spring Boot Web, the Jox Flows module, and SLF4J:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.softwaremill.jox:flows:0.4.0'
    implementation 'org.slf4j:slf4j-api:2.0.16'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

To be able to work with Jox, we need to enable Java preview features by adding the --enable-preview argument for the compiler and the test runner:

tasks.withType(JavaCompile).configureEach {
    ...
    options.compilerArgs += "--enable-preview"
    ...
}
tasks.named('test') {
    ...
    jvmArgs += "--enable-preview"
    ...
}

Implementation

Let's divide our task into a few classes.

We'll use ReportGenerationController to expose the REST API GET endpoint.

ReportGenerationController calls DemoReportGenerator to get the final report bytes. DemoReportGenerator calls DemoReportLinesProvider to obtain randomly generated lines - you can replace it with your own implementation.

To test the whole stack, we will create a JUnit 5 test using the @SpringBootTest annotation and TestRestTemplate.

The folder structure looks like the following:

src
├── main
│   └── java
│       └── com
│           └── softwaremill
│               └── flowsdemo
│                   ├── DemoReportGenerator.java
│                   ├── DemoReportLinesProvider.java
│                   ├── FlowsDemoApplication.java
│                   └── ReportGenerationController.java
└── test
    └── java
        └── com
            └── softwaremill
                └── flowsdemo
                    └── DemoReportGenerationTest.java

HTTP API

Let's see what's inside ReportGenerationController:

package com.softwaremill.flowsdemo;

import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
public class ReportGenerationController {

    @GetMapping("/report")
    public ResponseEntity<StreamingResponseBody> downloadFile() {
        var reportGenerator = new DemoReportGenerator();

        StreamingResponseBody stream = outputStream -> {
            try {
                reportGenerator.generateReport(outputStream);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        ContentDisposition contentDisposition = ContentDisposition.builder("attachment")
                .filename("file.csv")
                .build();
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_TYPE, "text/csv")
                .header(HttpHeaders.CONTENT_DISPOSITION, contentDisposition.toString())
                .body(stream);
    }

}

There are multiple ways to send files over a REST API, but in our scenario, we'll use StreamingResponseBody. The interface provides the OutputStream of the response body, which we can write content to directly. To give the browser proper information, we also set the Content-Type and Content-Disposition headers.

Let's pass the OutputStream to the report generator, where we can fill it with data.

Report Generation

The generateReport method:

    void generateReport(OutputStream outputStream) throws Exception {
        Path tmpReportPath = Files.createTempFile("report", UUID.randomUUID().toString());

        ScopedValue.callWhere(Flow.CHANNEL_BUFFER_SIZE, 36, () -> {
            Flow<ByteChunk> reportLines = DemoReportLinesProvider.produceReportLines(NUMBER_OF_ROWS);
            runReportLinesToTmpFile(reportLines, tmpReportPath);
            runContentFromTmpFileToOutputStream(outputStream, tmpReportPath);
            return null;
        });
    }

It takes the outputStream as an argument, which we'll use later when reading from the temp file. But first, we need to create the temp file and obtain the path where it was created.

Once we know where to write, we can start generating the report.

We set a ScopedValue for Flow.CHANNEL_BUFFER_SIZE, as this value will later be used by the buffer method.
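To recap how ScopedValue behaves: a binding made with callWhere is visible only to code running within that call. A minimal sketch (BUFFER_SIZE is illustrative, not part of the project):

    static final ScopedValue<Integer> BUFFER_SIZE = ScopedValue.newInstance();

    static void demo() {
        Integer seen = ScopedValue.callWhere(BUFFER_SIZE, 36, () -> {
            // the binding is visible anywhere down the call stack within this scope
            return BUFFER_SIZE.get(); // 36
        });
        // here, outside the scope, BUFFER_SIZE.isBound() returns false
    }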

After that, we call DemoReportLinesProvider.produceReportLines(NUMBER_OF_ROWS), which returns a Flow producing the given number of lines.

The method producing the lines looks like this:

    static Flow<ByteChunk> produceReportLines(int numberOfRows) {
        return Flows.range(1, numberOfRows, 1)
                .map(DemoReportLinesProvider::reportLine)
                .prepend(Flows.fromValues(ByteChunk.fromArray(HEADER_LINE.getBytes(StandardCharsets.UTF_8))))
                .onDone(() -> LOGGER.info("Produced all report lines"))
                .buffer();
    }

    private static ByteChunk reportLine(Integer i) {
        var day = Instant.now()
                .minus(i / PRODUCTS.size(), ChronoUnit.DAYS) // make sure all products are present for one day
                .atZone(ZoneId.systemDefault());
        String product = PRODUCTS.get(i % PRODUCTS.size()); // get products in round-robin
        int amount = RANDOM.nextInt(100);

        String line = LINE_TEMPLATE.formatted(DATE_TIME_FORMATTER.format(day), product, amount);
        return ByteChunk.fromArray(line.getBytes(StandardCharsets.UTF_8));
    }

We create a Flow that produces numberOfRows report lines.

Then we prepend a single-element Flow that is responsible for generating the header row. The prepend method concatenates two Flows: the Flow passed as an argument produces its elements first, and only after all of them are emitted does the calling Flow take over.
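A quick sketch of these semantics (the values are made up):

    List<Integer> result = Flows.fromValues(2, 3)
            .prepend(Flows.fromValues(1))
            .runToList(); // [1, 2, 3]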

After all lines are produced, we want to log an INFO message about it; we utilize the onDone method for that.

To introduce asynchronous processing, we’ll leverage the buffer method.
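Conceptually, buffer() decouples the producing stages from the consuming ones: everything upstream of the call runs on its own (virtual) thread, handing elements to the consumer through a channel. A hedged sketch (the pipeline is illustrative):

    Flows.range(1, 1_000, 1)
            .map(i -> "row " + i)             // produced on the upstream thread
            .buffer()                         // capacity taken from Flow.CHANNEL_BUFFER_SIZE, if set
            .runForeach(System.out::println); // consumed on the calling thread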

Flow has a generic type of the ByteChunk class, which is a Jox wrapper for byte[] that provides additional methods for operating on bytes.
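For example (a sketch; the length() and toArray() accessors are my assumption about the ByteChunk API):

    ByteChunk chunk = ByteChunk.fromArray("day,product,amount\n".getBytes(StandardCharsets.UTF_8));
    int numberOfBytes = chunk.length(); // assumed accessor
    byte[] copy = chunk.toArray();      // assumed accessor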

Writing content to a temp file

The method responsible for writing to the file:

    private static void runReportLinesToTmpFile(Flow<ByteChunk> reportLines, Path tmpReportPath) throws Exception {
        try {
            reportLines
                    .toByteFlow()
                    .runToFile(tmpReportPath);
        } catch (Exception e) {
            LOGGER.error("Error while processing!", e); // SLF4J: pass the exception as the last argument, not via %s
            Files.deleteIfExists(tmpReportPath);
            throw e;
        }
    }

In this step, we want to consume items produced by the given Flow and save them to the temp file.

To be able to access I/O operations, we need to "convert" the Flow into a dedicated subclass called ByteFlow. Thanks to this subclass, we can ensure that methods like runToFile are called only on Flows containing the proper types - ByteChunk or byte[].

To convert from Flow<ByteChunk> or Flow<byte[]> to ByteFlow, you can simply call the toByteFlow() method. If you need to convert from any other type, you can use toByteFlow(ByteChunkMapper<T> f).

There are also static factory methods in Flows that can create a ByteFlow directly, such as fromByteChunks.
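For example (a sketch; the values are made up, and I'm assuming fromByteChunks accepts varargs):

    // from another element type, by mapping each element to a ByteChunk:
    ByteFlow byteFlow = Flows.fromValues("a", "b")
            .toByteFlow(s -> ByteChunk.fromArray(s.getBytes(StandardCharsets.UTF_8)));

    // or directly, via a static factory:
    ByteFlow direct = Flows.fromByteChunks(ByteChunk.fromArray(new byte[] {1, 2, 3}));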

To write a ByteFlow to a file, we can simply call the runToFile(Path) method.

The whole Flow is surrounded with a try/catch block, so that in case of an error we can delete the temp file before rethrowing.

Writing from the file to the response

As the last step, we need to read content from the temp file and write it to the response.

    private static void runContentFromTmpFileToOutputStream(OutputStream outputStream, Path tmpReportPath) throws Exception {
        try {
            Flows.fromFile(tmpReportPath, 128)
                    .runToOutputStream(outputStream);
        } catch (Exception e) {
            LOGGER.error("Exception while writing from file to response!");
            throw e;
        } finally {
            Files.deleteIfExists(tmpReportPath);
            LOGGER.info("Deleted tmp file!");
        }
    }

We can create a Flow directly from a file by passing the Path and chunkSize as arguments. chunkSize is the number of bytes that will be emitted as one ByteChunk in the ByteFlow. Now all we need to do is write all the bytes to the outputStream.

In this configuration, read and write operations are handled by the same thread. Each read ByteChunk is written directly to outputStream, so we never load the whole report into memory.

This also means that tweaking the chunkSize parameter is crucial for performance, as reading and writing each chunk is an I/O operation.

We can also split the work of reading and writing between threads by simply using the buffer method, which gives more flexibility when tuning performance.
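A hedged sketch of that variant (the chunk size is arbitrary; I'm assuming buffer() widens the type back to Flow<ByteChunk>, hence the extra toByteFlow() call):

    Flows.fromFile(tmpReportPath, 64 * 1024) // bigger chunks -> fewer, larger I/O operations
            .buffer()        // file reads now run on a separate (virtual) thread
            .toByteFlow()    // restore the ByteFlow type after buffering
            .runToOutputStream(outputStream);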

Testing

Let's create a simple test that will check if our solution works. As we are operating on the HTTP layer, it will be an integration test that starts up the Spring Boot application and performs an HTTP call.

package com.softwaremill.flowsdemo;

import com.softwaremill.jox.flows.Flows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;

import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) // start up web server on random port
public class DemoReportGenerationTest {

    @LocalServerPort // inject randomly chosen port
    private int port;

    @Autowired
    private TestRestTemplate restTemplate;

    @Test
    void shouldGenerateReport() throws Exception {
        //given
        String url = "http://localhost:%d/report".formatted(port);

        // when
        byte[] report = this.restTemplate.getForObject(url, byte[].class);

        // then
        List<String> lines = Flows.fromByteArrays(report)
                .linesUtf8()
                .filter(s -> !s.isBlank()) // filter out last empty line
                .runToList();

        assertThat(lines.getFirst()).isEqualTo("day,product,amount");
        assertThat(lines).hasSize(1_001); // 1000 items + 1 header line
    }
}

The test performs an HTTP call via TestRestTemplate. The response is saved as a byte[], passed to a Flow, and split into lines using linesUtf8(). At the end, all elements are collected into a List<String>.

Now we can verify that the first element of the List<String> is the header row, and that the number of elements matches the number of produced lines.


Summary

Thanks to Flows, a nice and simple API, we are able to solve the given task with minimal effort while introducing asynchronous and concurrent processing.

As Flows is a library of the future, it uses the Java preview features Structured Concurrency and Scoped Values, so keep in mind that it requires an additional JVM parameter to run. It also means that some backward-incompatible changes might be introduced with new versions.

If you are excited about the project, you can share it with your friends or colleagues, or even post about it on social media!

If you would like to help with development or contribute, you can follow our GitHub Repository!

Remember: Leaving a Star or even liking an issue is already a great help!

Reviewed by: Łukasz Lenart, Adam Warski
