Reactive Event Sourcing in Java, Part 4: Controller

In the previous part of the series, we exposed the ShowService as the main port (from the Hexagonal Architecture point of view). In this post, we will use it and, finally, launch our application for real, not only from tests.

Controller

I chose the Spring framework for the HTTP API layer, only because it's so popular. This could be anything you want, just remember that we are building a reactive solution, so it will be reasonable to use something with a non-blocking API, like Micronaut, Quarkus, etc. You can read about the differences between these here.

The ShowController has 2 endpoints. The first one is to get ShowResponse by id:

@RestController
@RequestMapping(value = "/shows")
public class ShowController {

    private final ShowService showService;

    public ShowController(ShowService showService) {
        this.showService = showService;
    }

    @GetMapping(value = "{showId}", produces = "application/json")
    public Mono<ShowResponse> findById(@PathVariable UUID showId) {
        CompletionStage<ShowResponse> showResponse = showService.findShowBy(ShowId.of(showId)).thenApply(ShowResponse::from);
        return Mono.fromCompletionStage(showResponse);
    }

Since we are using Spring WebFlux, we need to transform CompletionStage to Mono to stay reactive. In a standard (blocking) controller, we would need to block and wait for the ShowService response.
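
The difference between the two styles can be sketched with plain JDK futures, without Spring or Reactor. This is only an illustration: findShowTitle is a hypothetical stand-in for an asynchronous call such as showService.findShowBy, and the upper-casing plays the role of ShowResponse::from.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class BlockingVsNonBlocking {

    // Hypothetical stand-in for an asynchronous service call.
    static CompletionStage<String> findShowTitle(String showId) {
        return CompletableFuture.supplyAsync(() -> "show-" + showId);
    }

    // Non-blocking style: compose on the stage and hand it back to the caller.
    // No thread waits here; the mapping runs once the result arrives.
    static CompletionStage<String> nonBlocking(String showId) {
        return findShowTitle(showId).thenApply(String::toUpperCase);
    }

    // Blocking style: the calling thread is parked until the result is available.
    static String blocking(String showId) {
        return findShowTitle(showId).toCompletableFuture().join().toUpperCase();
    }

    public static void main(String[] args) {
        // join() is used here only so that the demo does not exit before the result is printed.
        nonBlocking("42").thenAccept(System.out::println).toCompletableFuture().join();
        System.out.println(blocking("42"));
    }
}
```

In the WebFlux controller, the composed stage is simply wrapped with Mono.fromCompletionStage, so the request thread never has to call join().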

The second endpoint is more interesting because it is used for seat reservation and cancelation.

@PatchMapping(value = "{showId}/seats/{seatNum}", consumes = "application/json")
public Mono<ResponseEntity<String>> reserve(@PathVariable("showId") UUID showIdValue,
                                            @PathVariable("seatNum") int seatNumValue,
                                            @RequestBody SeatActionRequest request) {

    ShowId showId = ShowId.of(showIdValue);
    SeatNumber seatNumber = SeatNumber.of(seatNumValue);
    CompletionStage<ShowEntityResponse> actionResult = switch (request.action()) {
        case RESERVE -> showService.reserveSeat(showId, seatNumber);
        case CANCEL_RESERVATION -> showService.cancelReservation(showId, seatNumber);
    };

    return Mono.fromCompletionStage(actionResult.thenApply(response -> switch (response) {
        case CommandProcessed ignored -> accepted().body(request.action() + " successful");
        case CommandRejected rejected -> badRequest().body(request.action() + " failed with: " + rejected.error().name());
    }));
}

Let's skip the discussion on whether it is RESTful or not; I'm too old for that. In this case, we need to translate the response from the service into an appropriate HTTP status code.
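
Stripped of the Spring types, the mapping boils down to something like the sketch below. The nested types are simplified stand-ins that only borrow the names used in the controller (their real definitions may differ), and the error value in main is made up for the example. The status codes match what accepted() and badRequest() produce: 202 and 400.

```java
final class ResponseMapping {

    // Simplified stand-ins for the service result types; the bodies are assumptions.
    interface ShowEntityResponse {}
    record CommandProcessed() implements ShowEntityResponse {}
    record CommandRejected(String error) implements ShowEntityResponse {}

    // Maps a domain-level result to an HTTP status code:
    // 202 Accepted for a processed command, 400 Bad Request for a rejected one.
    static int toStatusCode(ShowEntityResponse response) {
        if (response instanceof CommandProcessed) {
            return 202;
        }
        if (response instanceof CommandRejected) {
            return 400;
        }
        throw new IllegalArgumentException("Unknown response type: " + response);
    }

    public static void main(String[] args) {
        System.out.println(toStatusCode(new CommandProcessed()));
        // "SEAT_NOT_AVAILABLE" is a hypothetical error name used only for this demo.
        System.out.println(toStatusCode(new CommandRejected("SEAT_NOT_AVAILABLE")));
    }
}
```

The controller version uses a pattern-matching switch instead of instanceof checks, which lets the compiler verify that every response type is handled.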

We also need some basic Spring Beans configurations:

@Configuration
class BaseConfiguration {

    @Bean
    public Config config() {
        return PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load());
    }

    @Bean(destroyMethod = "terminate")
    public ActorSystem<Void> actorSystem(Config config) {
        return ActorSystem.create(VoidBehavior.create(), "es-workshop", config);
    }

    @Bean
    public ClusterSharding clusterSharding(ActorSystem<?> actorSystem) {
        return ClusterSharding.get(actorSystem);
    }

    @Bean
    Clock clock() {
        return new Clock.UtcClock();
    }
}

@Configuration
class ReservationConfiguration {

    @Bean
    public ShowService showService(ClusterSharding sharding, Clock clock) {
        return new ShowService(sharding, clock);
    }
}

The ActorSystem is a pretty heavy structure. It should be created only once, a perfect candidate for a Bean. Creation of a typed ActorSystem requires passing some guardianBehavior. At this point, we don't need this functionality, so we can pass a VoidBehavior:

public class VoidBehavior {
    public static Behavior<Void> create() {
        return Behaviors.receive(Void.class).build();
    }
}

The guardianBehavior is more useful in the case of manual actor creation. In our case, we are using sharding for that.

The Config Bean is using an in-memory event store. That's why the scope of the akka-persistence-testkit_* dependency must be compile. This is only for prototyping and it will be switched back to test in the next part, when we introduce a production-ready event store.

Controller test

For testing our controller, we use WebTestClient:

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
class ShowControllerItTest {

    @Autowired
    private WebTestClient webClient;

    @Test
    public void shouldGetShowById() {
        //given
        String showId = randomShowId().id().toString();

        //when //then
        webClient.get().uri("/shows/{showId}", showId)
                .exchange()
                .expectStatus().isOk()
                .expectBody(ShowResponse.class).value(shouldHaveId(showId));
    }

One thing worth noticing is that we are closing the Spring Context after each test class to avoid Actor System collisions:

2021-10-14 10:51:48,057 ERROR akka.io.TcpListener - Bind failed for TCP channel on endpoint [/127.0.0.1:2551]
java.net.BindException: [/127.0.0.1:2551] Address already in use
    at java.base/sun.nio.ch.Net.bind0(Native Method)
    at java.base/sun.nio.ch.Net.bind(Net.java:555)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)

Using @DirtiesContext(classMode = ClassMode.AFTER_CLASS) is not enough. We also need to configure the destroy method of the ActorSystem Bean, @Bean(destroyMethod = "terminate"), so that the Actor System releases its port when the context is closed.
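
The underlying problem is not specific to Akka: a port stays taken until whoever bound it shuts down. A minimal sketch with plain JDK sockets, where the class and method names are made up for the illustration and an ephemeral port is used instead of 2551:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

final class PortCollision {

    // Tries to bind the given port on loopback; returns false when it is already taken.
    static boolean canBind(int port) {
        try (ServerSocket ignored = new ServerSocket(port, 50, InetAddress.getLoopbackAddress())) {
            return true;
        } catch (BindException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns { bindable while the first socket is open, bindable after it was closed }.
    static boolean[] bindBeforeAndAfterClose() {
        try {
            // Port 0 asks the OS for any free port.
            ServerSocket first = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
            int port = first.getLocalPort();
            boolean whileOpen = canBind(port);   // the first socket still holds the port
            first.close();                       // plays the role of ActorSystem.terminate()
            boolean afterClose = canBind(port);  // the port has been released
            return new boolean[] {whileOpen, afterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = bindBeforeAndAfterClose();
        System.out.println("bindable while open: " + result[0]);
        System.out.println("bindable after close: " + result[1]);
    }
}
```

The same thing happens when a second Spring Context starts while the Actor System from the previous one is still running.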

A different approach would be to reuse the Spring Context and the Actor System across all tests, but then we cannot create the Actor System manually like in ShowServiceTest.

Running the application

Our application is ready to be launched via the CinemaApplication class or from the command line with ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview" (make sure that you are using Java 17).

You can run some requests from the development/show.http file (requires IntelliJ IDEA Ultimate) or the curl version from development/show.sh.

Packaging

I'm a bit of a control freak when it comes to packaging and a clear separation of responsibilities. That's why I've added a test with ArchUnit assertions for that. The PackageStructureValidationTest checks that no rules are violated between modules (base should not depend on reservation) and inside a single module. The domain layer should not depend on application, api, infrastructure (for future changes), and akka. The application layer should not depend on api, infrastructure, etc. All rules could be represented in this diagram:

This is not something new. It comes from Vaughn Vernon's Implementing Domain-Driven Design book (the so-called "Red Book"). It could be very easily transformed into a hexagonal, a.k.a. ports and adapters, version. The infrastructure and api packages are adapters, and the application package exposes the port.

Of course, any other naming/packaging convention will be perfectly fine as long as it protects the domain code and is used consistently.
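
To make the layering rules from this section concrete, they can be written down as plain data. The sketch below is not the ArchUnit test from the repository, just a JDK-only illustration; the class and method names are made up, and only the dependencies explicitly named in the text are listed.

```java
import java.util.Map;
import java.util.Set;

final class LayerRules {

    // For each layer, the packages it must NOT depend on (as described in the text).
    static final Map<String, Set<String>> FORBIDDEN = Map.of(
            "domain", Set.of("application", "api", "infrastructure", "akka"),
            "application", Set.of("api", "infrastructure"));

    // Returns true when a dependency from one layer to another does not break a rule.
    static boolean isAllowed(String from, String to) {
        return !FORBIDDEN.getOrDefault(from, Set.of()).contains(to);
    }

    public static void main(String[] args) {
        System.out.println("api -> application allowed: " + isAllowed("api", "application"));
        System.out.println("application -> domain allowed: " + isAllowed("application", "domain"));
        System.out.println("domain -> akka allowed: " + isAllowed("domain", "akka"));
    }
}
```

The dependencies point inwards only: adapters may use the application layer, the application layer may use the domain, and the domain knows about none of them.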

Summary

That was easy, wasn't it? We can use ShowService in many different adapters: a Kafka consumer, a WebSocket endpoint, anything you want. Check out the part_4 tag and play with the application. The main takeaway is that with tools like Akka Persistence, we can prototype Event Sourced applications pretty fast. We can easily add it to an existing blocking or non-blocking stack. We could go without a durable event store for a long time, but I have a feeling that you would like to see something production-ready. Stay tuned, this will be the topic of the next part. If you don't want to miss any new materials, subscribe to SoftwareMill Academy.
