Kafka Spring Boot Hello World

Nuwan Chamara
Mar 16, 2020


Setting up Kafka

Download Kafka from the link below and extract the archive.

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.4.1/kafka_2.12-2.4.1.tgz

Go to the extracted directory, in my case:

cd /home/nuwan/rAndD/kafka/kafka_2.12-2.4.1

Start ZooKeeper in a new terminal

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

Start Kafka in a new terminal

./bin/kafka-server-start.sh ./config/server.properties

Create a topic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javainuse-topic

Start a console producer in a new terminal

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javainuse-topic

Start a console consumer in another terminal

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic javainuse-topic --from-beginning

Verify that your setup is working

Type something in the producer terminal and press Enter; it should be printed in the consumer terminal.

The Kafka setup is now complete.

Let’s create the Kafka producer as a Spring Boot Java app

Create a Spring Boot application with Kafka and web support using https://start.spring.io/

Under the dependencies, select Spring Web and Spring for Apache Kafka.

My application was created as demo6. Download, extract, and open it in your IDE.
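Before adding any classes, it can help to spell out the Kafka connection settings. As far as I know, Spring Boot's Kafka auto-configuration already defaults to a broker at localhost:9092 and to string serializers, so the file below is optional for this local setup; treat it as a sketch that assumes the single local broker started earlier, not as required configuration.

```properties
# src/main/resources/application.properties
# Assumption: a single local broker, started as described in the setup section.
spring.kafka.bootstrap-servers=localhost:9092

# Producer: keys and values are plain strings, matching KafkaTemplate<String,String>.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer: used by the listener written later in this post.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Optional: start from the beginning of the topic when the group has no committed offset.
spring.kafka.consumer.auto-offset-reset=earliest
```

If the broker runs on another host or port, only the bootstrap-servers line needs to change.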

Under com.nuwan.kafka6, create two more packages, controller and service, and add the following classes:

controller/ApacheKafkaWebController.java

service/KafkaSender.java

Create KafkaSender.java

package com.nuwan.kafka6.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaSender {

    // Auto-configured by Spring Boot; sends String keys and String values.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Same topic that was created from the command line earlier.
    private final String kafkaTopic = "javainuse-topic";

    // Publishes the message to the topic and prints any exception thrown by the call.
    public void send(String message) {
        try {
            kafkaTemplate.send(kafkaTopic, message);
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}
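One thing to keep in mind: kafkaTemplate.send returns a future (the exact type depends on the Spring Kafka version), so the try/catch above only sees errors thrown by the call itself, not delivery failures reported later. The sketch below illustrates the idea with plain JDK classes. fakeSend is a hypothetical stand-in for the real send, not a Spring Kafka API, and the failure condition and error message are made up for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncSendDemo {

    // Hypothetical stand-in for an asynchronous send: it returns immediately,
    // and a failure only shows up later inside the returned future.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        return CompletableFuture.supplyAsync(() -> {
            if (message.isEmpty()) {
                throw new IllegalStateException("broker rejected empty message");
            }
            return topic + ":" + message;
        });
    }

    // Waits on the future so that an asynchronous failure becomes visible to the caller.
    static String sendAndWait(String topic, String message) {
        try {
            return "OK " + fakeSend(topic, message).get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // The original exception is wrapped; unwrap it for the report.
            return "FAILED " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FAILED interrupted";
        } catch (TimeoutException e) {
            return "FAILED timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("javainuse-topic", "hello"));
        System.out.println(sendAndWait("javainuse-topic", ""));
    }
}
```

In the real KafkaSender, the same pattern would mean calling get with a timeout on the value returned by kafkaTemplate.send, at the cost of blocking the request thread until the broker answers.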

Create ApacheKafkaWebController.java

package com.nuwan.kafka6.controller;

import com.nuwan.kafka6.service.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class ApacheKafkaWebController {

    @Autowired
    KafkaSender kafkaSender;

    // GET /kafka/producer?message=... publishes the message to the Kafka topic.
    @GetMapping(value = "/producer")
    public String producer(@RequestParam("message") String message) {
        kafkaSender.send(message);
        return "Message sent to the Kafka topic javainuse-topic successfully";
    }

    // GET /kafka/test is a simple check that the web layer is up.
    @GetMapping(value = "/test")
    public String test() {
        return "test";
    }

}

Test in your local browser

http://localhost:8080/kafka/producer?message=test2

The browser output should be

Message sent to the Kafka topic javainuse-topic successfully

Verify that the message was delivered by checking the console consumer terminal.
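The browser works well for a one-word message like test2. For messages with spaces or symbols, the query parameter has to be URL-encoded. Here is a small sketch of a client that does this with the JDK's built-in HttpClient (Java 11+). The class name ProducerEndpointClient and its helper methods are my own names for illustration, and it assumes the app is running on localhost:8080 as above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class ProducerEndpointClient {

    // Assumption: the Spring Boot app from this post, on its default port.
    static final String BASE_URL = "http://localhost:8080";

    // Builds the producer URL, encoding the message so spaces and symbols survive.
    static URI producerUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/kafka/producer?message=" + encoded);
    }

    // Sends a GET request to the producer endpoint and returns the response body.
    static String send(String baseUrl, String message) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(producerUri(baseUrl, message)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        String message = args.length > 0 ? args[0] : "hello kafka";
        System.out.println("GET " + producerUri(BASE_URL, message));
        // Only contact the server when a message was passed on the command line.
        if (args.length > 0) {
            System.out.println(send(BASE_URL, message));
        }
    }
}
```

Run it with a message as the first argument while the app is up, and the text should show up in the console consumer terminal just like the browser test.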

Let’s write a consumer to read those messages from Kafka

Create KafkaConsumer.java in the service package

package com.nuwan.kafka6.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private final Logger logger =
            LoggerFactory.getLogger(KafkaConsumer.class);

    // Called for every message that arrives on the topic for this consumer group.
    @KafkaListener(topics = "javainuse-topic", groupId = "group_id")
    public void consume1(String message) {
        logger.info("Consumed Message 1 -> {}", message);
    }
}

Add the code above and run your project; you should start seeing the messages in the logs.
