Contents

Contract testing of event driven application with Kafka and Spring Cloud Contract

Contract testing of event driven application with Kafka and Spring Cloud Contract webp image

In the previous articles of this series, I showed how to implement contract tests of the synchronous communication realized via the REST API. This is quite an easy and common example that is widely used in the articles about contract testing available on the Internet. What is harder to find is how to test an asynchronous communication implemented using a message broker.

In this article, I will explain how to test asynchronous, event-driven communication with Kafka as a message broker. This tutorial requires a basic understanding of Kafka, as explaining its functions is out of the scope of this post. Thus, if you’re not familiar with Kafka, I suggest you first understand its concepts. You can find a lot of useful resources to start your learning journey in Michał’s article “Good resources for learning about Apache Kafka”.

If you want to read how event-driven architecture and Kafka help to develop distributed systems, I encourage you to read the following article.

Zrzut%20ekranu%202023-04-21%20o%2014.45.17

Previous articles of this series use an example of an automatically paid order in the e-commerce platform. There are two services, order-service, and payment-service, which communicate via REST API, i.e., order-service calls payment-service’s API. For this part, I inverted the dependencies to demonstrate how this use-case may be realized using the event-driven solution. Now, the order-service publishes the fact of creating the order using an event, which is sent to Kafka’s topic. One of the subscribers of this topic is the payment-service, which for every order initializes the payment.
image5
This change causes the order-service to be a producer, while the payment-service is a consumer using the terminology of contract testing. For the sake of simplicity, the events are produced in JSON format. They contain the metadata (eventId and time of occurrence) together with information about created order.

{
  "eventId": "9c465a8f-bf77-4f3a-b357-d760ced8b26f",
  "occuredAt": "2023-12-02T13:00:00",
  "orderId": "59404950-bc8d-4722-a521-0dee64b680d6",
  "accountId": "5c2c2407-7202-4720-9a45-f0d0f8175ef1",
  "total": 120.8
}

In this tutorial, I explain how to test such a type of communication using Spring Cloud Contract. Similar to testing REST API, we start with defining the contract.

Defining the contract

We may define the contract using the DSL provided by Spring Cloud Contract. It may be written in Java, Kotlin, Groovy, or Yaml. This is the contract in Groovy for OrderCreated event published by order-service:

Contract.make {
    description("Should produce OrderCreated event when order was created")
    input {
        triggeredBy("createOrder()")
    }
    label("triggerOrderCreatedEvent")
    outputMessage {
        sentTo("OrderCreated")
        body([
                eventId: anyUuid(),
                occurredAt: anyDateTime(),
                orderId: anyUuid(),
                accountId: anyUuid(),
                total: $(consumer(regex("(\\d+\\.\\d+)")))
        ])
    }
}

You may notice that it looks slightly different from the REST API contract. It is expected as this type of communication works differently. There is no server that handles the consumer’s requests, but the producer (order-service in this case) publishes an event as a result of business operation. Thus, this must be somehow triggered in the test. In the section input we define what triggers sending the message. In this case, it’s the createOrder() method. This will be used in the generated tests as I will show later.

Also, on the consumer side, we have to somehow trigger sending the message to the Kafka consumer. This is defined by the label method. I called this label triggerOrderCreatedEvent. This will be necessary to know while implementing the consumer’s tests.

Finally, the outputMessage section defines where the message is published and what it looks like. The sentTo() method defines the message sent to the OrderCreated topic. We expect the body of this message to contain 5 fields of the given types. This is defined using the body() method. Similar to what I explained in the article about testing REST API, we don’t use the concrete values of those fields here. Instead, we define that eventId, orderId, and accountId should contain any valid UUID. The event occurrence date (occurredAt) is in a date with time format, and total is positive double.

This contract should be placed in /src/contractTest/resources. We also can use subdirectories to group the contracts by consumers. In this example, the order-service is a producer thus the contracts are placed in this project in the directory
order-service/src/contractTest/resources/contracts/paymentService.

image6

Testing the producer

To generate the tests and stubs from the contract and execute the tests of the producer, we need to use the Spring Cloud Contract dependency and plugin.

plugins {
    id 'org.springframework.cloud.contract' version '4.0.0'
}

dependencies {
    testImplementation 'org.springframework.cloud:spring-cloud-starter-contract-verifier:4.0.0'
}

Other dependencies and plugins required to execute the tests, like Spring Boot Test or Spring Kafka were omitted for conciseness. In the Gradle’s build.gradle file I also specified the minimal configuration required to run the framework:

contracts {
    baseClassForTests = "pl.rmaciak.payment.BaseContractTestsSpec"
    testFramework = "SPOCK"
}

This declares that generated tests will be implemented in Spock and what is most important, the base class for them will be BaseContractTestsSpec. And this is a thing we have to dive into.

While testing the REST API, the base class is used to launch the server, or a mock of a server, which is used to execute the test against. In that case, the server is a producer which exposes the REST API. The tests confirm whether the API meets the requirements specified in the contracts. However, things look different in event-driven examples. The producer doesn’t expose the API but it publishes the message to the Kafka broker. The tests should confirm whether the message meets the requirements declared in the contracts. Thus, we need to capture the message sent to the broker and validate it against the contracts.

Capturing the message may be implemented in various ways. One possibility is to use Testcontainers with Kafka’s image to run the broker in the Docker container. Contract test triggers the functionality which publishes the event. It is sent to the suitable topic on started, ephemeral, Kafka broker. And this is what we mainly do in the BaseContractTestsSpec class.

@SpringBootTest(webEnvironment = NONE, classes = [OrderApplication.class])
@AutoConfigureMessageVerifier
@Testcontainers
abstract class BaseContractTestsSpec extends Specification {

    @Shared
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))

    @Autowired
    private OrderCreator orderCreator

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        kafka.start()
        registry.add("spring.kafka.bootstrap-servers", () -> kafka.getBootstrapServers())
    }

    void createOrder() {
        orderCreator.createOrder(
                UUID.randomUUID(),
                UUID.randomUUID(),
                BigDecimal.valueOf(120.8)
        )
    }
}

This class declares the Kafka container with the latest image of Confluent’s community version of Kafka (field Kafka). This container is started in the method annotated with @DynamicPropertySource. This method also sets the property spring.kafka.bootstrap-server to the one exposed in the container. This allows Spring to publish and consume the events using the broker in the container. The method createOrder() is the one declared as the triggering input in the contract. It simply executes the order creation logic, using injected OrderCreator, which causes the event to be published.

Having the BaseContractTestsSpec implemented as described causes only the messages to be published by the service logic to the Kafka running in the container. What is missing is how to receive the message and verify it against the contract. This may be achieved by a Message Verifier. Note, that the class is annotated with @AutoConfigureMessageVerifier, which creates and configures the beans responsible for verifying the messages by Spring Cloud Contract. The framework provides a bunch of the out of the box verifiers for JMS, Apache Camel, Spring Integration, and others. Unfortunately, there is no built-in verifier for Kafka brokers. Thus, we have to implement it ourselves.

Spring Cloud Contracts provides the interface MessageVerifierReceiver which can be implemented in order to receive the message from Kafka in the test. This interface declares two overloaded methods receive(). Those methods return the interface Message from Spring Messaging. This is the message which is validated against the contract by Spring Cloud Contract. To pull the message from the broker we can implement a test consumer, which, in this example, puts the messages into the set. It only consumes the events from the OrderCreated topic, so if the event is published to the wrong destination, it won’t be consumed, and the test will fail. The receive() methods read the message from this set and return them if they exist. Of course, consuming the messages may take some time, so it’s repeated every few seconds.

class KafkaEventVerifier implements MessageVerifierReceiver<Message<?>> {

    private final Set<Message> consumedEvents = Collections.synchronizedSet(new HashSet<Message>())

    @KafkaListener(topics = ["OrderCreated"], groupId = "order-consumer")
    void consumeOrderCreated(ConsumerRecord payload) {
        consumedEvents.add(MessageBuilder.createMessage(payload.value(), new MessageHeaders(emptyMap())))
    }

    @Override
    Message receive(String destination, long timeout, TimeUnit timeUnit, @Nullable YamlContract contract) {
        for (int i = 0; i < timeout; i++) {
            Message msg = consumedEvents.stream().findFirst().orElse(null)
            if (msg != null) {
                return msg
            }

            timeUnit.sleep(1)
        }

        return consumedEvents.stream().findFirst().orElse(null)
    }

    @Override
    Message receive(String destination, YamlContract contract) {
        return receive(destination, 5, SECONDS, contract)
    }
}

This visualizes the architecture of this solution:
image1

Now, we have everything necessary to validate the message. It’s published to Kafka and consumed by our verifier. Spring Cloud Contract receives the consumed message and validates whether it meets the expectations provided in the contracts. This is done in the generated test. They are created by the framework while building the application. To generate them you can run the following command:

./gradlew check

This is the test that was generated from our contract:

@SuppressWarnings("rawtypes")
class PaymentServiceSpec extends BaseContractTestsSpec {
    @Inject ContractVerifierMessaging contractVerifierMessaging
    @Inject ContractVerifierObjectMapper contractVerifierObjectMapper

    def validate_shouldProduceOrderCreatedEvent() throws Exception {
        when:
            createOrder()

        then:
            ContractVerifierMessage response = contractVerifierMessaging.receive("OrderCreated",
                    contract(this, "shouldProduceOrderCreatedEvent.yml"))
            response != null

        and:
            DocumentContext parsedJson = JsonPath.parse(contractVerifierObjectMapper.writeValueAsString(response.getPayload()))
            assertThatJson(parsedJson).field("['eventId']").matches("[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
            assertThatJson(parsedJson).field("['occurredAt']").matches("([0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])")
            assertThatJson(parsedJson).field("['orderId']").matches("[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
            assertThatJson(parsedJson).field("['accountId']").matches("[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
            assertThatJson(parsedJson).field("['total']").matches("(\\d+\\.\\d+)")
    }
}

This test class extends our BaseContractTestsSpec and in the when section, it simply calls the trigger method createOrder(). This causes the message to be published to the Kafka broker and then pulled by our consumer. Thus, calling the receive() method in then section eventually returns the message. Finally, the message is asserted field by field. This test is located in the build/generated-test-sources directory.

This part of the testing validates whether the valid message was published to the correct topic in Kafka broker. If the tests pass, we are sure that the message meets the contract’s expectation. It requires you to write some code, mainly because there is no built-in verifier for Kafka. However, this is rather generic code and may be easily extended to handle other events too.

Testing the consumer

In the contract tests of REST API the Wiremock stubs, which reflect the contract’s assumption, were created while building the application. They could be used in the consumer’s tests. In event-based communication using a message broker, there is no direct HTTP connection between the producer and consumer. Instead, it relies on the messages which are published to and consumed from the broker, located between the services. Thus, testing the consumer is in fact checking whether the consumer is able to understand and process the event published by the producer.

The consumer usually has an event listener implemented, as it consumes the events. What we want to do in the test is to validate whether the message, which conforms to the contract’s rules, is correctly consumed and processed. Thus, we need to send the sample message to the Kafka, which should be consumed by the consumer and processed. In our scenario, the payment-service is a consumer, and it consumes the events from order-service.

To generate and publish the message which reflects the contract, we need to know what the message looks like. This is why in the JAR created while building the producer, there are no Wiremock stubs. Instead, it contains the contract itself. This is exactly the same contract that we defined in the producer. This provides the following information:

  • event destination (topic)
  • message structure
  • what triggers the message

image3
This contract is located in the JAR file, which can be found in the build/libs directory (order-service-0.0.1-SNAPSHOT-stubs.jar). The JAR may be pushed, for instance, to the local Maven repository (.m2 directory). Having the contract we may implement the tests, which validates the logic of payment-service. To remind, on OrderCreated event, this service should initiate the payment.

To test it, we also need the Kafka broker. Again, we use Testcontainers to run Kafka as a Docker container. To send the sample message to the broker, we need two things:

  • definition of the message (this is a contract which is located in the JAR)
  • mechanism to send the message to the broker. Again, there is no built-in mechanism for Kafka, thus we have to implement it ourselves.

This is how the test class with one test scenario looks:

@SpringBootTest(webEnvironment = NONE, classes = [PaymentApplication.class, TestConfiguration.class])
@AutoConfigureStubRunner(ids = ["com.rmaciak:order-service:0.0.1-SNAPSHOT:stubs:8088"], stubsMode = LOCAL)
@Testcontainers
class EventHandlerSpec extends Specification {

    @Shared
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))

    @Autowired
    PaymentRepository paymentRepository

    @Autowired
    StubTrigger stubTrigger

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        kafka.start()
        registry.add("spring.kafka.bootstrap-servers", () -> kafka.getBootstrapServers())
    }

    def "should initiate payment on OrderCreatedEvent"() {
        given:
        def conditions = new PollingConditions(timeout: 10, initialDelay: 1.5, factor: 1.25)

        when:
        stubTrigger.trigger("triggerOrderCreatedEvent")

        then:
        conditions.eventually {
            def payments = paymentRepository.getAll()
            assert payments.size() == 1
            assert payments[0].paymentStatus() == FINISHED
        }
    }
}

It has an annotation @AutoConfigureStubRunner, among others, which you already know from the previous article. In the case of our test, it provides the contract to generate the event. We also have a Kafka container declared and started in a method annotated with DynamicPropertySource, which was explained in the producer’s section. The most interesting field of this test class is stubTrigger. This is an object provided by Spring Cloud Contract which allows you to trigger sending the message. It’s used in the when section of the test method. In this case, I use triggering by label, where label triggerOrderCreatedEvent is defined in the contract. Spring Cloud Contract supports another triggering mechanism. You can read about them in the documentation.

In this test scenario, we check whether the payment was created and executed (status is FINISHED) when the service consumed the OrderCreated event. Checking the payment and its status is wrapped in PoolingCondition because publishing and consuming the event is an asynchronous operation that should eventually happen.

The test looks quite simple. It triggers sending the message and validates whether the expected logic was executed. But what sends the message? There is a dedicated interface called MessageVerifierSender which we implement to send the message to Kafka.

@Configuration
class TestConfiguration {

    @Bean
    MessageVerifierSender<Message<?>> standaloneMessageVerifier(KafkaTemplate kafkaTemplate) {
        return new MessageVerifierSender<Message<?>>() {

            @Override
            void send(Message<?> message, String destination, @Nullable YamlContract contract) {
                kafkaTemplate.send(message)
            }

            @Override
            <T> void send(T payload, Map<String, Object> headers, String destination, @Nullable YamlContract contract) {
                kafkaTemplate.send(
                        createMessage(payload, new MessageHeaders(Map.of(TOPIC, destination)))
                )
            }
        }
    }
}

It simply uses KafkaTemplate from SpringKafka to publish the event. Then it’s consumed by the payment-service’s listener and processed.

image4

Now we have both producer and consumer of the event tested against the same contract. This ensures that both sides of this communication conform to the rules declared in this contract. Even if they don’t communicate directly with each other, but there is a broker between them, we can ensure the correctness of this communication using contract tests.

Why do I need contract tests for Kafka messages?

While reading this article you may wonder - why do I need contract tests for Kafka messages? Usually, the messages are not in JSON format, but they leverage schema-based formats like Avro and Schema Registry. They ensure that the messages are compatible. There are two answers:

  1. For any reason you don’t want or can’t use e.g. Avro + Schema Registry
  2. Schema validates only the semantic correctness of the message. It doesn’t ensure that the message was published to and consumed from the correct topic. It also doesn’t validate that the message is valid in a specific scenario. For example, the message may contain optional fields. If all the optional fields are not contained, it’s semantically valid. However, such a message doesn’t make sense in any business scenario, and let’s say at least one optional field is required for the message to be valid.

Thus, even if you use a schema-based message format with Schema Registry, it’s worth implementing contract tests to make sure that this communication is valid, not only in semantic meaning but also meets functional requirements.

Summary

In this article, I showed how to use Spring Cloud Contract to test asynchronous communication between microservices using Kafka as a message broker. It started with defining the contract for this communication. Then both producer and consumer were tested using defined contracts. While introducing the concepts, I explained some differences between testing the REST API and event-based communication using this framework.

You can notice that testing events with Kafka is not very easy in this framework (compared to testing e.g. REST API). It requires you to write some code in order to validate that the message published to, and consumed from the broker is correct. As I wrote, this is mainly because Spring Cloud Contract doesn’t provide a built-in mechanism for testing messages with Kafka, and we have to implement it on our own. However, I believe that after reading this article, you will find it easier to understand and you will be able to implement such tests in your project.

The upcoming, last in the series, article will explain testing Kafka contracts using a Pact broker. I encourage you to read it and decide which approach suits your project best.

Reviewed by Rafał Wokacz

Blog Comments powered by Disqus.