This is the second tutorial about creating a Java Producer and Consumer with Apache Kafka. In the first tutorial we learned how to set up a Maven project to run a Kafka Java Consumer and Producer (Kafka Tutorial: Creating a Java Producer and Consumer). Now we will code a more advanced use case, in which custom Java types are used as message payloads.
This project is composed of the following Classes:
- SampleKafkaProducer: A standalone Java class which sends messages to a Kafka topic.
- SampleKafkaConsumer: A standalone Java class which consumes Kafka messages from a topic.
- StockQuote: A Java Bean which is sent as a message
- StockQuoteSerializer: A Java class which is used by Kafka to serialize the StockQuote into a stream of bytes
- StockQuoteDeserializer: A Java class which is used by Kafka to deserialize the stream of bytes into a StockQuote object
Before you begin, start a single-node Kafka cluster as discussed in this tutorial: Getting started with Apache Kafka
Let’s start from the SampleKafkaProducer class:
package com.masteringintegration.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Example class sending StockQuote messages to a Kafka cluster
public class SampleKafkaProducer extends Thread {

    private final KafkaProducer<String, StockQuote> producer;
    private final String topic;
    private final Boolean isAsync;

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final String CLIENT_ID = "SampleProducer";
    public static final String TOPIC = "testTopic";
    public static final int MESSAGES = 100;

    public SampleKafkaProducer(String topic, Boolean isAsync) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put("client.id", CLIENT_ID);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Our custom serializer turns each StockQuote into bytes
        properties.put("value.serializer", "com.masteringintegration.kafka.StockQuoteSerializer");
        producer = new KafkaProducer<>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public static void main(String[] args) {
        boolean isAsync = false;
        SampleKafkaProducer producer = new SampleKafkaProducer(TOPIC, isAsync);
        // start the producer
        producer.start();
    }

    @Override
    public void run() {
        int messageNo = 1;
        // Send MESSAGES quotes, keyed 1..MESSAGES
        while (messageNo <= MESSAGES) {
            String key = String.valueOf(messageNo);
            StockQuote quote = new StockQuote();
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                // Send asynchronously: the callback is invoked when the send completes
                producer.send(new ProducerRecord<>(topic, key, quote),
                        new MessageCallBack(startTime, key, quote));
            } else {
                // Send synchronously: get() blocks until the broker acknowledges the record
                try {
                    producer.send(new ProducerRecord<>(topic, key, quote)).get();
                    System.out.println("Sent Quote: (" + key + ", " + quote + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace(); // handle the exception
                }
            }
            ++messageNo;
        }
        // Flush any pending records and release the producer's resources
        producer.close();
    }
}

class MessageCallBack implements Callback {

    private final long startTime;
    private final String key;
    private final StockQuote quote;

    public MessageCallBack(long startTime, String key, StockQuote quote) {
        this.startTime = startTime;
        this.key = key;
        this.quote = quote;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println("quote(" + key + ", " + quote + ") sent to partition("
                    + metadata.partition() + "), " + "offset(" + metadata.offset()
                    + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
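This class extends Thread so that the send loop runs on its own thread and publishes each message to a Topic named “testTopic”. Because main() sets isAsync to false, each send blocks on get() until the broker acknowledges the record. Set isAsync to true to send without blocking and receive the result in MessageCallBack instead.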
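The difference between the two branches of the send loop can be tried out without a broker. The following is only a sketch using plain java.util.concurrent: the class SendModes and the method fakeSend are hypothetical stand-ins, and the fake offset replaces the RecordMetadata that the real KafkaProducer.send() delivers through its Future or Callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

// Hypothetical sketch: mimics the two send styles without any Kafka dependency.
public class SendModes {

    // Stand-in for producer.send(): completes on another thread with a fake offset.
    static CompletableFuture<Long> fakeSend(long offset) {
        return CompletableFuture.supplyAsync(() -> offset);
    }

    // Synchronous style: get() blocks the caller until the "acknowledgement" arrives.
    static long sendSync(long offset) {
        try {
            return fakeSend(offset).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    // Asynchronous style: register a callback and return immediately.
    static CompletableFuture<Void> sendAsync(long offset, Consumer<Long> onAck) {
        return fakeSend(offset).thenAccept(onAck);
    }

    public static void main(String[] args) {
        long offset = sendSync(7);
        System.out.println("sync ack, offset " + offset);

        CompletableFuture<Void> pending =
                sendAsync(8, o -> System.out.println("async ack, offset " + o));
        // join() is here only so the demo does not exit before the callback runs
        pending.join();
    }
}
```

In the synchronous style the caller cannot issue the next send until the previous one has been acknowledged, which is simple to reason about but slower. The asynchronous style keeps the loop moving and reports the outcome later.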
Now let’s look at the Kafka Consumer class:
package com.masteringintegration.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SampleKafkaConsumer {

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String groupId = "SampleKafkaConsumer";
        String topic = "testTopic";
        new SampleKafkaConsumer(server, groupId, topic).run();
    }

    // Variables
    private final Logger mLogger = LoggerFactory.getLogger(SampleKafkaConsumer.class.getName());
    private final String mBootstrapServer;
    private final String mGroupId;
    private final String mTopic;

    // Constructor
    SampleKafkaConsumer(String bootstrapServer, String groupId, String topic) {
        mBootstrapServer = bootstrapServer;
        mGroupId = groupId;
        mTopic = topic;
    }

    // Public
    void run() {
        mLogger.info("Creating consumer thread");
        CountDownLatch latch = new CountDownLatch(1);
        ConsumerRunnable consumerRunnable =
                new ConsumerRunnable(mBootstrapServer, mGroupId, mTopic, latch);
        Thread thread = new Thread(consumerRunnable);
        thread.start();

        // On JVM shutdown, wake up the consumer and wait until it has closed
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            mLogger.info("Caught shutdown hook");
            consumerRunnable.shutdown();
            await(latch);
            mLogger.info("Application has exited");
        }));

        await(latch);
    }

    // Private
    void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            mLogger.error("Application got interrupted", e);
        } finally {
            mLogger.info("Application is closing");
        }
    }

    // Inner classes
    private class ConsumerRunnable implements Runnable {

        private final CountDownLatch mLatch;
        private final KafkaConsumer<String, StockQuote> mConsumer;

        ConsumerRunnable(String bootstrapServer, String groupId, String topic,
                         CountDownLatch latch) {
            mLatch = latch;
            Properties props = consumerProps(bootstrapServer, groupId);
            mConsumer = new KafkaConsumer<>(props);
            mConsumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, StockQuote> records =
                            mConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, StockQuote> record : records) {
                        mLogger.info("Key: " + record.key() + ", Value: " + record.value());
                        mLogger.info("Partition: " + record.partition()
                                + ", Offset: " + record.offset());
                    }
                }
            } catch (WakeupException e) {
                // Thrown by poll() after shutdown() has called wakeup()
                mLogger.info("Received shutdown signal!");
            } finally {
                mConsumer.close();
                mLatch.countDown();
            }
        }

        void shutdown() {
            mConsumer.wakeup();
        }

        private Properties consumerProps(String bootstrapServer, String groupId) {
            // Our custom deserializer turns the received bytes back into a StockQuote
            String deserializer = "com.masteringintegration.kafka.StockQuoteDeserializer";
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return properties;
        }
    }
}
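The polling loop lives in the inner ConsumerRunnable class, which implements Runnable so that messages are consumed on a separate thread. A shutdown hook calls wakeup() on the consumer, which makes poll() throw a WakeupException. The loop then closes the consumer and counts down the latch that the main thread is waiting on.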
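The same start, signal and wait handshake can be tried without a Kafka cluster. The sketch below uses only the JDK and rests on a few assumptions: the class LatchShutdown and its Worker are hypothetical names, an AtomicBoolean flag plays the role of wakeup(), and a short sleep stands in for poll().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the latch-based shutdown used by the consumer.
public class LatchShutdown {

    static final class Worker implements Runnable {
        private final AtomicBoolean stop = new AtomicBoolean(false); // role of wakeup()
        private final CountDownLatch done;
        final AtomicInteger polls = new AtomicInteger();

        Worker(CountDownLatch done) {
            this.done = done;
        }

        @Override
        public void run() {
            try {
                while (!stop.get()) {
                    polls.incrementAndGet(); // stands in for poll() and record handling
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Signal that cleanup is finished, like mLatch.countDown()
                done.countDown();
            }
        }

        void shutdown() {
            stop.set(true);
        }
    }

    // Starts a worker, lets it run, requests shutdown and waits on the latch.
    // Returns true if the worker finished within one second of the request.
    static boolean runAndStop(long runMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Worker worker = new Worker(done);
        new Thread(worker).start();
        try {
            Thread.sleep(runMillis);
            worker.shutdown();
            return done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runAndStop(50));
    }
}
```

Counting down in the finally block matters: the waiting thread is released even if the loop ends with an exception, which is the same reason the consumer calls mLatch.countDown() in its own finally block.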
The Java Bean sent over the wire is StockQuote, a simple example of a random stock quote ticker:
package com.masteringintegration.kafka;

import java.util.Random;

public class StockQuote {

    private int quote;
    private String name;

    // Creates a quote with a random price for a randomly chosen company
    public StockQuote() {
        String[] company = new String[] { "Microsoft", "IBM", "Oracle", "Accenture", "HPE" };
        Random rand = new Random();
        quote = rand.nextInt(1000);
        name = company[rand.nextInt(company.length)];
    }

    public int getQuote() {
        return quote;
    }

    public void setQuote(int quote) {
        this.quote = quote;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "StockQuote{" + "quote=" + quote + ", name='" + name + '\'' + '}';
    }
}
For StockQuote objects to be sent over the wire, we need a custom Serializer and Deserializer for this class. Here is the StockQuoteDeserializer, which implements Deserializer:
package com.masteringintegration.kafka;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class StockQuoteDeserializer implements Deserializer<StockQuote> {

    // A single ObjectMapper is reused for every record
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public StockQuote deserialize(String topic, byte[] data) {
        StockQuote quote = null;
        try {
            quote = mapper.readValue(data, StockQuote.class);
        } catch (Exception e) {
            // On malformed input, print the error and hand null to the consumer
            e.printStackTrace();
        }
        return quote;
    }

    @Override
    public void close() {}
}
And the corresponding StockQuoteSerializer class:
package com.masteringintegration.kafka;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class StockQuoteSerializer implements Serializer<StockQuote> {

    // Created eagerly so that serialize() works even if configure() is never called
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, StockQuote quote) {
        try {
            // Jackson writes the bean as JSON bytes
            return objectMapper.writeValueAsBytes(quote);
        } catch (Exception e) {
            throw new SerializationException("Error serializing object", e);
        }
    }

    @Override
    public void close() {}
}
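To see what the Serializer and Deserializer pair does end to end, the round trip from object to bytes and back can be sketched with the JDK alone. This is a stand-in, not the classes above: the class QuoteRoundTrip and its nested Quote type are hypothetical, and a hand-written JSON string replaces Jackson, so it only handles names without quotes or backslashes.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the Jackson-based serializer/deserializer pair.
public class QuoteRoundTrip {

    // Minimal immutable value type used only in this sketch
    static final class Quote {
        final int quote;
        final String name;

        Quote(int quote, String name) {
            this.quote = quote;
            this.name = name;
        }
    }

    // Matches exactly the shape written by serialize(); not a general JSON parser
    private static final Pattern SHAPE =
            Pattern.compile("\\{\"quote\":(\\d+),\"name\":\"([^\"]*)\"\\}");

    // Object -> bytes: the job of a Serializer on the producer side
    static byte[] serialize(Quote q) {
        String json = "{\"quote\":" + q.quote + ",\"name\":\"" + q.name + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Bytes -> object: the job of a Deserializer on the consumer side
    static Quote deserialize(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        Matcher m = SHAPE.matcher(json);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected payload: " + json);
        }
        return new Quote(Integer.parseInt(m.group(1)), m.group(2));
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new Quote(281, "Accenture"));
        System.out.println(new String(wire, StandardCharsets.UTF_8));
        // prints {"quote":281,"name":"Accenture"}

        Quote back = deserialize(wire);
        System.out.println(back.name + " @ " + back.quote);
        // prints Accenture @ 281
    }
}
```

Kafka itself only ever sees the byte array, which is why the producer and the consumer must agree on the same encoding.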
In addition to the Kafka client classes, we have also added the Jackson Databind dependency to the pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.10.3</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
          <execution>
            <id>consumer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaConsumer</mainClass>
            </configuration>
          </execution>
          <execution>
            <id>producer</id>
            <configuration>
              <mainClass>com.masteringintegration.kafka.SampleKafkaProducer</mainClass>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
As you can see, we have included two different executions, one for each standalone Java application. You can run the Producer as follows:
mvn exec:java@producer
The messages will be sent to the destination:
Sent Quote: (1, StockQuote{quote=281, name='Accenture'})
Sent Quote: (2, StockQuote{quote=458, name='IBM'})
Sent Quote: (3, StockQuote{quote=209, name='Microsoft'})
Sent Quote: (4, StockQuote{quote=764, name='IBM'})
Sent Quote: (5, StockQuote{quote=489, name='IBM'})
Sent Quote: (6, StockQuote{quote=518, name='HPE'})
Sent Quote: (7, StockQuote{quote=708, name='Oracle'})
Sent Quote: (8, StockQuote{quote=921, name='Accenture'})
Sent Quote: (9, StockQuote{quote=964, name='Oracle'})
Sent Quote: (10, StockQuote{quote=706, name='Accenture'})
. . . . .
Now start the Consumer application:
mvn exec:java@consumer
You will see that messages are consumed from the Topic:
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=890, name='Microsoft'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24772
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=908, name='HPE'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24773
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=763, name='HPE'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24774
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=841, name='Accenture'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24775
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=971, name='HPE'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24776
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=176, name='Microsoft'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24777
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=945, name='Microsoft'}
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Partition: 0, Offset: 24778
[Thread-2] INFO com.masteringintegration.kafka.SampleKafkaConsumer - Key: , Value: StockQuote{quote=201, name='Oracle'}
. . . .
Congratulations! You have just managed to connect to a Kafka cluster using a Java Producer and Consumer that exchange custom Java objects.
Source code for this tutorial: https://github.com/fmarchioni/masterspringboot/tree/master/camel/kafka/java-custom-objects