In this tutorial we will learn how to connect to a Kafka cluster from a Spring Boot REST Controller. As a proof of concept, we will set up a basic Web application which produces and consumes messages that will be streamed to Kafka.
First, some basics: what is Apache Kafka? Apache Kafka is a streaming platform that provides three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant, durable way.
- Process streams of records as they occur.
Apache Kafka is generally used for two types of applications:
- Applications that build real-time streaming data pipelines to reliably move data between systems or applications
- Applications that transform or react to streams of data
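Before moving on, the publish/subscribe and storage capabilities listed above can be pictured with a toy model. The sketch below is not Kafka code: ToyTopic, publish and poll are hypothetical names chosen for illustration, and the class only mimics one idea, namely that records are appended to a log and kept there while each consumer group tracks its own read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory stand-in for a single topic; a hypothetical class, not a Kafka API. */
final class ToyTopic {
    // Append-only log: records stay in the list after they have been read.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer group.
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Appends a record and returns the offset it was stored at. */
    int publish(String record) {
        log.add(record);
        return log.size() - 1;
    }

    /** Returns the records this group has not seen yet and advances its offset. */
    List<String> poll(String groupId) {
        int from = offsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("hello");
        topic.publish("world");
        System.out.println(topic.poll("group-a")); // [hello, world]
        System.out.println(topic.poll("group-b")); // [hello, world]
        topic.publish("again");
        System.out.println(topic.poll("group-a")); // [again]
    }
}
```

Unlike a classic queue, reading a record does not remove it: a second group still receives everything from the start. This is roughly the role the groupId plays in the consumer shown later.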
To understand how Kafka does these things, let’s dive into a real example using Spring Boot.
First generate a project:
spring init -dweb,kafka kafka-demo
The following dependencies will be added to your project:
<?xml version="1.0" encoding="UTF-8"?>
<project>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
  </dependencies>
</project>
Then, let’s write a KafkaConsumer. You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation. We will use the latter option:
package com.masterspringboot;

import org.springframework.stereotype.Service;
import org.springframework.kafka.annotation.KafkaListener;

import java.io.IOException;
import java.util.logging.Logger;

@Service
public class KafkaConsumer {

    private static final Logger logger = Logger.getLogger(KafkaConsumer.class.getName());

    @KafkaListener(topics = "mytopic", groupId = "group_id")
    public void consume(String message) throws IOException {
        logger.info(String.format("Consumed message -> %s", message));
    }
}
Then, let’s write a KafkaProducer. The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics:
package com.masterspringboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service
public class KafkaProducer {

    private static final Logger logger = Logger.getLogger(KafkaProducer.class.getName());
    private static final String TOPIC = "mytopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("Sending message -> %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}
The next component we will add is a simple RestController which will publish messages to the Kafka topic, as new GET requests (with a Path Variable) arrive under the “/kafka/publish/{message}” URI:
package com.masterspringboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish/{message}")
    public String sendMessageToKafkaTopic(@PathVariable("message") String message) {
        this.producer.sendMessage(message);
        return "Message sent! check logs!";
    }
}
To run the application, a simple @SpringBootApplication is included:
package com.masterspringboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SimpleKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SimpleKafkaApplication.class, args);
    }
}
Starting Kafka
The simplest way to start Kafka is with a Docker Compose YAML file, which starts containers for both Kafka and ZooKeeper; Kafka needs ZooKeeper for cluster management. Here is a sample docker-compose.yaml file:
version: '2'
services:
  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
From the directory where you saved the docker-compose.yaml file, execute:
docker-compose up
Check from the Console that Kafka started successfully:
kafka_1 | [2019-11-04 07:58:50,051] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2019-11-04 07:58:50,051] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2019-11-04 07:58:50,053] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
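If you would rather check from code than read the logs, the following is a minimal sketch that tests whether the broker port published by the Compose file accepts TCP connections. BrokerPortCheck and isReachable are hypothetical names, the host and port are taken from the docker-compose.yaml above, and the 2000 ms timeout is an arbitrary choice. Note that it only confirms the port is open; it does not prove the broker has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper that probes a TCP port; it does not speak the Kafka protocol. */
final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is the port mapping assumed from the Compose file.
        boolean up = isReachable("localhost", 9092, 2000);
        System.out.println(up ? "Port 9092 accepts connections" : "Port 9092 is not reachable");
    }
}
```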
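Now you can start your Spring Boot application. Spring Boot's Kafka auto-configuration connects to localhost:9092 by default, which matches the port published by the Compose file, so no broker address has to be configured. Start the application with: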
mvn install spring-boot:run
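Then, you can start sending and consuming messages by opening http://localhost:9000/kafka/publish/hello in a browser. This URL assumes the application listens on port 9000 (for example, through server.port=9000 in application.properties); with Spring Boot's default settings the port is 8080.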
On the Console, you should see:
2019-11-04 09:01:18.624 INFO 16891 --- [ntainer#0-0-C-1] com.masterspringboot.KafkaConsumer : Consumed message -> hello
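The same request can also be sent from Java rather than a browser. The following is a minimal sketch using the JDK's built-in HttpClient (Java 11 or later); PublishClient and publishUri are hypothetical names, and the base URL http://localhost:9000 is simply the one used in this tutorial. Because the message travels as a path variable, the sketch percent-encodes it first.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the /kafka/publish/{message} endpoint of this tutorial. */
final class PublishClient {

    /** Builds the publish URI, percent-encoding the message for use as a path segment. */
    static URI publishUri(String baseUrl, String message) {
        // URLEncoder targets form encoding and writes spaces as '+'; a path segment needs %20.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/kafka/publish/" + encoded);
    }

    public static void main(String[] args) throws InterruptedException {
        String message = args.length > 0 ? args[0] : "hello";
        HttpRequest request = HttpRequest.newBuilder(publishUri("http://localhost:9000", message))
                .GET()
                .build();
        try {
            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            System.out.println("Request failed (is the application running?): " + e.getMessage());
        }
    }
}
```

If the application is running, the response body should be the string returned by the controller, and the consumer log line should appear on the application console as shown above.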
Congratulations! You just managed to connect Apache Kafka with a Spring Boot application!
Source code for this article is available at: https://github.com/fmarchioni/masterspringboot/tree/master/kafka/kafka-demo
Java EE user? If you are interested in the Enterprise version of this tutorial, check out the WildFly and Kafka quickstart.