Kafka Springboot Hello world
Setting up Kafka
Download kafka Extract it using
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.4.1/kafka_2.12-2.4.1.tgz
Go to the location, in my case
cd /home/nuwan/rAndD/kafka/kafka_2.12–2.4.1
Start Zookeper in a new terminal
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Start kafka in a new terminal
./bin/kafka-server-start.sh ./config/server.properties
Create a topic
./bin/kafka-topics.sh — create — zookeeper localhost:2181 — replication-factor 1 — partitions 1 — topic javainuse-topic
Create producer
./bin/kafka-console-producer.sh — broker-list localhost:9092 — topic javainuse-topic
Create consumer
./bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — topic javainuse-topic — from-beginning
Verify your setup is working fine
Enter something in producer terminal and it will be printed on consumer terminal
Setting up kafka is completed.
Let’s Create the kafka producer as a spring boot java app
Create a sprint boot application for Kafka and web using https://start.spring.io/
Under the dependencies select
My application created as demo6. Download, extract and open it in your IDE.
Under com.nuwan.kafka6 create two more directories for controller and service. And add following classes as in the diagram.
ApacheKafkaWebController.java
KafkaSender.java
Create KafkaSender.java
package com.nuwan.kafka6.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
String kafkaTopic = "javainuse-topic";
public void send ( String message){
try {
kafkaTemplate.send(kafkaTopic, message);
}
catch (Exception e){
System.out.println(e.toString());
}
}
}
Create ApacheKafkaWebController.java
package com.nuwan.kafka6.controller;
import com.nuwan.kafka6.service.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class ApacheKafkaWebController {
@Autowired
KafkaSender kafkaSender;
@GetMapping(value = "/producer")
public String producer(@RequestParam("message") String message){
kafkaSender.send(message);
return "Message sent to the Kafka Topic java_in_use_topic Successfully";
}
@GetMapping(value = "/test")
public String test(){
return "test";
}
}
Test in your local browser
Browser output should be
Message sent to the Kafka Topic java_in_use_topic Successfully
Verify message delivered to the consumer at kafka terminal
…
Let’s write a consumer to read those messages from the kafka
package com.nuwan.kafka6.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer { private final Logger logger =
LoggerFactory.getLogger(KafkaConsumer.class);
}
@KafkaListener(topics = "javainuse-topic", groupId = "group_id")
public void consume1(String message){
logger.info(
String.format("Consumed Message 1 -> %s",message));
}
}
Just add the above cord and run your project you should start seeing the message in the logs.