Apache Kafka and Spring Boot quickstart

In this tutorial we will learn how to connect to a Kafka cluster from a Spring Boot REST Controller. As a proof of concept, we will set up a basic Web application which produces and consumes messages that will be streamed to Kafka.

First, some basics: what is Apache Kafka? Apache Kafka is a streaming platform that provides three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Apache Kafka is generally used for two types of applications:

  • Applications which build real-time streaming data pipelines that reliably move data between systems or applications.
  • Applications which transform or react to the streams of data.
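To make these capabilities more concrete, here is a toy in-memory model written in plain Java. It is not the Kafka API and it leaves out partitions, replication and persistence; the class ToyTopic and its methods are hypothetical names used only for illustration. It sketches the idea that records are appended to a log, stay stored after being read, and are read by each consumer group at its own position (offset).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a single topic; an illustration, not the Kafka API. */
class ToyTopic {
  // Append-only list of records; a record's offset is its index in the list.
  private final List<String> log = new ArrayList<>();
  // Next offset to read, tracked separately for each consumer group.
  private final Map<String, Integer> nextOffsetByGroup = new HashMap<>();

  /** Appends a record and returns the offset it was stored at. */
  int publish(String record) {
    log.add(record);
    return log.size() - 1;
  }

  /** Returns the records the group has not read yet and advances its offset. */
  List<String> poll(String groupId) {
    int from = nextOffsetByGroup.getOrDefault(groupId, 0);
    List<String> unread = new ArrayList<>(log.subList(from, log.size()));
    nextOffsetByGroup.put(groupId, log.size());
    return unread;
  }

  public static void main(String[] args) {
    ToyTopic topic = new ToyTopic();
    topic.publish("hello");
    topic.publish("world");
    System.out.println(topic.poll("group_a")); // [hello, world]
    System.out.println(topic.poll("group_a")); // []
    System.out.println(topic.poll("group_b")); // [hello, world]
  }
}
```

Reading does not remove anything from the log, which is why the second group still sees both records. This is the main difference from a classic message queue, where a consumed message is gone.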

To understand how Kafka does these things, let’s dive into a real example using Spring Boot.

First generate a project:

spring init -dweb,kafka kafka-demo 

The following dependencies will be added to your project:

<?xml version="1.0" encoding="UTF-8"?>
<project>
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
      </dependency>
   </dependencies>
</project>

Then, let’s write a KafkaConsumer. You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation. We will use the latter option:

package com.masterspringboot;

import java.util.logging.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
  private static final Logger logger = Logger.getLogger(KafkaConsumer.class.getName());

  // Invoked for each record received from "mytopic" as a member of consumer group "group_id".
  @KafkaListener(topics = "mytopic", groupId = "group_id")
  public void consume(String message) {
    logger.info(String.format("Consumed message -> %s", message));
  }
}

Then, let’s write a KafkaProducer. The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics:

package com.masterspringboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.logging.Logger;

@Service
public class KafkaProducer {
  private static final Logger logger = Logger.getLogger(KafkaProducer.class.getName());
  private static final String TOPIC = "mytopic";
  @Autowired private KafkaTemplate<String, String> kafkaTemplate;

  public void sendMessage(String message) {
    logger.info(String.format("Sending message -> %s", message));
    // send() is asynchronous: it returns a future and does not wait for the broker's acknowledgement.
    this.kafkaTemplate.send(TOPIC, message);
  }
}

The next component is a simple RestController that publishes a message to the Kafka topic each time a GET request arrives at the “/kafka/publish/{message}” URI. The message is taken from the path variable:

package com.masterspringboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
  private final KafkaProducer producer;

  @Autowired
  KafkaController(KafkaProducer producer) {
    this.producer = producer;
  }

  @GetMapping(value = "/publish/{message}")
  public String sendMessageToKafkaTopic(@PathVariable("message") String message) {
    this.producer.sendMessage(message);
    return "Message sent! check logs!";
  }
}
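Because the message travels in the URL path, a client has to percent-encode it when it contains spaces or other characters that are not legal in a path. The following is a minimal sketch using only the JDK; the class PublishUrl is a hypothetical helper, and the host and port are parameters because they depend on how you run the application.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper that builds request URLs for the /kafka/publish/{message} endpoint. */
class PublishUrl {
  /**
   * Returns the publish URL for a message, percent-encoding characters that are
   * not legal in a URI path (for example, a space becomes %20).
   */
  static String forMessage(String host, int port, String message) {
    try {
      // The multi-argument URI constructor quotes illegal characters in the path.
      URI uri = new URI("http", null, host, port, "/kafka/publish/" + message, null, null);
      return uri.toASCIIString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Cannot build a URL for message: " + message, e);
    }
  }

  public static void main(String[] args) {
    // Port 9000 is the one used in the URL later in this tutorial.
    System.out.println(forMessage("localhost", 9000, "hello"));
    System.out.println(forMessage("localhost", 9000, "hello world"));
  }
}
```

Note that a slash inside the message is left as-is and would be read as an extra path segment, so this endpoint design only suits short, simple messages. For arbitrary payloads, a POST request with a body would be the more usual choice.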

To run the application, a simple @SpringBootApplication is included:

package com.masterspringboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SimpleKafkaApplication {
  public static void main(String[] args) {
    SpringApplication.run(SimpleKafkaApplication.class, args);
  }
}
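The listings above do not show a configuration file. As a sketch, assuming you keep your settings in src/main/resources/application.properties, the two properties below would match the rest of this tutorial. The property names are standard Spring Boot keys; the values are assumptions taken from the broker address and the URL used later on.

```properties
# HTTP port of the embedded web server (Spring Boot defaults to 8080)
server.port=9000
# Address of the Kafka broker (localhost:9092 is also Spring Boot's default)
spring.kafka.bootstrap-servers=localhost:9092
```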

Starting Kafka

The simplest way to start Kafka is with a Docker Compose YAML file, which starts one container for Kafka and one for Zookeeper. Zookeeper is needed for cluster management. Here is a sample docker-compose.yaml file:

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

From the directory where you saved the docker-compose.yaml file, run:

docker-compose up 

Check from the Console that Kafka started successfully:

kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,053] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
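If you prefer a programmatic check over reading the logs, a plain TCP connection attempt can tell you whether something is listening on the broker port. The sketch below uses only the JDK; BrokerPortCheck is a hypothetical helper name, and localhost:9092 comes from the advertised listener in the Compose file. An open port only shows that a process accepts connections there, not that Kafka is fully healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper that checks whether something accepts TCP connections on a host and port. */
class BrokerPortCheck {
  static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // Connection refused, timed out, or host not resolvable.
      return false;
    }
  }

  public static void main(String[] args) {
    // localhost:9092 is the advertised listener from the Compose file.
    System.out.println("Kafka port open: " + isListening("localhost", 9092, 2000));
  }
}
```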

Now you can start your Spring Boot application with:

mvn install spring-boot:run 

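Then you can publish a message, which the listener will consume, by opening the following URL. This assumes the application listens on port 9000, for example through server.port=9000 in application.properties; Spring Boot's default port is 8080. http://localhost:9000/kafka/publish/hello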


On the Console, you should see:

2019-11-04 09:01:18.624  INFO 16891 --- [ntainer#0-0-C-1] com.masterspringboot.KafkaConsumer       : Consumed message -> hello 

Congratulations! You just managed to connect Apache Kafka with a Spring Boot application!

Source code for this article is available at: https://github.com/fmarchioni/masterspringboot/tree/master/kafka/kafka-demo

Java EE user? If you are interested in the Enterprise version of this tutorial, check out: WildFly and Kafka quickstart.
