Here I will explain a real-time Java code example for producing messages to and consuming messages from Apache Kafka. I am assuming that Apache Kafka is already set up and running on your machine and that a topic named IQUBALTOPIC already exists. If you have not set up Kafka yet, please go through my previous blog post on Apache Kafka.
So at this point both ZooKeeper and Kafka are up and running.
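If you are not sure that both services are really up, a quick check is to see whether something is listening on their ports. The following is only a minimal sketch: the class name PortCheck is hypothetical (it is not part of the IMUKafkaPOC project), and it assumes the ports used later in this post, 2181 for ZooKeeper and 9092 for the Kafka broker. An open port does not prove that the service behind it is healthy, only that something accepts connections there.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports assumed from the addresses used in this post
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka broker (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either line prints false, start that service before running the producer or the consumer.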
Please download the Java project explained below from IMUKafkaPOC or from the GitHub source code.
The Kafka message producer code below publishes a message to IQUBALTOPIC, and every consumer that has subscribed to the topic receives it. So run the consumer code first and then execute the producer code. The message will then appear on the consumer console automatically.
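One likely reason the consumer has to be started first: with the old ZooKeeper-based high-level consumer used in this post, a consumer group that has no committed offset starts reading at the end of the topic by default (auto.offset.reset defaults to largest), so messages published before the consumer started are skipped. The following is a minimal sketch, assuming you would rather start from the earliest available message. The class name ReplayConsumerSettings is hypothetical and not part of the IMUKafkaPOC project. It only builds the Properties object, which would then be passed to new ConsumerConfig(...) in the same way as in the consumer class.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original project.
final class ReplayConsumerSettings {

    // Builds settings for the old high-level consumer that start from the
    // earliest available offset when the group has no committed offset.
    static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // "smallest" = earliest available message; the default is "largest"
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "kafka-consumer-group");
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

With this setting the start order matters less, but a group without committed offsets will read the whole retained topic again on each start.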
Kafka Producer - IMUKafkaProducer Java
package com.kafka.producer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 *
 * @author Iqubal Mustafa Kaki
 *
 */
public class IMUKafkaProducer {

    static final String TOPIC = "IQUBALTOPIC";

    public static void main(String[] argv) {
        Properties properties = new Properties();
        // Broker that the producer contacts to look up the topic metadata
        properties.put("metadata.broker.list", "localhost:9092");
        // Serialize message payloads as strings
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        Producer<String, String> producer =
                new Producer<String, String>(producerConfig);
        try {
            // Append the current date and time so each run sends a distinct message
            SimpleDateFormat sdf = new SimpleDateFormat();
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(TOPIC,
                            "Iqubal Producer Sending Message to Topic "
                                    + sdf.format(new Date()));
            producer.send(message);
        } finally {
            // Release the connection even if send() throws
            producer.close();
        }
        System.out.println("Completed------");
    }
}
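Note that "Completed------" only tells you that send() returned. To my knowledge, the old producer API used here does not wait for a broker acknowledgement by default (request.required.acks defaults to 0). The following is a minimal sketch, assuming you want the partition leader to confirm each message. The class name AckedProducerSettings is hypothetical and not part of the IMUKafkaPOC project. It only builds the Properties object that would be passed to new ProducerConfig(...).

```java
import java.util.Properties;

// Hypothetical helper, not part of the original project.
final class AckedProducerSettings {

    // Builds producer settings that wait for the partition leader to
    // acknowledge each message before send() is considered successful.
    static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 0 = do not wait (default), 1 = wait for the leader, -1 = wait for all in-sync replicas
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println("request.required.acks = "
                + props.getProperty("request.required.acks"));
    }
}
```

Waiting for an acknowledgement costs some throughput, which is acceptable for a small example like this one.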
Kafka Consumer - IMUKafkaConsumer Java
package com.kafka.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 *
 * @author Iqubal Mustafa Kaki
 *
 */
public class IMUKafkaConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public IMUKafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        // Offsets are not committed automatically
        props.put("auto.commit.enable", "false");
        props.put("queued.max.message.chunks", "10000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
                props));
        this.topic = topic;
    }

    public void topicMessageConsumer() {
        try {
            // Ask for one stream (one consumer thread) for the topic
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams
                    .get(topic);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                // Do not call it.isEmpty() or it.size() here: on this blocking
                // iterator size() tries to consume the whole stream and never returns.
                // hasNext() blocks until the next message arrives.
                while (it.hasNext()) {
                    // StringEncoder writes UTF-8 by default, so decode as UTF-8
                    System.out.println("Message from Single Topic: "
                            + new String(it.next().message(),
                                    StandardCharsets.UTF_8));
                }
            }
        } catch (Exception ex) {
            System.out.println("error: " + ex.getMessage());
        } finally {
            // Always release the ZooKeeper and broker connections
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "IQUBALTOPIC";
        IMUKafkaConsumer imuKafkaConsumer = new IMUKafkaConsumer(
                "localhost:2181", "kafka-consumer-group", topic);
        imuKafkaConsumer.topicMessageConsumer();
    }
}
Want to connect with me
If you want to connect with me, please connect through my email - iqubal.kaki@gmail.com