tl;dr

This article will show how to write a messaging PACT test in Java, based on a Kafka message with a JSON payload.

Introduction

The PACT contract testing framework has been around for some time now, with its first usage back in 2013. Contract testing is a great boon to building microservices, as called out by Sam Newman in his book Building Microservices (now in its 2nd edition).

Whilst support for HTTP communication is strong and has been around since the beginning of PACT, support for “messaging” is relatively recent. It is quite generic too and isn’t tied to any one technology, so in addition to being usable for Kafka messaging, it could be used for other queueing or messaging systems, or WebSockets for that matter. Unlike the HTTP tests, these contract tests do not involve the underlying technology, so if you’re using Kafka the tests will run without a Kafka broker.

This is a bit of a double-edged sword. PACT can support messaging mechanisms that it has no knowledge of, but that also means the contract tests never check the transport itself. I could, for instance, have an app that publishes messages over a WebSocket and a consumer listening to Kafka. This obviously wouldn’t work in practice, but the contract test would be green. PACT is currently undergoing some refactoring to enable a plugin architecture which will allow this to be addressed, but for now, that’s how things stand.

Anyway, back to why we’re here.

Having two applications communicating over Kafka with a JSON payload (the value, not the key), I naturally wanted to use Kafka’s Schema Registry. When configured appropriately, this ensures that the producer only writes data to a topic if it meets the declared schema. This is good news in a running environment: when we attempt to write incorrect data we get blocked, and our application can decide what to do. The consumers don’t need to worry.

Assuming that we may need to change the format of the message (and therefore the schema) at some point, we still want to be sure that changes we make won’t break things. This is where PACT comes in.

So, how do we use the messaging abstraction available in PACT to write a contract test for a Java consumer? Glad you asked.

Recipe

The PACT documentation is being augmented with some “recipes” to show how to test various technologies. The following content has now been submitted under Kafka, with the exception of the Summary and link to the codebase.

Continuing with regular JSON

Here in Part 1, we’re going to start with a regular JSON message – not using the schema registry. Watch out for Part 2, which will build on this and add schema registry support.

We’ll start with the bones of a class, using JUnit 5 annotations:

JSON Consumer

@ExtendWith(PactConsumerTestExt.class)
@PactTestFor(providerName = "jsonKafkaProviderApp", providerType = ProviderType.ASYNCH, pactVersion = PactSpecVersion.V3)
class JsonKafkaConsumerTest {
...
}

Although we’re writing the consumer test, we’ve also named the provider. Next, we’ll define what we *expect* the contract to look like:

@Pact(consumer = "jsonKafkaConsumerApp")
MessagePact simpleJsonPact(MessagePactBuilder builder) {
    PactDslJsonBody body = new PactDslJsonBody();
    body.stringType("name", "almost-anything");

    return builder.expectsToReceive("A simple message")
                  .withMetadata(Map.of("contentType", "application/json"))
                  .withContent(body)
                  .toPact();
}

The most interesting part here is the body. We’ve defined it to have a single String field – I’d expect your real one to have more than one field in all likelihood. Having defined what we’re expecting, we can finally write our test:

@Test
@PactTestFor(pactMethod = "simpleJsonPact", providerType = ProviderType.ASYNCH)
void simpleMessage(List<Message> messages) {
    byte[] kafkaBytes = convertToKafkaBytes(messages.get(0));

    assertDoesNotThrow(() -> {
        expectApplicationToConsumeKafkaBytesSuccessfully(kafkaBytes);
    });
}

private byte[] convertToKafkaBytes(Message message) {
    return message.contentsAsBytes();
}

private void expectApplicationToConsumeKafkaBytesSuccessfully(byte[] kafkaBytes) {
    ConsumerDomainRecord consumerDomainRecord = useProductionCodeToDeserializeKafkaBytesToDomain(kafkaBytes);

    ProductionCode productionCode = new ProductionCode();
    productionCode.handle(consumerDomainRecord);
}

private ConsumerDomainRecord useProductionCodeToDeserializeKafkaBytesToDomain(byte[] kafkaBytes) {
    Deserializer<ConsumerDomainRecord> deserializer = getProductionKafkaDeserializer();
    return deserializer.deserialize("", kafkaBytes);
}

private Deserializer<ConsumerDomainRecord> getProductionKafkaDeserializer() {
    KafkaJsonDeserializer<ConsumerDomainRecord> domainRecordKafkaJsonDeserializer = new KafkaJsonDeserializer<>();
    Map<String, Object> props = Map.of(
            KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, ConsumerDomainRecord.class.getName()
    );
    domainRecordKafkaJsonDeserializer.configure(props, false);
    return domainRecordKafkaJsonDeserializer;
}

Our test method gets a List of Messages passed in – these are PACT’s representation of the message(s) that we defined in the previous step. Each message has space for a byte array and also metadata. We’re not going to look much at metadata at this point.
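To make the "byte array plus metadata" shape concrete, here is a minimal sketch in plain Java. SketchMessage is a hypothetical stand-in that I've made up for illustration; it is not PACT's real Message class, and I'm assuming the payload is UTF-8 encoded JSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for PACT's message type: a payload plus metadata.
// It is NOT the real PACT class, only an illustration of the shape.
final class SketchMessage {
    private final byte[] contents;
    private final Map<String, Object> metadata;

    SketchMessage(byte[] contents, Map<String, Object> metadata) {
        this.contents = contents.clone();
        this.metadata = Map.copyOf(metadata);
    }

    // The raw payload, i.e. what a Kafka deserializer would be handed.
    byte[] contentsAsBytes() {
        return contents.clone();
    }

    Map<String, Object> metadata() {
        return metadata;
    }

    // Builds the message described by this article's contract
    // (assumption: the payload is UTF-8 encoded JSON).
    static SketchMessage simpleJsonMessage() {
        byte[] payload = "{\"name\":\"almost-anything\"}".getBytes(StandardCharsets.UTF_8);
        return new SketchMessage(payload, Map.of("contentType", "application/json"));
    }
}
```

The point is simply that the test receives bytes it can hand straight to a deserializer, with the content type travelling alongside as metadata.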

The “useProductionCodeToDeserializeKafkaBytesToDomain” method tries to indicate that we’re meant to be using some real production code in our tests. After all, there would be little point if our tests just tested themselves. With Kafka, however, this is quite tricky. We need a production object that will take our byte array and process it, but with Kafka we’re not so close to the low-level action; we typically use “out of the box” classes to do this work for us. A KafkaConsumer will typically return ConsumerRecords<Key,Value> and you can’t pass it a byte array! However, it would be configured with the appropriate Deserializers. For normal JSON, we’d configure it with a KafkaJsonDeserializer. Now, those do take byte arrays as input. So, here I would suggest that you read your app’s configuration and work out what the class of the deserializer is, then instantiate it and return it. A little tricky, but that’s about as close to production code as we’re going to get. The “getProductionKafkaDeserializer” method above simply creates one directly, which is a quicker way of doing it for the sake of this demo.
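As a rough sketch of the "read the configuration and instantiate whatever it names" idea, something like the following could work. To keep it runnable without Kafka on the classpath, SketchDeserializer, Utf8StringDeserializer and DeserializerLookup are all hypothetical names of my own; only the "value.deserializer" property key is the real Kafka consumer setting. How your application actually loads its Properties is up to you.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical stand-in for Kafka's Deserializer interface, so the sketch
// compiles without the Kafka client library.
interface SketchDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical "production" deserializer that the app's config would name.
final class Utf8StringDeserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

final class DeserializerLookup {
    // "value.deserializer" is the standard Kafka consumer property name.
    static final String VALUE_DESERIALIZER = "value.deserializer";

    // Instantiates whichever class the configuration names, using its
    // no-argument constructor.
    @SuppressWarnings("unchecked")
    static <T> SketchDeserializer<T> fromConfig(Properties appConfig)
            throws ReflectiveOperationException {
        String className = appConfig.getProperty(VALUE_DESERIALIZER);
        if (className == null) {
            throw new IllegalStateException("No " + VALUE_DESERIALIZER + " configured");
        }
        Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
        return (SketchDeserializer<T>) instance;
    }
}
```

In a real test you would cast to Kafka's own Deserializer type and then call configure with the same settings your application uses, so that the test exercises the class production really runs with.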

Having turned our byte array into a custom domain object, what do we do? Well, what would your production application do with it? Call that code and make sure it can process the object successfully. We don’t need to do all the assertions that “service” tests might be interested in, we just need to be sure that our application understood the input. A simple “assertDoesNotThrow” is likely sufficient.
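If it helps to see that idea without any test framework around it, here is a small plain-Java sketch. SketchRecord, SketchHandler and SmokeCheck are hypothetical stand-ins for ConsumerDomainRecord, ProductionCode and JUnit's assertDoesNotThrow respectively, and the "name is required" rule is an assumption I've invented for the example.

```java
import java.util.Objects;

// Hypothetical domain record, standing in for ConsumerDomainRecord.
final class SketchRecord {
    final String name;

    SketchRecord(String name) {
        this.name = name;
    }
}

// Hypothetical handler, standing in for ProductionCode.
final class SketchHandler {
    // Rejects input the application cannot understand by throwing.
    void handle(SketchRecord record) {
        Objects.requireNonNull(record, "record");
        if (record.name == null || record.name.isEmpty()) {
            throw new IllegalArgumentException("name is required");
        }
        // Real business logic would go here.
    }
}

final class SmokeCheck {
    // Returns true when the handler accepts the record without throwing,
    // which is the only question the contract test needs answered.
    static boolean consumesSuccessfully(SketchHandler handler, SketchRecord record) {
        try {
            handler.handle(record);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

A record built from the contract's example value passes, whereas one with a missing name is rejected. That is the kind of breakage we want the contract test to surface, without asserting anything about what the handler goes on to do.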

Assuming you are using the PACT broker (well, why wouldn’t you? It’s awesome!), you might want to run and publish the consumer test. This is what the resulting contract looks like on the PACT broker:

{
  "consumer": {
    "name": "jsonKafkaConsumerApp"
  },
  "messages": [
    {
      "_id": "753ff77671f87af7045426b6f333e767315fbf2e",
      "contents": {
        "name": "almost-anything"
      },
      "description": "A simple message",
      "matchingRules": {
        "body": {
          "$.name": {
            "combine": "AND",
            "matchers": [
              {
                "match": "type"
              }
            ]
          }
        }
      },
      "metaData": {
        "contentType": "application/json"
      }
    }
  ],
  "metadata": {
    "pact-jvm": {
      "version": "4.3.4"
    },
    "pactSpecification": {
      "version": "3.0.0"
    }
  },
  "provider": {
    "name": "jsonKafkaProviderApp"
  }
}
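The matchingRules entry in the contract above says that $.name is matched by type, so the example value "almost-anything" is only an example. The following is a rough illustration of what "match by type" means, working on already-parsed fields. TypeMatchSketch is a hypothetical class of my own and definitely not PACT's actual matching engine, which handles far more cases.

```java
import java.util.Map;

// A rough illustration of "match by type": NOT PACT's real matching engine.
final class TypeMatchSketch {
    // True when every field in the contract example is present in the actual
    // message and holds a value of the same Java type; the values may differ.
    static boolean matchesByType(Map<String, Object> example, Map<String, Object> actual) {
        for (Map.Entry<String, Object> field : example.entrySet()) {
            Object actualValue = actual.get(field.getKey());
            if (actualValue == null) {
                return false; // field missing (or null) in the actual message
            }
            if (!field.getValue().getClass().equals(actualValue.getClass())) {
                return false; // e.g. a number where a string was expected
            }
        }
        return true;
    }
}
```

Under this reading, a provider that sends any string for name satisfies the contract, while one that sends a number, or omits the field altogether, does not.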

We’ve just covered a fair bit, so take a breather. We ain’t half done.

JSON Provider

Let’s start off by getting our test class in place with the required annotations. It’s pretty self-explanatory, but note that the names of the consumer and provider must match what we put in the corresponding consumer test. We’ll also add a @BeforeEach which tells PACT we’re running message-related tests (rather than HTTP), and implement the @TestTemplate method. Finally, we have a couple of constants that we’ll need in the next step:

@Provider("jsonKafkaProviderApp")
@Consumer("jsonKafkaConsumerApp")
@PactBroker(url = "http://localhost:9292")
class JsonKafkaProviderTest {

    private static final String JSON_CONTENT_TYPE = "application/json";
    private static final String KEY_CONTENT_TYPE = "contentType";

    @BeforeEach
    void before(PactVerificationContext context) {
        context.setTarget(new MessageTestTarget());
    }

    @TestTemplate
    @ExtendWith(PactVerificationInvocationContextProvider.class)
    void pactVerificationTestTemplate(PactVerificationContext context) {
        context.verifyInteraction();
    }
}

All that is left is to write the test:

@PactVerifyProvider("A simple message")
MessageAndMetadata verifySimpleMessageEvent() {
    Map<String, Object> metadata = Map.of(
            KEY_CONTENT_TYPE, JSON_CONTENT_TYPE
    );
    ProviderDomainRecord providerDomainRecord = new ProviderDomainRecord("name");
    KafkaJsonSerializer<ProviderDomainRecord> serializer = createProductionKafkaSerializer();
    byte[] bytes = serializer.serialize("", providerDomainRecord);
    return createPactRepresentationFor(metadata, bytes);
}

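private KafkaJsonSerializer<ProviderDomainRecord> createProductionKafkaSerializer() {
    // No serializer settings are needed for this demo, so the config is empty.
    Map<String, Object> config = Map.of();
    KafkaJsonSerializer<ProviderDomainRecord> jsonSerializer = new KafkaJsonSerializer<>();
    // The second argument is isKey: false because we are serializing the record value.
    jsonSerializer.configure(config, false);
    return jsonSerializer;
}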

private MessageAndMetadata createPactRepresentationFor(Map<String, Object> metadata, byte[] bytes) {
    return new MessageAndMetadata(bytes, metadata);
}

In a similar fashion to the consumer test, we will need to get hold of a Kafka Serializer, which we should obtain by looking up the configuration of our production code (and again, in this demo, we’ll just create an instance for use, so make sure you do a better job!).

The KafkaJsonSerializer will take our domain object and turn it into a byte array. We pass this back to PACT, which will compare it to what is held in the PACT broker. If it’s a match, we’re green…

Summary

We’ve come a fair way. If you’re still with me, then my time has been worth it.

First, we discussed where contract tests fit with messaging applications, Kafka in particular.

We then wrote the consumer test, for the application that would read a message from a Kafka topic.

We followed that with the corresponding provider test, for the application that would write a message to Kafka.

By running each – and ideally using the PACT broker – we find that our test is green.

The Code

You can find all the code behind this and a few other examples (e.g. Part 2!) on my GitHub account.

Next…

I set out to show how to work with Kafka Schema Registry, but we haven’t touched on it yet. That’s going to be handled in Part 2!