Contents

Contract testing of the event-driven system with Kafka and Pact

Rafał Maciak

10 May 2023.8 minutes read

Contract testing of the event-driven system with Kafka and Pact webp image

Testing asynchronous event-driven systems is quite challenging. However, contract testing helps with this task by providing validation of communication correctness. I explained how to implement such testing using Spring Cloud Contract in the previous article. In this one, the last in the series part, you can read how to test asynchronous communication with Kafka as the message broker using the Pact framework.

This article assumes you have a basic understanding of event-driven architecture and are familiar with Kafka broker. If you don’t feel confident with those topics yet, please refer to this article, where I provided some resources to learn those concepts. And if you want to see how Kafka works, check out our Kafka Visualization.

It still uses an example of an order-service and a payment-service. To remind, the first one publishes an event OrderCreated to Kafka when an order is created. The payment-service consumes this event and initiates the payment process.

an example of an order-service and a payment-service
And this is how the OrderCreated event looks like:

{
  "eventId": "9c465a8f-bf77-4f3a-b357-d760ced8b26f",
  "occuredAt": "2023-12-02T13:00:00",
  "orderId": "59404950-bc8d-4722-a521-0dee64b680d6",
  "accountId": "5c2c2407-7202-4720-9a45-f0d0f8175ef1",
  "total": 120.8
}

In this tutorial, the tests are written in Groovy using Spock. Gradle is used as a build tool. It also uses Testcontainers to run the Kafka broker. All the codes from this tutorial are available in this repository.

Let’s see how to test this communication using the Pact framework.

Defining the contract

Pact expects to define the contract expectations in the consumer’s tests. Similar to HTTP communication, there is an API to define message/event-driven communication. In our example, the payment-service is a consumer, so we define it in its tests. This is how contract expectations for the OrderCreated event look like:

new PactMessageBuilder().call {
    serviceConsumer 'payment-service'
    hasPactWith 'order-service'

    expectsToReceive 'OrderCreatedEvent when order is created'
    withContent(contentType: 'application/json') {
        eventId uuid()
        occurredAt datetime()
        orderId uuid()
        accountId uuid()
        total decimal()
    }
}

The class PactMessageBuilder allows defining contract expectations for messages. They should be defined as a closure in the call() method. First, we define the services which participate in this communication. The builder method serviceConsumer() defines the consumer's name, which is payment-service, and using hasPactWith(), we say that this service communicates, or has a pact (contract), with a provider - order-service.

The method expectsToRecive() defines the name of the contract. I named it 'OrderCreatedEvent when order is created' to reflect the business scenario which is tested. This name is important because this will identify the contract in the PactBroker. The producer has to use this name too, which will be explained later. Finally, using withContent(), we define the body of our event. I provided all the fields in the body of the OrderCreated event using uuid(), datetime(), and decimal() matchers.

That’s all, we defined the contract of order-service and payment-service communication. Now let’s write the test to validate whether the payment-service correctly handles this event.

Testing the consumer

To validate whether the payment-service consumes and handles the OrderCreated event correctly, we need to simulate sending this event by the provider to the broker. There are a couple of ways to do it. I chose the one with a real Kafka broker started in Testcontainers.

@Shared
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
    kafka.start()
    registry.add("spring.kafka.bootstrap-servers", () -> kafka.getBootstrapServers())
}

I’m using Spring’s KafkaTemplate to produce the event in the test. In the then section of the test, I validate whether the expected business functionality was executed when the event was consumed. In this case, it’s checked whether a payment was created in the service and was processed.

def "should initiate payment on OrderCreatedEvent"() {
    given:
    def eventFlow = new PactMessageBuilder().call {
        serviceConsumer 'payment-service'
        (…) // already explained above
    }

    when:
    eventFlow.run { Message message ->
        kafkaTemplate.send('OrderCreated', message.contentsAsBytes())
    }

    then:
    new PollingConditions().eventually {
        def payments = paymentRepository.getAll()
        assert payments.size() == 1
        assert payments[0].paymentStatus() == FINISHED
    }
}

When the test is running, the event is generated by the Pact framework with random values according to the defined field types. Then it’s sent to Kafka and consumed by the payment-service. It’s eventually processed, and payment is generated. This confirms that the payment-service can understand and process the OrderCreated event.

Of course, in order to implement and run this test, we need to add the required dependencies. Particularly, we need to add a dependency to Pact for Groovy:

testImplementation 'au.com.dius.pact.consumer:groovy:4.4.4'

Following diagram summarizes how this test works:

testing the consumer diagram
As you can see, the contract is published to the Pact broker as we want to share it with the provider. To do so, we have to add the Pact plugin and configure it in the build.gradle:

id "au.com.dius.pact" version "4.3.6"
(…)

pact {
    publish {
        pactBrokerUrl = 'http://localhost:9292/'
    }
}

The following command publishes the contract to the Pact broker:

./gradlew pactPublish

We can see in the Pact broker that the contract is available, however, it’s not verified with the provider yet.

pacts
If you go to the details of the integration between order-service and payment-service (by clicking the document icon), you can see that the name used in the method expectsToReceive() in the contract’s expectation is the one that identifies the contract (description field in the snippet below).

integration between order-service and payment-service
Now it’s time to implement the producer’s test.

Testing the producer

Currently, we have a contract defined and pushed to the Pact broker. The consumer is validated whether it handles the defined message correctly. Our goal now is to check whether the producer conforms to the contract and produces the correct event. In this case, we need to implement the method which returns the event published by the producer. Again, there are a couple of ways to do it. My solution is to execute the business logic which is responsible for producing the OrderCreated event. I also use Testcontainers with Kafka broker to receive this message. In the test, I consume the message and return it to Pact for validation. This may sound quite complicated, but I want to be sure that the validated message is exactly the one that is sent to Kafka and there is no difference between production and test serialization settings.

See how consuming is implemented in the test:

@PactBroker(url = "http://localhost:9292")
@Provider("order-service")
@SpringBootTest(webEnvironment = NONE)
@Testcontainers
class ContractVerificationSpec extends Specification {

    @Shared
    private static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))

    @Shared
    private static final Set<String> consumedEvents = Collections.synchronizedSet(new HashSet<String>())

    @DynamicPropertySource
    static void registerKafkaBootstrapServerProperty(DynamicPropertyRegistry registry) {
        kafka.start()
        registry.add("spring.kafka.bootstrap-servers", () -> kafka.getBootstrapServers())
    }

    @KafkaListener(topics = "OrderCreated")
    void orderCreatedEventListener(ConsumerRecord record) {
        consumedEvents.add((String) record.value())
    }
    ( ... )
}

The class is annotated with a couple of annotations:

  • @PactBroker instructs the framework to receive contracts from Pact Broker
  • @Provider defines the name of the provider service. This is used to get the contracts from Pact Broker
  • @SpringBootTest is used to create ApplicationContext
  • @Testcontainer to use the Kafka container

Then, at the beginning of this class, I defined a Kafka container using KafkaContainer and a Set to store consumed messages. The method annotated with @DynamicPropertySource is necessary to start the Kafka container and set the property spring.kafka.bootstrap-server to the one exposed in the container. Finally, the test event consumer is implemented using @KafkaListener annotation. It simply puts consumed events into the predefined set. This part of the test class is responsible for consuming the events from Kafka.

By default, the Pact framework is configured to validate HTTP communications. Thus, we need to configure it to validate messages instead. This may be done using the following code, which simply configures the target to be message based.

@BeforeEach
void setTargetToMessage(PactVerificationContext context) {
    context.setTarget(new MessageTestTarget())
}

Now it's time to define the test. Do you remember when I said that the name of the contract is important? This is a time when we use it. For every contract, we have to implement the method which returns the message for validation. Such method must be annotated with @PactVerifyProvider with contract name as a value parameter ('OrderCreatedEvent when order is created' in this example):

@PactVerifyProvider('OrderCreatedEvent when order is created')
String "should publish OrderCreated event"() {
    orderCreator.createOrder(randomUUID(),randomUUID(), BigDecimal.valueOf(10.5))

    new PollingConditions().eventually {
        assert consumedEvents.stream().findFirst().isPresent()
    }

    return consumedEvents.stream().findFirst().orElse(null)
}

This method executes order creation logic, which under the hood publishes the OrderCreated event, and waits until the event is available in the consumedEvents set (until it’s consumed by the listener defined above). This event is returned, and the Pact framework validates whether the structure of this event conforms contract expectations. To initiate this validation, we must execute verifyInteraction() method on Pact’s context:

@TestTemplate
@ExtendWith(PactVerificationInvocationContextProvider)
void initInteractionVerification(PactVerificationContext context) {
    context.verifyInteraction()
}

While executing the build of order-service, the Pact framework connects to the Pact broker and retrieves all the contracts defined for this service. Then, for each contract, it finds the methods with @PactVerifyProvider annotation and a suitable name. If one doesn’t exist, the error is returned as the tests are required for all contracts. This makes sure that all the scenarios are tested. The following picture summarizes the flow of this test:

flow of the producer test
Building the service should publish the contract testing results to the Pact broker to be used, for instance, by the can-i-deploy tool. You can read more about this tool in my previous article about Pact.

Summary

In this tutorial, I explained how to use the Pact framework to test asynchronous, event-based communication between services. We defined contract expectations using dedicated DLS in the consumer's test (payment-service). Then we implemented the test, which validates whether the consumer is able to handle this event correctly and executes expected business logic based on this event. Next, the contract was published to Pact Broker. It allowed us to implement producer’s tests. They confirm that the producer generates events that meet the expectations of the contract.

This tutorial is the last in the series of contract testing. It consists of the following articles, which introduce contract testing and provide examples of implementing it using Pact and Spring Cloud Contract frameworks for both synchronous and asynchronous communication types:

  1. Testing Microservices - Contract Tests. The first article introduces the contract testing concept.
  2. Contract Testing - Spring Cloud Contract. This article explains how to implement the contract tests of REST API using Spring Cloud Contract.
  3. Contract Testing with Pact. It explains how to test REST API but using the Pact framework instead.
  4. Contract testing of the event-driven system with Kafka and Spring Cloud Contract. This article shows how to test event-driven communication based on Kafka broker using Spring Cloud Contract.
  5. Contract testing of the event-driven system with Kafka and Pact. The current article explains how to test event-driven communication using Pact.

After reading this series, I hope you are familiar with the concept of contract testing and feel confident to implement those tests in your project.

Good luck!

Blog Comments powered by Disqus.