Saturday, January 23, 2016

Apache Kafka Producer Consumer: Real Time Java Project

Here I explain a real-time Java code example for producing and consuming messages with Apache Kafka. I am assuming that Apache Kafka is fully set up and running on your machine and that a topic named IQUBALTOPIC already exists. If you have not set up Kafka yet, please go through my previous blog post on Apache Kafka.

At this point, ZooKeeper and Kafka are both up and running.

Please download the Java project explained below from IMUKafkaPOC or from the GitHub source code.

The Kafka message producer code below publishes a message to IQUBALTOPIC, and every consumer subscribed to that topic receives it. You therefore need to start the consumer code first and then run the producer code. The message will then appear on the consumer console automatically.

Kafka Producer - IMUKafkaProducer Java

package com.kafka.producer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Publishes a single timestamped message to IQUBALTOPIC.
 *
 * @author Iqubal Mustafa Kaki
 */
public class IMUKafkaProducer {
    static final String TOPIC = "IQUBALTOPIC";

    public static void main(String[] argv) {
        // Broker address and the encoder used to serialize String payloads.
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        // Key and value are both Strings, matching StringEncoder.
        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        try {
            SimpleDateFormat sdf = new SimpleDateFormat();
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                    TOPIC, "Iqubal Producer Sending Message to Topic " + sdf.format(new Date()));
            producer.send(message);
        } finally {
            // Release network resources even if send() fails.
            producer.close();
        }
        System.out.println("Completed------");
    }
}
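The producer above formats its timestamp with a default SimpleDateFormat, so the text depends on the locale and time zone of the machine it runs on. If you want the same message text everywhere, a minimal sketch like the one below could be used instead. The class name TimestampedPayload, the pattern and the UTC time zone are my assumptions for illustration and are not part of the downloadable project.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper, not part of the original project. */
final class TimestampedPayload {

    // Assumed pattern; pick whatever format your consumers expect.
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Builds the message text with a timestamp that ignores the JVM's default locale and zone. */
    public static String build(String prefix, Date when) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat sdf = new SimpleDateFormat(PATTERN, Locale.ROOT);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return prefix + sdf.format(when);
    }

    public static void main(String[] args) {
        System.out.println(build("Iqubal Producer Sending Message to Topic ", new Date()));
    }
}
```

The returned String can be passed to KeyedMessage in place of the inline concatenation used in the producer.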
Kafka Consumer - IMUKafkaConsumer Java


package com.kafka.consumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * 
 * @author Iqubal Mustafa Kaki
 *
 */
public class IMUKafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public IMUKafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.commit.enable", "false");
        props.put("queued.max.message.chunks", "10000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void topicMessageConsumer() {
        try {
            // Ask for a single stream (one consuming thread) for the topic.
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                // hasNext() blocks until a message arrives. Do not call size() here:
                // it would drain the iterator and, with the default
                // consumer.timeout.ms of -1, never return.
                while (it.hasNext()) {
                    System.out.println("Message from Single Topic: "
                            + new String(it.next().message()));
                }
            }
        } catch (Exception ex) {
            System.out.println("error: " + ex.getMessage());
        } finally {
            // Always release the ZooKeeper and broker connections.
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "IQUBALTOPIC";
        IMUKafkaConsumer imuKafkaConsumer = new IMUKafkaConsumer(
                "localhost:2181", "kafka-consumer-group", topic);
        imuKafkaConsumer.topicMessageConsumer();
    }
}
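The consumer turns each byte[] payload into text with new String(bytes), which uses the platform default charset. As far as I know, kafka.serializer.StringEncoder writes UTF-8 unless configured otherwise, so decoding with an explicit UTF-8 charset is the safer choice on machines whose default differs. Here is a small sketch of that idea. MessageDecoder is a hypothetical helper name, not something from the project.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of the original project. */
final class MessageDecoder {

    /** Decodes a raw Kafka payload as UTF-8, independent of the JVM default charset. */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate a payload as the producer side would encode it (assumed UTF-8).
        byte[] raw = "Gr\u00fc\u00dfe from IQUBALTOPIC".getBytes(StandardCharsets.UTF_8);
        System.out.println("Message from Single Topic: " + decode(raw));
    }
}
```

In the consumer loop this would replace new String(it.next().message()) with MessageDecoder.decode(it.next().message()).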
First run the consumer code, then run the producer code. The message will automatically appear on the consumer console.
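Why does the run order matter? As I understand it, the old high-level consumer defaults auto.offset.reset to largest, so a consumer group with no saved offset starts reading at the end of the topic and only sees messages published after it connects. The toy model below is plain Java, not Kafka code, and only illustrates that behaviour. The class OffsetResetDemo and its methods are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of one topic partition; an illustration only, not Kafka code. */
final class OffsetResetDemo {

    /** Append-only log standing in for the topic. */
    private final List<String> log = new ArrayList<String>();

    public void publish(String message) {
        log.add(message);
    }

    /** Offset at which a brand-new consumer group starts under the given reset policy. */
    public int startOffset(String autoOffsetReset) {
        if ("smallest".equals(autoOffsetReset)) {
            return 0;              // beginning of the log
        }
        if ("largest".equals(autoOffsetReset)) {
            return log.size();     // end of the log: only future messages are seen
        }
        throw new IllegalArgumentException("Unknown policy: " + autoOffsetReset);
    }

    /** Messages visible to a consumer reading from the given offset onward. */
    public List<String> readFrom(int offset) {
        return new ArrayList<String>(log.subList(offset, log.size()));
    }

    public static void main(String[] args) {
        OffsetResetDemo topic = new OffsetResetDemo();
        topic.publish("sent before the consumer started");
        int consumerStart = topic.startOffset("largest"); // consumer connects here
        topic.publish("sent after the consumer started");
        System.out.println("largest:  " + topic.readFrom(consumerStart));
        System.out.println("smallest: " + topic.readFrom(topic.startOffset("smallest")));
    }
}
```

If you would rather start the producer first, setting auto.offset.reset to smallest in the consumer properties should let a new group read from the beginning of the topic.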

Hope you have enjoyed the article.
Author : Iqubal Mustafa Kaki, Technical Specialist
Want to connect with me?
Please reach me by email at iqubal.kaki@gmail.com


