Saturday, January 23, 2016

Apache Kafka Producer Consumer: Real Time Java Project

This post walks through a working Java example that produces messages to and consumes messages from Apache Kafka. It assumes that Apache Kafka is already installed and running on your machine and that a topic named IQUBALTOPIC already exists. If you have not set up Kafka yet, please go through my previous blog post on Apache Kafka.

At this point, ZooKeeper and Kafka are both up and running.

Please download the Java project explained below from IMUKafkaPOC or from the GitHub source code.

The Kafka message producer code below publishes a message to IQUBALTOPIC, and every consumer subscribed to that topic receives it. Run the consumer code first and then execute the producer code; the message will then appear on the consumer console.
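To see why the start order matters, here is a toy in-memory model of a topic log. It is not Kafka code. It assumes the behaviour of the old high-level consumer, where a consumer group with no committed offset starts reading at the end of the log (the default `auto.offset.reset` value of `largest`). The class name `ToyTopicLog` and its methods are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: illustrates offsets, not the real Kafka API.
final class ToyTopicLog {
    private final List<String> messages = new ArrayList<String>();

    // Append a message to the end of the log, as a producer would.
    void publish(String message) {
        messages.add(message);
    }

    // A new subscriber with no stored offset starts at the current end of the log.
    int subscribeAtEnd() {
        return messages.size();
    }

    // Return every message at or after the given offset.
    List<String> readFrom(int offset) {
        return new ArrayList<String>(messages.subList(offset, messages.size()));
    }

    public static void main(String[] args) {
        ToyTopicLog log = new ToyTopicLog();
        log.publish("sent before the consumer started");
        int offset = log.subscribeAtEnd(); // the consumer starts here
        log.publish("sent after the consumer started");
        // Prints: [sent after the consumer started]
        System.out.println(log.readFrom(offset));
    }
}
```

In this model the first message is still stored in the log, but a consumer that subscribes at the end never reads it, which is the situation you avoid by starting the consumer first.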

Kafka Producer - IMUKafkaProducer Java

package com.kafka.producer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * 
 * @author Iqubal Mustafa Kaki
 *
 */
public class IMUKafkaProducer {
    static final String TOPIC = "IQUBALTOPIC";

    public static void main(String[] argv) {
        Properties properties = new Properties();
        // Broker used to discover topic metadata
        properties.put("metadata.broker.list", "localhost:9092");
        // Send message payloads as strings
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        SimpleDateFormat sdf = new SimpleDateFormat();
        // Message without a key; the timestamp makes each run distinguishable
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                TOPIC, "Iqubal Producer Sending Message to Topic " + sdf.format(new Date()));
        producer.send(message);
        producer.close();
        System.out.println("Completed------");
    }
}

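The producer needs only two properties to run. As a hedged sketch, the helper below gathers them in one place and adds `request.required.acks`, a setting from the same old producer API that controls whether the broker acknowledges each write. The class name `ProducerSettings` and the value `1` are assumptions for illustration and are not part of the original project.

```java
import java.util.Properties;

// Hypothetical helper: builds the configuration that would be passed to ProducerConfig.
final class ProducerSettings {

    static Properties forBroker(String brokerList) {
        Properties properties = new Properties();
        // Broker(s) used to fetch topic metadata, as host:port
        properties.put("metadata.broker.list", brokerList);
        // Encode message payloads as strings
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // Assumption: wait for the partition leader to acknowledge each message
        properties.put("request.required.acks", "1");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = forBroker("localhost:9092");
        // Prints: localhost:9092
        System.out.println(properties.getProperty("metadata.broker.list"));
    }
}
```

Passing the result of `ProducerSettings.forBroker("localhost:9092")` to `new ProducerConfig(...)` would replace the inline `Properties` setup in `main`.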
Kafka Consumer - IMUKafkaConsumer Java


package com.kafka.consumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * 
 * @author Iqubal Mustafa Kaki
 *
 */
public class IMUKafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public IMUKafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.commit.enable", "false");
        props.put("queued.max.message.chunks", "10000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void topicMessageConsumer() {
        try {
            // Ask for a single stream (one consumer thread) for the topic
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                // hasNext() blocks until a message arrives. The iterator is not
                // probed with isEmpty() or size() first: isEmpty() would block in
                // the same way, and size() would consume the messages while
                // counting them, so nothing would be left to print.
                while (it.hasNext()) {
                    System.out.println("Message from Single Topic: "
                            + new String(it.next().message()));
                }
            }
            if (consumer != null) {
                consumer.shutdown();
            }
        } catch (Exception ex) {
            System.out.print("error" + ex.getMessage());
        }
    }

    public static void main(String[] args) {
        String topic = "IQUBALTOPIC";
        IMUKafkaConsumer imuKafkaConsumer = new IMUKafkaConsumer(
                "localhost:2181", "kafka-consumer-group", topic);
        imuKafkaConsumer.topicMessageConsumer();
    }
}

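As configured, the consumer waits indefinitely for new messages. The sketch below shows two settings from the same old high-level consumer API that change this, offered as an assumption about what you might want rather than as part of the original project: `auto.offset.reset` set to `smallest` makes a group with no committed offset start from the oldest available message, and `consumer.timeout.ms` makes the iterator throw a `ConsumerTimeoutException` when no message arrives within the interval. The class name `ConsumerSettings` and the 5000 ms value are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: builds the configuration that would be passed to ConsumerConfig.
final class ConsumerSettings {

    static Properties forGroup(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Assumption: read from the oldest message when the group has no committed offset
        props.put("auto.offset.reset", "smallest");
        // Assumption: stop waiting after 5 seconds without a new message
        props.put("consumer.timeout.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forGroup("localhost:2181", "kafka-consumer-group");
        // Prints: smallest
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

With a timeout set, the `while (it.hasNext())` loop ends by throwing an exception, so the surrounding `try` block would need to treat that case as a normal exit instead of an error.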
Run the consumer code first, then execute the producer code. The message will automatically appear on the consumer console.

I hope you enjoyed the article.
Author : Iqubal Mustafa Kaki, Technical Specialist
If you would like to connect with me, please email me at iqubal.kaki@gmail.com


