Kafka KStream Joins for JSON Objects
Finding a simple hello-world style tutorial for joining two JSON topics is not that easy, so I am sharing the way I did it.
What is Kafka
Kafka is an event stream processing platform originally developed at LinkedIn and now owned and managed by the Apache Software Foundation. It combines messaging, storage, and stream processing to allow analysis and storage of both real-time and historical data. It is a scalable, fault-tolerant, publish-subscribe messaging system.
Kafka is scalable thanks to its partitioned log model, which allows data to be distributed across multiple servers. Partitions are distributed and replicated across servers, so durability is guaranteed, and the decoupled nature of streams keeps it very fast.
Why Should We Care
Kafka is widely used by big players:
- Activision (maker of the computer game "Call of Duty"): 100k messages per second
- Tinder (popular dating app): 86 billion events / 40 TB per day
- Pinterest: 100 billion events per month
- Uber: 1 trillion events per day
- Netflix
- LinkedIn: 7 trillion messages per day on 4,000 servers
Architecture
Producers write data and consumers read data. Kafka stores key-value pairs with a timestamp. The Kafka data layer is split into topics, and topics have partitions. Partitions are replicated and distributed across brokers; the servers in a Kafka cluster are known as brokers. This architecture allows Kafka to provide high-performance, fault-tolerant messaging queues.
Kafka uses a binary protocol over TCP.
Connect API
Kafka Connect (the Connect API) allows importing and exporting data from other systems. It uses the producer and consumer APIs.
Streams API
The Streams API allows stream processing such as joins, aggregations, and filters.
Enough theory!
Let's do something practical. Joining two Kafka streams? Yes, that can be done easily if you are using primitive types. If you are dealing with complex types such as JSON objects, you are in a bit of trouble.
So here is what we are doing:
- Implement a producer to create JSON objects and push to two separate Kafka topics.
- Create a KStream consumer and consume the events
- Join the events
- Push back to Kafka
- All tasks are initiated using rest calls
KStreams
KStreams are used to read data from Kafka, process it, and write it back to Kafka using Java. The same can be done with tools such as Apache Flink, Storm, or NiFi.
Joins
Joins work like SQL inner, left, right, and outer joins. But these are streams, so there is no data set sitting there when you start joining; instead we use a concept called a window, where each event waits in memory for some time to find a matching record. You can think of it as taking micro-batches and then doing SQL-like joins on them.
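For example, a 10-second join window means two records with the same key are joined only if their timestamps are within 10 seconds of each other. A minimal sketch, assuming leftStream and rightStream are KStream<String, String> (the real code for our Item objects comes later):
// records from the two streams are matched only if their timestamps
// differ by at most 10 seconds
KStream<String, String> joined = leftStream.join(
        rightStream,
        (leftValue, rightValue) -> leftValue + "/" + rightValue, // how matching values are merged
        JoinWindows.of(Duration.ofSeconds(10)));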
You Need a Running Kafka Server.
Let's set that up first. Download it from here and extract it to a folder. Mine is /nuwan/kafka_2.12-2.4.1/.
Let's start the server. Go to the installation directory:
cd /nuwan/kafka_2.12-2.4.1/
Start ZooKeeper first and wait until it finishes booting.
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Then start the Kafka server.
./bin/kafka-server-start.sh ./config/server.properties
Now we have a running Kafka server. You can find some more details in my previous blog here.
Create The Project
This is my folder structure:
- pom.xml: Maven dependencies
- Item.java: holds the JSON object
- KafkaProducerController.java: event producer and the Spring Boot main class
- KafkaProcessingController.java: consumes and joins the events
pom.xml
Create a Spring Boot project using this pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.nuwan.kafka</groupId>
    <artifactId>streamjoin</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>streamjoin</name>
    <description>Demo project for Spring Boot</description>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>
</project>
The Spring spring-kafka package is used to produce events.
The Apache Kafka kafka-streams package is used to consume and join events.
Item.java
Item is the class that holds the JSON object. Let's create a Java class to hold the JSON message we will be sending.
package nuwan.blog.ex6;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Item {

    private int id;
    private String name;
    private String category;

    @JsonCreator
    public Item(@JsonProperty("id") int id, @JsonProperty("name") String name, @JsonProperty("category") String category) {
        this.id = id;
        this.name = name;
        this.category = category;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    @Override
    public String toString() {
        return "Item{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", category='" + category + '\'' +
                '}';
    }
}
Notice how the Jackson annotations (@JsonCreator, @JsonProperty) are used to serialize and deserialize between JSON and Java objects.
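As a quick sanity check (not part of the project itself), you can round-trip an Item through Jackson's ObjectMapper, which comes in with spring-boot-starter-web, for example inside a main method or a test:
import com.fasterxml.jackson.databind.ObjectMapper;

ObjectMapper mapper = new ObjectMapper();
// Java object -> JSON string
String json = mapper.writeValueAsString(new Item(1, "nuwan", "home"));
System.out.println(json); // e.g. {"id":1,"name":"nuwan","category":"home"}
// JSON string -> Java object, built through the @JsonCreator constructor
Item parsed = mapper.readValue(json, Item.class);
System.out.println(parsed); // Item{id=1, name='nuwan', category='home'}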
KafkaProducerController.java
Create a producer to push data to two topics. This class exposes REST endpoints that trigger the data-producing methods, and it also contains the Spring Boot main method.
package nuwan.blog.ex6;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
@RestController
public class KafkaProducerController {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerController.class, args);
    }

    @Autowired
    KafkaTemplate<String, Item> KafkaJsontemplate;

    String TOPIC_NAME = "my-kafka-left-stream-topic";
    String TOPIC_NAME_2 = "my-kafka-right-stream-topic";
    Item item;

    public ProducerFactory<String, Item> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Item> kafkaTemplate() {
        return new KafkaTemplate<String, Item>(producerFactory());
    }

    // send an item with key "1" to the left topic
    @RequestMapping("/sendMessages2/")
    public String sendMessages2() {
        item = new Item(1, "nuwan", "home");
        System.out.println("processing>> " + item);
        KafkaJsontemplate.send(TOPIC_NAME, "1", item);
        return "Message ONE published successfully";
    }

    // send an item with the same key to the right topic
    @RequestMapping("/sendMessages3a/")
    public String sendMessages3a() {
        item = new Item(1, "chamara", "office");
        System.out.println("processing>> " + item);
        KafkaJsontemplate.send(TOPIC_NAME_2, "1", item);
        return "Message TWO published successfully";
    }
} // end of class KafkaProducerController
Create a KStream Consumer To Join Objects
This is the class where the most important implementation happens.
package nuwan.blog.ex6;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.util.Properties;

@RestController
public class KafkaProcessingController {

    private KafkaStreams streamsInnerJoin;

    private Properties properties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-stream-inner-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    // build the topology and (re)start the KafkaStreams instance
    private void streamsInnerJoinStart(StreamsBuilder builder) {
        if (streamsInnerJoin != null) {
            streamsInnerJoin.close();
        }
        final Topology topology = builder.build();
        streamsInnerJoin = new KafkaStreams(topology, properties());
        streamsInnerJoin.start();
    }

    // custom Serde so Kafka Streams can serialize/deserialize Item objects as JSON
    public class ItemSerde extends Serdes.WrapperSerde<Item> {
        public ItemSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>(Item.class));
        }
    }

    @Autowired
    KafkaProducerController kafkaProducerController;

    @RequestMapping("/startStreaJoin/")
    public void startStreamStreamInnerJoin3Auto() throws InterruptedException {
        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Item> leftSource = builder.stream("my-kafka-left-stream-topic",
                Consumed.with(Serdes.String(), new ItemSerde()));
        KStream<String, Item> rightSource = builder.stream("my-kafka-right-stream-topic",
                Consumed.with(Serdes.String(), new KafkaProcessingController.ItemSerde()));

        KStream<String, Item> joined = leftSource
                .selectKey((key, value) -> key)
                .join(rightSource.selectKey((key, value) -> key),
                        (value1, value2) -> {
                            System.out.println("value2.getName() >> " + value1.getName() + value2.getName());
                            value2.setCategory(value1.getCategory());
                            return value2;
                        },
                        JoinWindows.of(Duration.ofSeconds(10)),
                        Joined.with(
                                Serdes.String(),
                                new ItemSerde(),
                                new ItemSerde()
                        )
                );

        joined.to("my-kafka-stream-stream-inner-join-out", Produced.with(Serdes.String(), new ItemSerde()));
        streamsInnerJoinStart(builder);
    }
}
Create Consumers
Now run the application; it should start without errors since Kafka is already running.
Let's create three Kafka console consumers in separate terminals so we can see what is in the topics.
Create a listener for my-kafka-left-stream-topic:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-left-stream-topic --property print.key=true --property print.timestamp=true
Create a listener for my-kafka-right-stream-topic:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-right-stream-topic --property print.key=true --property print.timestamp=true
Create a listener for my-kafka-stream-stream-inner-join-out:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-stream-stream-inner-join-out --property print.key=true --property print.timestamp=true
Note: if the topics do not exist yet, the consumers will print an error message and the topics will then be created automatically. If you would rather avoid those errors, create the topics up front, as shown below.
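For example, the left topic can be created with the kafka-topics.sh script that ships with Kafka (repeat with the other two topic names):
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-left-stream-topic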
Testing
Now we are all done! Time to test.
Go to your browser and make three GET calls.
1. Make the join call. This starts the stream-join process and begins listening to the topics.
http://localhost:8080/startStreaJoin/
2. Send data to topic one.
http://localhost:8080/sendMessages2/
Output on terminal 1:
CreateTime:1588503245293 1 {"id":1,"name":"nuwan","category":"home"}
3. Send data to topic two.
http://localhost:8080/sendMessages3a/
Output on terminal 2:
CreateTime:1588503247050 1 {"id":1,"name":"chamara","category":"office"}
Output on terminal 3 (the joined topic):
CreateTime:1588503247050 1 {"id":1,"name":"chamara","category":"home"}
Analysis
If you check the final JSON object, it has the name from the second stream and the category from the first: the join read the category from the first object and set it on the second.
If you want a fully joined object instead, create a new Java class to hold all the information from both sides and fill it while joining; a sketch follows below.
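For illustration only (not part of the code above), a hypothetical ItemPair class and ValueJoiner could look like this; you would also need a Serde for ItemPair, built the same way as ItemSerde, to write the result back to Kafka:
public class ItemPair {
    private int id;
    private String leftName;
    private String rightName;
    private String category;

    public ItemPair(int id, String leftName, String rightName, String category) {
        this.id = id;
        this.leftName = leftName;
        this.rightName = rightName;
        this.category = category;
    }
    // getters/setters (needed for JSON serialization) omitted for brevity
}

// inside the join, build the combined object instead of mutating value2
KStream<String, ItemPair> joinedPair = leftSource.selectKey((key, value) -> key)
        .join(rightSource.selectKey((key, value) -> key),
                (value1, value2) -> new ItemPair(value1.getId(), value1.getName(), value2.getName(), value1.getCategory()),
                JoinWindows.of(Duration.ofSeconds(10)),
                Joined.with(Serdes.String(), new ItemSerde(), new ItemSerde()));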
Technical Explanation On Implementation
Referring to the KafkaProcessingController.java class.
Serde
public class ItemSerde extends Serdes.WrapperSerde<Item> {
    public ItemSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(Item.class));
    }
}
We create an inner class called ItemSerde to handle serialization. It is a custom Serde that Kafka Streams uses to serialize and deserialize Item objects to and from JSON.
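If you prefer not to subclass WrapperSerde, an equivalent Serde can be built inline with Serdes.serdeFrom (a sketch; the project above uses the inner class instead):
// build the Item Serde from the Spring JSON serializer/deserializer
Serde<Item> itemSerde = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Item.class));
// then pass itemSerde wherever the code above uses new ItemSerde(),
// e.g. Consumed.with(Serdes.String(), itemSerde)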
streamsInnerJoinStart()
private void streamsInnerJoinStart(StreamsBuilder builder) {
    if (streamsInnerJoin != null) {
        streamsInnerJoin.close();
    }
    final Topology topology = builder.build();
    streamsInnerJoin = new KafkaStreams(topology, properties());
    streamsInnerJoin.start();
}
This method builds the topology, closes any previously running instance, and starts the KafkaStreams instance that performs the windowed join on the incoming streams.
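One thing this sketch does not do is close the streams instance when the application shuts down. An optional addition (not in the original code) is to register a JVM shutdown hook right after starting:
streamsInnerJoin = new KafkaStreams(topology, properties());
streamsInnerJoin.start();
// optional: close the streams instance cleanly when the JVM exits
Runtime.getRuntime().addShutdownHook(new Thread(streamsInnerJoin::close));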
KStream<String, Item> leftSource = builder.stream("my-kafka-left-stream-topic",
        Consumed.with(Serdes.String(), new ItemSerde()));
KStream<String, Item> rightSource = builder.stream("my-kafka-right-stream-topic",
        Consumed.with(Serdes.String(), new KafkaProcessingController.ItemSerde()));
KStream<String, Item> leftSource = …
This line creates a KStream that reads from the my-kafka-left-stream-topic topic.
Consumed.with(Serdes.String(), new ItemSerde()))
When leftSource is created we need to tell the KStream how to map the key and value. As described earlier, Kafka stores data as key/value pairs; since leftSource expects a String key and an Item value, we provide the appropriate Serdes to do the mapping.
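Alternatively, default Serdes can be set in the streams configuration so that Consumed.with is only needed when a stream deviates from them (a sketch only; the project above passes the Serdes explicitly, and ItemSerde would have to be a static or top-level class with a no-arg constructor for Kafka to instantiate it by class name):
// in properties(): set default key/value Serdes, then builder.stream("topic") alone is enough
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ItemSerde.class.getName());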
Join
1. KStream<String, Item> joined = leftSource
2. .selectKey((key, value) -> key)
3. .join(rightSource.selectKey((key, value) -> key)
4. , (value1, value2) -> {
5. System.out.println("value2.getName() >> " + value1.getName() + value2.getName());
6. value2.setCategory(value1.getCategory());
7. return value2;
}
8. , JoinWindows.of(Duration.ofSeconds(10))
9. , Joined.with(
Serdes.String(),
new ItemSerde(),
new ItemSerde()
)
);
This code does all the join work:
1. joined is the new result stream being created; leftSource is the primary (left) stream of the join.
2. selectKey picks the key to join on for leftSource. Here we keep the default key field itself, but we could pick a field from the value as well (see the sketch after this list).
3. join performs the join with rightSource, whose selectKey again picks the key field used for matching.
4. This lambda expression (the ValueJoiner) receives the matching values from the two streams.
5. The values received on line 4 are printed.
6. The category is read from value1 (the Item from leftSource) and set on value2 (the Item from rightSource).
7. The updated value2 Item is returned to the joined stream.
8. The join window is 10 seconds: records are kept for that long so that events arriving within 10 seconds of each other can be joined.
9. Here we specify how serialization happens for the key, the left value, and the right value.
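As mentioned in point 2, the join key does not have to be the original message key. For example (a variation for illustration, not used in the code above), both streams could be re-keyed on the Item id field before joining:
// re-key on a field from the value instead of the original message key
KStream<String, Item> leftById = leftSource.selectKey((key, value) -> String.valueOf(value.getId()));
KStream<String, Item> rightById = rightSource.selectKey((key, value) -> String.valueOf(value.getId()));
// the join would then match records whose Item ids are equal within the window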
Git code is here.
Cheers!