Reactive Event Sourcing in Java, Part 4: Controller
In the previous part of the series, we exposed the ShowService as the main port (from the Hexagonal Architecture point of view). In this post, we will use it and, finally, launch our application for real, not only from tests.
Controller
I chose the Spring framework for the HTTP API layer only because it is so popular. You could use anything you want; just remember that we are building a reactive solution, so it is reasonable to pick something with a non-blocking API, like Micronaut, Quarkus, etc. You can read about the differences between these here.
The ShowController has 2 endpoints. The first one is to get ShowResponse by id:
@RestController
@RequestMapping(value = "/shows")
public class ShowController {

    private final ShowService showService;

    public ShowController(ShowService showService) {
        this.showService = showService;
    }

    @GetMapping(value = "{showId}", produces = "application/json")
    public Mono<ShowResponse> findById(@PathVariable UUID showId) {
        CompletionStage<ShowResponse> showResponse = showService.findShowBy(ShowId.of(showId)).thenApply(ShowResponse::from);
        return Mono.fromCompletionStage(showResponse);
    }
Since we are using Spring WebFlux, we need to transform CompletionStage to Mono to stay reactive. In a standard (blocking) controller, we would need to block and wait for the ShowService response.
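To make the difference between the two styles concrete, here is a minimal plain-JDK sketch (no Spring or Reactor involved). The class and the findShowTitle method are hypothetical stand-ins for an asynchronous service call, not part of the project:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class BlockingVsNonBlocking {

    // Hypothetical stand-in for an asynchronous call such as ShowService.findShowBy.
    static CompletionStage<String> findShowTitle(String showId) {
        return CompletableFuture.supplyAsync(() -> "show-" + showId);
    }

    // Blocking style: the calling thread waits here until the result is available.
    static String blockingStyle(String showId) {
        return findShowTitle(showId).toCompletableFuture().join();
    }

    // Non-blocking style: we only register a transformation and return immediately;
    // the caller (a reactive framework, for example) decides when to consume the result.
    static CompletionStage<String> nonBlockingStyle(String showId) {
        return findShowTitle(showId).thenApply(title -> "response:" + title);
    }
}
```

In the controller above, Mono.fromCompletionStage plays the role of the consumer of the non-blocking variant, so no request thread has to wait for the result.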
The second endpoint is more interesting because it handles both seat reservation and cancellation:
    @PatchMapping(value = "{showId}/seats/{seatNum}", consumes = "application/json")
    public Mono<ResponseEntity<String>> reserve(@PathVariable("showId") UUID showIdValue,
                                                @PathVariable("seatNum") int seatNumValue,
                                                @RequestBody SeatActionRequest request) {

        ShowId showId = ShowId.of(showIdValue);
        SeatNumber seatNumber = SeatNumber.of(seatNumValue);

        CompletionStage<ShowEntityResponse> actionResult = switch (request.action()) {
            case RESERVE -> showService.reserveSeat(showId, seatNumber);
            case CANCEL_RESERVATION -> showService.cancelReservation(showId, seatNumber);
        };

        return Mono.fromCompletionStage(actionResult.thenApply(response -> switch (response) {
            case CommandProcessed ignored -> accepted().body(request.action() + " successful");
            case CommandRejected rejected -> badRequest().body(request.action() + " failed with: " + rejected.error().name());
        }));
    }
Let's skip the discussion on whether it is RESTful or not; I'm too old for that. What matters here is that we need to translate the response from the service into an appropriate HTTP status code: 202 Accepted when the command was processed, 400 Bad Request when it was rejected.
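The mapping idea can be isolated from Spring in a small sketch. All types below (Result, Processed, Rejected, HttpResult) and the error name used in the usage note are hypothetical stand-ins; the real controller works with CommandProcessed, CommandRejected and ResponseEntity, and uses a pattern-matching switch (a preview feature in Java 17) where this sketch uses instanceof checks:

```java
// Hypothetical stand-ins for the service result types.
sealed interface Result permits Processed, Rejected {}
record Processed() implements Result {}
record Rejected(String error) implements Result {}

// Minimal stand-in for an HTTP response: status code plus body.
record HttpResult(int status, String body) {}

final class StatusMapping {

    // Translates a domain-level result into an HTTP-level response.
    static HttpResult toHttp(String action, Result result) {
        if (result instanceof Processed) {
            return new HttpResult(202, action + " successful");
        }
        if (result instanceof Rejected rejected) {
            return new HttpResult(400, action + " failed with: " + rejected.error());
        }
        // Unreachable while Result stays sealed with the two cases above.
        throw new IllegalStateException("Unknown result type: " + result);
    }
}
```

For example, StatusMapping.toHttp("RESERVE", new Rejected("SEAT_NOT_AVAILABLE")) yields status 400 with the body "RESERVE failed with: SEAT_NOT_AVAILABLE". Keeping this translation in the adapter means the service itself stays unaware of HTTP.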
We also need some basic Spring Beans configurations:
@Configuration
class BaseConfiguration {

    @Bean
    public Config config() {
        return PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load());
    }

    @Bean(destroyMethod = "terminate")
    public ActorSystem<Void> actorSystem(Config config) {
        return ActorSystem.create(VoidBehavior.create(), "es-workshop", config);
    }

    @Bean
    public ClusterSharding clusterSharding(ActorSystem<?> actorSystem) {
        return ClusterSharding.get(actorSystem);
    }

    @Bean
    Clock clock() {
        return new Clock.UtcClock();
    }
}

@Configuration
class ReservationConfiguration {

    @Bean
    public ShowService showService(ClusterSharding sharding, Clock clock) {
        return new ShowService(sharding, clock);
    }
}
The ActorSystem is a pretty heavy structure. It should be created only once, which makes it a perfect candidate for a Bean. Creation of a typed ActorSystem requires passing some guardianBehavior. At this point, we don't need this functionality, so we can pass a VoidBehavior:
public class VoidBehavior {

    public static Behavior<Void> create() {
        return Behaviors.receive(Void.class).build();
    }
}
The guardianBehavior is more useful in the case of manual actor creation. In our case, we are using sharding for that.
The Config Bean is using an in-memory event store. That's why the scope of the akka-persistence-testkit_* dependency must be compile. This is only for prototyping and it will be switched back to test in the next part, when we introduce a production-ready event store.
Controller test
For testing our controller, we use WebTestClient:
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
class ShowControllerItTest {

    @Autowired
    private WebTestClient webClient;

    @Test
    public void shouldGetShowById() {
        //given
        String showId = randomShowId().id().toString();

        //when //then
        webClient.get().uri("/shows/{showId}", showId)
            .exchange()
            .expectStatus().isOk()
            .expectBody(ShowResponse.class).value(shouldHaveId(showId));
    }
One thing worth noticing is that we close the Spring Context after each test class to avoid Actor System collisions like this one:
2021-10-14 10:51:48,057 ERROR akka.io.TcpListener - Bind failed for TCP channel on endpoint [/127.0.0.1:2551]
java.net.BindException: [/127.0.0.1:2551] Address already in use
at java.base/sun.nio.ch.Net.bind0(Native Method)
at java.base/sun.nio.ch.Net.bind(Net.java:555)
at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:337)
at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:294)
at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)
Using @DirtiesContext(classMode = ClassMode.AFTER_CLASS) alone is not enough; we also need to configure the destroy method of the ActorSystem Bean with @Bean(destroyMethod = "terminate"), so that the Actor System is actually shut down when the context is closed.
A different approach would be to reuse the Spring Context and the Actor System across all tests, but then we cannot create the Actor System manually like in ShowServiceTest.
Running the application
Our application is ready to be launched via the CinemaApplication class or from the command line with ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview" (make sure that you are using Java 17).
You can run some requests from the development/show.http file (requires IntelliJ IDEA Ultimate) or the curl version from development/show.sh.
Packaging
I'm a bit of a control freak when it comes to packaging and a clear separation of responsibilities. That's why I've added a test with ArchUnit assertions for that. The PackageStructureValidationTest checks that no rules are violated between modules (base should not depend on reservation) or inside a single module. The domain layer should not depend on application, api, infrastructure (for future changes), and akka. The application layer should not depend on api, infrastructure, etc. All rules could be represented in this diagram:
This is nothing new; it comes from Vaughn Vernon's Implementing Domain-Driven Design book (the so-called "Red Book"). It can very easily be transformed into a hexagonal, a.k.a. ports and adapters, version. The infrastructure and api packages are adapters; the application package exposes the port.
Of course, any other naming/packaging convention is perfectly fine as long as it protects the domain code and is used consistently.
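The dependency rules listed above can also be written down as a small lookup table. The sketch below is not ArchUnit and does not inspect any real classes; it is a plain-Java illustration of the rule set, with a hypothetical LayerRules class, and it encodes only the rules spelled out in the text (the "etc." is deliberately not expanded):

```java
import java.util.Map;
import java.util.Set;

final class LayerRules {

    // For each layer: the layers (or libraries) it must NOT depend on.
    static final Map<String, Set<String>> FORBIDDEN = Map.of(
        "domain", Set.of("application", "api", "infrastructure", "akka"),
        "application", Set.of("api", "infrastructure"));

    // A dependency is allowed unless it is explicitly forbidden for the source layer.
    static boolean isAllowed(String fromLayer, String toLayer) {
        return !FORBIDDEN.getOrDefault(fromLayer, Set.of()).contains(toLayer);
    }
}
```

With this table, isAllowed("domain", "akka") is false, while isAllowed("api", "application") and isAllowed("application", "domain") are true: dependencies point inwards, towards the domain. A real ArchUnit test states the same rules against the compiled classes of the project.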
Summary
That was easy, wasn't it? We can use ShowService in many different adapters: a Kafka consumer, a WebSocket endpoint, anything you want. Check out the part_4 tag and play with the application. The main takeaway is that with tools like Akka Persistence, we can prototype Event Sourced applications pretty fast. We can easily add it to an existing blocking or non-blocking stack. We could go without a durable event store for a long time, but I have a feeling that you would like to see something production-ready. Stay tuned, this will be the topic for the next part. If you don't want to miss any new materials, subscribe to SoftwareMill Academy.