Adding tracing to a distributed system makes it easier to see what's happening. A single incoming request might span multiple microservices, and it's nice to be able to see what happened to it along the way.

Kafka often sits at the heart of a distributed system. If you’re using it, tracing is worth considering as a way to gain insight into what is happening.

Jorge Quilcate wrote a good blog post on the topic on the Confluent site, using Zipkin, an open-source project originally built by Twitter. Aaron Burk wrote another useful post on the topic using an alternative OpenTracing-compatible product, Jaeger.

In terms of functionality, there seems to be little to choose between Zipkin and Jaeger. Both provide a similar web app where you can view and search traces. Jaeger was partially inspired by Zipkin, but its architecture, and consequently its deployment, is different: it can be deployed as a single “all-in-one” application (a Docker image) like Zipkin, or its individual parts (Agent, Collector, Query/UI, Ingester) can be deployed separately. This allows the tracing application itself to be scaled as needed.

However, what sets Jorge’s Zipkin-based implementation apart is that he not only added tracing to a Kafka Streams application, but he and his colleagues also wrote an interceptor for Kafka Connect and KSQL. By deploying the JAR onto the Connect and KSQL servers, calls running through these components can also be traced.
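
As a rough illustration of what registering such an interceptor amounts to, the sketch below builds the extra entries for a Kafka Connect worker configuration. The `interceptor.classes` key and the `producer.`/`consumer.` prefixes are standard Kafka configuration. The interceptor class names are an assumption based on the `brave-kafka-interceptor` project and should be checked against the JAR you actually deploy. `ConnectTracingConfig` and its method are hypothetical names used only for this example.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka, Zipkin or the interceptor project.
class ConnectTracingConfig {

    // ASSUMPTION: class names as published by brave-kafka-interceptor.
    // Verify them against the version of the interceptor JAR you deploy.
    static final String PRODUCER_INTERCEPTOR =
        "brave.kafka.interceptor.TracingProducerInterceptor";
    static final String CONSUMER_INTERCEPTOR =
        "brave.kafka.interceptor.TracingConsumerInterceptor";

    // Builds the entries to add to a Connect worker configuration.
    // Connect hands "producer."- and "consumer."-prefixed settings to its internal clients.
    static Properties interceptorProperties() {
        Properties props = new Properties();
        props.setProperty("producer.interceptor.classes", PRODUCER_INTERCEPTOR);
        props.setProperty("consumer.interceptor.classes", CONSUMER_INTERCEPTOR);
        return props;
    }

    public static void main(String[] args) {
        // Print the entries in key=value form, ready to paste into a worker properties file.
        interceptorProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The interceptor JAR itself still has to be on the classpath of the Connect or KSQL server for these settings to take effect.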

There is nothing to stop anyone doing the same for Jaeger; to the best of my knowledge, nobody had done so at the time of writing.

Having already tried out Zipkin successfully, I became interested in looking at Jaeger because of the built-in Jaeger support in Quarkus. I had already adapted a normal Kafka Streams application to Quarkus, but I had stripped out the Zipkin support. It was time to put tracing back in, and Jaeger looked like a promising option.

Jaeger itself is the glue that brings the best of both worlds together, because it can be configured to listen for the tracing messages sent by a Zipkin client. We can therefore run a Jaeger backend and still take advantage of the Zipkin integration with Kafka Connect and KSQL.
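
To make that concrete, here is a small sketch of how a Zipkin client's endpoint could be pointed at Jaeger. It assumes the Jaeger collector has its Zipkin-compatible HTTP listener enabled (the option name varies between Jaeger versions) on the conventional Zipkin port 9411, and that spans are posted to the Zipkin v2 path `/api/v2/spans`. The class name, the `TRACING_HOST` environment variable and the `localhost` default are made up for this example.

```java
import java.net.URI;

// Hypothetical helper for illustration only.
class JaegerZipkinEndpoint {

    // Conventional Zipkin port; ASSUMES Jaeger's Zipkin listener is enabled on it.
    static final int ZIPKIN_PORT = 9411;
    // Path of the Zipkin v2 span-reporting API.
    static final String ZIPKIN_V2_PATH = "/api/v2/spans";

    // Builds the URL that a Zipkin reporter would post spans to.
    // Falls back to localhost when no host is supplied.
    static URI spanEndpoint(String host) {
        String effectiveHost = (host == null || host.trim().isEmpty()) ? "localhost" : host.trim();
        return URI.create("http://" + effectiveHost + ":" + ZIPKIN_PORT + ZIPKIN_V2_PATH);
    }

    public static void main(String[] args) {
        // TRACING_HOST is a made-up variable name for this sketch.
        System.out.println(spanEndpoint(System.getenv("TRACING_HOST")));
    }
}
```

The resulting URL is the kind of value that would be supplied as `zipkinEndpoint` to `URLConnectionSender.create(...)` in the client, so the application code does not need to know that Jaeger, rather than Zipkin, is receiving the spans.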

I first replaced the Zipkin backend with Jaeger. The next task was to change my Quarkus application. However, the existing Quarkus integration with Jaeger is limited to HTTP and JDBC requests. After some investigation, adding support for Kafka Streams appeared to be feasible. In the meantime, though, a workaround could be implemented using the old Zipkin client, again relying on Jaeger to accept the Zipkin spans being reported.

The Java code needs access to the KafkaStreams object in order to add the tracing. Here’s the gist:

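// streams and kafkaStreamsTracing are fields of the enclosing bean;
// buildTopology() needs kafkaStreamsTracing to add spans.
void onStart(@Observes StartupEvent event) {
    kafkaStreamsTracing = configureTracing();
    Topology topology = buildTopology();
    Properties kafkaProperties = createStreamingConfig();
    // Let Brave create the KafkaStreams instance so that it is instrumented.
    streams = kafkaStreamsTracing.kafkaStreams(topology, kafkaProperties);
    streams.start();
}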

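// Imports needed by the tracing setup:
import brave.Tracing;
import brave.kafka.streams.KafkaStreamsTracing;
import brave.sampler.Sampler;
import zipkin2.Span;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.urlconnection.URLConnectionSender;

private KafkaStreamsTracing configureTracing() {
    // Report spans asynchronously over HTTP to the Zipkin-compatible endpoint.
    AsyncReporter<Span> asyncReporter =
        AsyncReporter.create(URLConnectionSender.create(zipkinEndpoint));
    Tracing tracing = Tracing.newBuilder()
        .localServiceName(appName)
        .sampler(Sampler.ALWAYS_SAMPLE) // sample every trace
        .spanReporter(asyncReporter)
        .build();
    return KafkaStreamsTracing.create(tracing);
}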

Additionally, the KStream object is needed to programmatically add a span to the trace:

Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    ValueMapper<String, String> jsonToXmlMapper = createValueMapper();
    KStream<String, String> inputStream = builder.stream(
        converterConfiguration.inputKafkaTopic,
        Consumed.with(Serdes.String(), Serdes.String()));

    // Important bit here - this adds a span:
    KStream<String, String> xmlStream = inputStream.transformValues(
        kafkaStreamsTracing.mapValues("json_to_xml", jsonToXmlMapper));

    xmlStream.to(converterConfiguration.outputKafkaTopic);
    return builder.build();
}

If you want a full concrete example, you can examine the changes I made to two existing microservices here and here.

Aside from modifying the code, we want to see it all in action! I have a project that spins up a Kafka stack along with the microservices, and it is an ideal playground for seeing the effects. Here’s the Jaeger UI showing traces across the stack:

I also raised an enhancement request for Quarkus. Once this is implemented, it should be simpler to add tracing to your Quarkus Kafka Streams app!