How to use Kafka Streams

In this blog post I’m going to write about how to use Kafka Streams for stateful stream processing. I’ll assume that the reader knows the basic ABCs of Apache Kafka (producers, consumers, brokers, topics).

Problem Statement: We needed a system that would consume messages, perform fast, stateful processing of these messages in real time, and then forward the processed messages downstream – with scalability, fault tolerance, high throughput and millisecond processing latency.

Historically at Yesware, we had used RabbitMQ as our messaging system, with good results. But around the time this requirement came up, the then-latest 0.10 release of Apache Kafka had introduced a new library called Kafka Streams. So our options were:

  1. Use RabbitMQ (which we were quite familiar with) for messaging, use a fast in-memory data store (like Redis – which we had also used quite extensively) at the downstream consumer level to do the stateful processing, and forward the processed messages to the final destination
  2. Use Kafka for messaging (which we had used only sparingly), and try our luck with this new thing called Kafka Streams, which looked quite promising – mainly because it builds on the highly scalable, elastic, distributed, fault-tolerant capabilities integrated natively within Kafka.

We decided to go with Kafka for messaging, because we expected a large volume of message traffic per second, and also because we wanted to expand our boundaries. Now, with Kafka as our messaging service, we still could have used a stream processing framework living outside of the Kafka system, but then we would have faced a lot of complexity (picture on the left), so choosing Kafka Streams was quite tempting indeed (picture on the right).

[Image: comparison – using an external stream processing framework with Kafka (left) vs. using Kafka Streams (right)]

So let’s quickly go over the basic concepts, before we dive into the code snippets.

Kafka

Producers write data to Kafka, consumers read that data from Kafka. The important part is that reading and writing is decoupled. The brokers in a Kafka cluster are responsible for storing and serving the data.

[Image: producers writing to and consumers reading from Kafka brokers, which store and serve the data]

Inside a Kafka cluster, the data is organized into topics. Within a topic, data is partitioned and within a partition, there is strict ordering of messages (which are key-value pairs). These partitions are distributed across the cluster (for things like load balancing, fault tolerance, replication and so forth).

[Image: a Kafka topic split into partitions distributed across the cluster]

Kafka Streams

In Kafka Streams, an ordered sequence of key-value records is known as a stream. It represents an unbounded, continuously updating data set.

[Image: a stream – an ordered sequence of key-value records]

This data stream is processed by a processor topology, which defines the computational logic that a stream processing application performs on the data.

[Image: a processor topology]

Similar to Kafka’s topic partitions, Kafka Streams has stream partitions. Stream tasks are basically copies of the same processing logic, each of which gets a subset of the data that needs to be processed.

[Image: stream partitions and stream tasks]

Perhaps the most important concept of Kafka Streams is the duality of streams and tables.

[Image: the stream-table duality]

On the left-hand side, we have a traditional database table that is updated by transactions over time. These transactions can be interpreted as a changelog stream of data in the form of key-value pairs, and from this stream we can reconstruct the table again.
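
To make that concrete with a tiny made-up example (mine, not from the original diagram): replaying a changelog and keeping the latest value per key gives you the table back.

[code]
changelog stream:  ("GS", "iron-man"), ("TS", "hulk"), ("GS", "thor")

table (latest value per key):
  "GS" -> "thor"
  "TS" -> "hulk"
[/code]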

For stateful computations, Kafka Streams gives us state stores, which are basically key-value stores local to each stream task (they can be in-memory or persisted on disk). Kafka Streams ships with a RocksDB implementation as the default state store, but a state store can also be any convenient data structure. State stores retain Kafka features like fault tolerance and elasticity, because they are backed by changelog topics in Kafka.

Now before we write a stream processing application based on Kafka Streams, we need to choose how to define topologies, which can be done in two ways:

  1. Kafka Streams DSL: this abstracts state stores away from the developer, and offers abstractions like KStream and KTable based on the stream-table duality mentioned above (there is a small sketch of this flavor right after this list)
  2. Low-level Processor API: more flexible; you get to interact closely with the state stores and can write customized logic. The DSL is built on top of this API.

For our purposes, we chose the low-level Processor API because we needed finer control over things. Time to see some code. Say we have a Kafka topic and we want to: a) consume messages from it, b) do some stateful processing, and c) forward the processed messages downstream.

For our problem statement, we will write a stream processing application which will save the world. Yes, that’s right. We will write a class called AvengersAssembler, which will consume messages (Avengers) from a Kafka topic called MCU (in the order in which we were introduced to the Marvel superheroes), process (assemble) the messages, and forward them to battles (a Kafka topic called InfinityWars).

So the first message will be from Iron Man (2008), keyed by my initials:

[code]
"key": "GS",
"value": { "iron-man": "metal suit" }
[/code]

Second message from The Incredible Hulk (2008)

[code]
"key": "GS",
"value": { "hulk": "smash" }
[/code]

And so on. The task is that, for the first incoming message, we forward just Iron Man for his battle against Obadiah Stane (Iron Monger). But when the second message is consumed, we aggregate the messages such that we forward both of them, and the outgoing payload looks like

[code]
"iron-man": "metal suit",
"hulk": "smash"
[/code]

The end product is that when we reach Avengers: Infinity War (2018), we should have assembled all of the Avengers

[code]
"iron-man": "metal suit",
"hulk": "smash",
"thor": "mjolnir",
"captain-america": "shield",
"black-widow": "guns",
"star-lord": "jet boots",
"rocket": "bombs",
"doctor-strange": "astral ring"
...
...
[/code]

Now to the code. Let’s make a class called Main.java, where we will have all the configuration. This is what the main method looks like:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.Stores;
import java.util.Properties;
 
// the configuration of the Kafka Streams library
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "avengers-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
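
One more knob you could set here (my own addition, not part of our original configuration): the number of stream threads per instance, since stream tasks are distributed across the threads and instances of the application.

// assumption: only worth raising if the source topic has enough partitions to keep extra threads busy
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);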

With this configuration, you basically give an id to your stream processing application, point it at a Kafka broker, and declare the serialization/deserialization (serde) classes for your key-value pairs. This is pretty much boilerplate code. The next thing that comes in the main method is the configuration of our state store:

StateStoreSupplier avengersStore = Stores.create("AvengersStore")
                                   .withKeys(Serdes.String())
                                   .withValues(Serdes.String())
                                   .persistent()
                                   .build();
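
As an aside (again my addition, not something we used), the same Stores builder can produce a purely in-memory store instead of the persistent, RocksDB-backed one:

// in-memory variant of the same store; its contents are still backed up to the changelog topic
StateStoreSupplier inMemoryStore = Stores.create("AvengersStore")
                                         .withKeys(Serdes.String())
                                         .withValues(Serdes.String())
                                         .inMemory()
                                         .build();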

Note: For all of this to work, we have to make sure we have these Kafka topics:

  1. MCU – the source topic
  2. InfinityWars – the sink topic
  3. avengers-aggregator-AvengersStore-changelog – the changelog topic backing the state store (remember the correlation between tables and changelog streams?)

Next we build the processor topology

// associate processes, processor nodes and state stores
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "MCU" as input
builder.addSource("Source", "MCU")
       // the node which takes the source node as its upstream processor, a class which we will write shortly
       .addProcessor("AssemblerProcessor", () -> new AvengersAssembler(), "Source")
       // the store associated with the AssemblerProcessor node
       .addStateStore(avengersStore, "AssemblerProcessor")
       // the sink node, which is the topic "InfinityWars" and has the AssemblerProcessor node as its upstream processor
       .addSink("InfinityWarsSink", "InfinityWars", "AssemblerProcessor");

Basically, what that means is: a) we will consume messages from a Kafka topic called MCU, b) we will write a class called AvengersAssembler, which is our stream processing class, c) we will use a state store called AvengersStore, and when done, d) we will forward the processed messages to a topic called InfinityWars. That’s all for Main.java.
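
Well, almost all of it – the topology and the configuration still need to be handed to a KafkaStreams instance (hence the KafkaStreams import above), which is then started. A minimal sketch of that last bit of the main method:

// wire the topology and the configuration together, then start the stream threads
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();

// close the application cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));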

Let’s write the stream processor class. A processor class must implement these methods from the Processor interface of Kafka Streams:

@Override
public void init(ProcessorContext context)
@Override
public void process(String key, String value)
@Override
public void punctuate(long timestamp)
@Override
public void close()

This is how these work:
 – init is called by the streams library during task construction; in this step we initialize stuff
 – process is called on every received message; in this step we process each message and do all the logic
 – punctuate is called every N time units; in this step we forward the processed messages
 – close is used to do any clean-up work

This is what AvengersAssembler.java looks like

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AvengersAssembler implements Processor<String, String> {
  private ProcessorContext context;
  private KeyValueStore<String, String> kvStore;
  private BlockingQueue<TreeMap<String, String>> toBePublished;

  // called by the Kafka Streams library during task construction phase
  @Override
  public void init(ProcessorContext context) {
    // keep the processor context locally because we need it in punctuate() and commit()
    this.context = context;
    // call this processor’s punctuate() method every 5000 time units
    this.context.schedule(5000);
    // retrieve the key-value store named "AvengersStore"
    this.kvStore = (KeyValueStore<String, String>) context.getStateStore("AvengersStore");
    // initialize a synchronized queue to store unpublished messages in the form of Map<UserId, Superheroes> entries
    this.toBePublished = new LinkedBlockingQueue<>();
  }
...
...
}

Next, the process method

@Override
public void process(String userId, String incomingMessage) {
  String oldMessages = this.kvStore.get(userId);
  String unpublishedMessage = "";
  if (oldMessages != null) {
     // aggregate the previously stored heroes with the incoming message
     unpublishedMessage = aggregateHeroes(oldMessages, incomingMessage);
     this.kvStore.put(userId, unpublishedMessage);
  } else {
      // first message for this user: store it as-is
      unpublishedMessage = incomingMessage;
      this.kvStore.put(userId, incomingMessage);
  }

  // queue the (userId, aggregated heroes) pair, to be forwarded in punctuate()
  TreeMap<String, String> userMessagePair = new TreeMap<>();
  userMessagePair.put(userId, unpublishedMessage);
  this.toBePublished.add(userMessagePair);
}

Nothing really complex is happening in the process method. The first time, we get a message,

[code]
"key": "GS",
"value": { "iron-man": "metal suit" }
[/code]

we see that no entry exists in the store for “GS”, so we save this entry in the state store and add it to a queue which is drained later (in the punctuate method, which we’ll write shortly), thereby effectively setting up Iron Man vs. Iron Monger. The next time, we’ll receive, say,

[code]
"key": "GS",
"value": { "hulk": "smash" }
[/code]

Now we see that a value does exist for “GS”, so we aggregate the values using an aggregateHeroes method (not implemented here), and our state store will look like

[code]
"key": "GS",
"value": { "iron-man": "metal suit" }, { "hulk": "smash" }
[/code]
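
Since aggregateHeroes is not shown, here is one minimal way it could look – a sketch under the assumption that the values are the small JSON-ish fragments shown above, not necessarily how we actually wrote it:

// hypothetical helper: merges two "{ ... }" fragments into one, e.g.
// { "iron-man": "metal suit" } + { "hulk": "smash" } becomes { "iron-man": "metal suit", "hulk": "smash" }
private String aggregateHeroes(String oldMessages, String incomingMessage) {
  String oldBody = oldMessages.trim().replaceAll("^\\{|\\}$", "").trim();
  String newBody = incomingMessage.trim().replaceAll("^\\{|\\}$", "").trim();
  return "{ " + oldBody + ", " + newBody + " }";
}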

Lastly, the punctuate method

@Override
public void punctuate(long timestamp) {
  // dequeue the whole queue and forward to the sink node
  while(!this.toBePublished.isEmpty()) {
    TreeMap<String, String> userMessagePair = this.toBePublished.poll();
    if(userMessagePair.size() != 0) {
      String userId = userMessagePair.firstKey();
      String avengers = userMessagePair.get(userId);
      this.context.forward(userId, avengers);
    }
  }
}

Recall that we have scheduled the punctuate method to be called every 5 seconds, so we fill and drain our queue of unpublished messages every 5 seconds; i.e., at the end of each cycle we forward the messages to our sink topic.
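
For completeness, the remaining close() method has nothing special to do in this example, since the library manages the state store for us; a minimal sketch:

@Override
public void close() {
  // nothing to clean up here; the state store's lifecycle is managed by Kafka Streams
}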

At the end of it all,

[Image: Infinity War – the assembled Avengers]

So in this blog post, we saw how to use Kafka Streams, and in the process tried to save the world. Time to grab a Guinness and witness the epic battle against Thanos.
