Friday, January 29, 2016

Apache Hive XML Data Analysis

Loading XML Data into Apache Hive

Analyzing XML data with Apache Hive is quite interesting. Suppose we have a large XML file and we need to process this data using the Hadoop ecosystem. Here I will explain how to load XML data into an Apache Hive table.

Here is a sample XML file:

Company.xml
<Employee>
<Name>Bill Gates</Name>
<Email>BILLGATES@Microsoft.Com</Email>
<Contact>XXXXXXXXXX</Contact>
</Employee>
<Employee>
<Name>Steve Jobs</Name>
<Email>SteveJobs@Apple.Com</Email>
<Contact> XXXXXXXXXX</Contact>
</Employee>
<Employee>
<Name>Iqubal Mustafa Kaki</Name>
<Email>IqubalMustafaKaki@SZIAS.Com</Email>
<Contact>8390900000</Contact>
</Employee>


Here we will load the XML data into a Hive table and query it using the xpath() UDF.

We will explore xpath(), which converts XML data into a string array.

Steps for loading XML Data into Hive Table

Step 1: Restructure the XML so that each <Employee> record sits on a single line, by executing the command below

cat Company.xml | tr -d '&' | tr '\n' ' ' | tr '\r' ' ' | sed 's|</Employee>|</Employee>\n|g'| grep -v '^\s*$' > company_structured_records.xml


The pipeline strips '&' characters, collapses newlines and carriage returns into spaces, breaks the stream after every closing </Employee> tag, and drops blank lines. We now have the company_structured_records.xml file, with one employee record per line, for the further steps.

Step 2: Create a Hive table and load the XML file into it

hive> create table CompanyXMLTablePOC(XMLDATA String);

hive> load data local inpath '/home/hadoopadmin/Desktop/company_structured_records.xml' into table CompanyXmlTablePOC;
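As an optional sanity check, you can confirm that each employee record landed in its own row; for the sample file this query should return 3:

hive> select count(*) from CompanyXmlTablePOC;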


Step 3: Convert the XML data into array format using xpath()

hive> select xpath(xmldata, 'Employee/*/text()') from CompanyXmlTablePOC;

You will get the result as shown below.

OK
["Bill Gates","BILLGATES@Microsoft.Com","XXXXXXXXXX"]
["Steve Jobs","SteveJobs@Apple.Com"," XXXXXXXXXX"]
["Iqubal Mustafa Kaki","IqubalMustafaKaki@SZIAS.Com","8390900000"]
Time taken: 1.503 seconds, Fetched: 3 row(s)

Step 4: Create the final Hive table from the table created above

hive> create table company AS select xpath_string(xmldata,'Employee/Name'), xpath_string(xmldata,'Employee/Email'), xpath_string(xmldata,'Employee/Contact') from companyxmltablepoc;
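Since no column aliases are given, Hive will assign default names (such as _c0, _c1, _c2) to the new table's columns. If you prefer readable column names, the same statement can be written with explicit aliases:

hive> create table company AS select xpath_string(xmldata,'Employee/Name') AS name, xpath_string(xmldata,'Employee/Email') AS email, xpath_string(xmldata,'Employee/Contact') AS contact from companyxmltablepoc;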

hive> select * from company;
OK
Bill Gates BILLGATES@Microsoft.Com XXXXXXXXXX
Steve Jobs SteveJobs@Apple.Com XXXXXXXXXX
Iqubal Mustafa Kaki IqubalMustafaKaki@SZIAS.Com 8390900000

Time taken: 5.616 seconds, Fetched: 3 row(s)





Hope you have enjoyed the article.
Author : Iqubal Mustafa Kaki, Technical Specialist

Want to connect with me
If you would like to connect, please reach me by email - 
iqubal.kaki@gmail.com

Saturday, January 23, 2016

Apache Kafka Producer Consumer: Real Time Java Project


Here I will explain a real-time Java code example for producing and consuming messages with Apache Kafka. I am assuming that you already have a fully up-and-running Apache Kafka setup on your machine and that a topic named IQUBALTOPIC already exists. If you do not have a Kafka setup yet, please go through my previous blog post on Apache Kafka.

At this point, Zookeeper and Kafka are up and running.
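If the topic does not exist yet, it can be created with the topic admin script (shown here with a single partition and replication factor 1; adjust these to your setup):

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic IQUBALTOPIC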

Please download the Java project explained below from IMUKafkaPOC or from the GitHub source code.

The Kafka message producer code below publishes a message to IQUBALTOPIC, and every consumer registered for that topic will receive it. You therefore need to run the consumer code first and then execute the producer code; the message will automatically appear on the consumer console.

Kafka Producer - IMUKafkaProducer.java

package com.kafka.producer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Publishes a single timestamped message to the IQUBALTOPIC topic
 * using the old (0.8.x) Kafka producer API.
 *
 * @author Iqubal Mustafa Kaki
 */
public class IMUKafkaProducer {

    final static String TOPIC = "IQUBALTOPIC";

    public static void main(String[] argv) {
        // Broker list and message serializer for the 0.8.x producer
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        // Build a message carrying the current timestamp and send it to the topic
        SimpleDateFormat sdf = new SimpleDateFormat();
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                TOPIC, "Iqubal Producer Sending Message to Topic " + sdf.format(new Date()));
        producer.send(message);

        producer.close();
        System.out.println("Completed------");
    }
}
Kafka Consumer - IMUKafkaConsumer.java


package com.kafka.consumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * 
 * @author Iqubal Mustafa Kaki
 *
 */
public class IMUKafkaConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public IMUKafkaConsumer(String zookeeper, String groupId, String topic) {
        // High-level (0.8.x) consumer configuration
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.commit.enable", "false");
        props.put("queued.max.message.chunks", "10000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void topicMessageConsumer() {
        try {
            // Ask Kafka for a single stream for the topic
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

            // Iterate over the stream; hasNext() blocks until a message arrives
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    System.out.println("Message from Single Topic: "
                            + new String(it.next().message()));
                }
            }
            if (consumer != null) {
                consumer.shutdown();
            }
        } catch (Exception ex) {
            System.out.print("error" + ex.getMessage());
        }
    }

    public static void main(String[] args) {
        String topic = "IQUBALTOPIC";
        IMUKafkaConsumer imuKafkaConsumer = new IMUKafkaConsumer(
                "localhost:2181", "kafka-consumer-group", topic);
        imuKafkaConsumer.topicMessageConsumer();
    }
}
As noted above, run the consumer code first, then execute the producer code; the message will automatically appear on the consumer console.
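To compile and run both classes, the Kafka client library must be on the classpath. With Maven, the dependency would look roughly like this (matching the kafka_2.10-0.8.2.0 release used in my Kafka setup post):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.0</version>
</dependency>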

Hope you have enjoyed the article.
Author : Iqubal Mustafa Kaki, Technical Specialist
Want to connect with me
If you would like to connect, please reach me by email - iqubal.kaki@gmail.com



Sunday, January 17, 2016

Apache Kafka & Zookeeper - Multi-Broker Apache Kafka Cluster on a Single Node

Apache Kafka is an open source, distributed publish-subscribe messaging system. Compared to most messaging systems, Kafka offers better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good solution for large-scale message processing applications.

Kafka is built as a modern distributed system. Data is replicated and partitioned over a cluster of machines that can grow and shrink transparently to the applications using the cluster. Consumers of data can be scaled out over a pool of machines as well and automatically adapt to failures in the consuming processes.


A key aspect of Kafka's design is that it handles large volumes of data very easily. A single Kafka broker can store many terabytes of data, so Kafka can handle big data workloads that would be impractical in a traditional database.
Kafka thus provides a real-time publish-subscribe solution that overcomes the challenges of consuming real-time data, even for data volumes that may grow by orders of magnitude. Kafka also supports parallel data loading into Hadoop systems.




Apache Kafka is designed with the following main characteristics:
Fast - A single Kafka broker can serve thousands of clients, handling megabytes of reads and writes per second.
Scalable - Data is partitioned and streamed over a cluster of machines to support larger data sets.
Durable - Messages are persisted and replicated within the cluster to prevent data loss.
Distributed by Design - It provides fault-tolerance and durability guarantees.
Five Components of Apache Kafka
Topic: A topic is a category or feed name to which messages are published by the message producers. Topics are partitioned and each partition is represented by the ordered immutable sequence of messages. Each message in the partition is assigned a unique sequential ID (offset).
Broker: A Kafka cluster consists of servers, each of which may run one or more server processes called brokers. Topics are created within the context of broker processes.
Zookeeper: Zookeeper serves as the coordinator between the Kafka broker and consumers.
Producers: Producers publish data to the topics by choosing the appropriate partition within the topic.
Consumers: Consumers are the applications or processes that subscribe to topics and process the feed of published messages.

Zookeeper is an open source, high-performance coordination service for distributed applications, and Kafka relies on it. It coordinates and synchronizes configuration information across distributed nodes. It is not possible to bypass Zookeeper and connect directly to the Kafka broker; once Zookeeper is down, Kafka cannot serve client requests. Zookeeper is basically used to communicate between different nodes in a cluster. In Kafka, it is used to commit offsets, so if a node fails the consumer can resume from the previously committed offset. Apart from this, it also performs other activities such as leader detection, distributed synchronization, configuration management, detecting when a node joins or leaves the cluster, tracking node status in real time, and so on. It is used for managing and coordinating Kafka brokers; in the Hadoop ecosystem, Zookeeper is also used for Hadoop cluster management. In short, Zookeeper solves the problem of reliable distributed coordination.

Apache Kafka Installation
Once we install Kafka, we can use the Zookeeper that comes bundled with it.

Download Apache Kafka

Download the Apache Kafka tar file.
Then extract the tar file and move it to the installation location.

$ tar -zxvf  kafka_2.10-0.8.2.0.tgz
$ mv kafka_2.10-0.8.2.0 /usr/local/kafka


Start Zookeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties

By default the Zookeeper server will listen on *:2181/tcp
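To quickly confirm that Zookeeper is responding, you can send it the standard ruok four-letter command; a healthy server answers imok:

$ echo ruok | nc localhost 2181
imok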


Configure & Start Kafka Brokers

Here we will create three Kafka brokers whose configurations are based on the default config/server.properties. Apart from the settings below, the configurations of the brokers are identical.

Create the config file for the first broker

$ cp config/server.properties config/server1.properties

Edit config/server1.properties and set the following values:

broker.id=1
port=9092
log.dir=/tmp/kafka-logs-1

Create the config file for the Second broker

$ cp config/server.properties config/server2.properties

Edit config/server2.properties and set the following values:

broker.id=2
port=9093
log.dir=/tmp/kafka-logs-2


Create the config file for the Third broker

$ cp config/server.properties config/server3.properties

Edit config/server3.properties and set the following values:

broker.id=3
port=9094
log.dir=/tmp/kafka-logs-3

Now you need to start each Kafka broker in a separate console.

Start the first broker

$ env JMX_PORT=9999 bin/kafka-server-start.sh config/server1.properties

Start the second broker

$ env JMX_PORT=10000 bin/kafka-server-start.sh config/server2.properties

Start the third broker

$ env JMX_PORT=10001 bin/kafka-server-start.sh config/server3.properties


Summary of the configuration

            Broker 1       Broker 2       Broker 3
--------------------------------------------------------
Kafka       *:9092/tcp     *:9093/tcp     *:9094/tcp
JMX         *:9999/tcp     *:10000/tcp    *:10001/tcp


Create a Kafka topic

In Kafka 0.8, there are two ways of creating a new topic:
The first is to turn on the auto.create.topics.enable option on the broker. When the broker receives the first message for a new topic, it creates that topic with num.partitions partitions and default.replication.factor replicas.
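For this first approach, the relevant settings live in config/server.properties and would look something like this (illustrative values):

auto.create.topics.enable=true
num.partitions=3
default.replication.factor=2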

The second way of creating a topic is to use the admin command bin/kafka-topics.sh.

Note: In the Kafka 0.8.0 release, kafka-topics.sh is not available; its functionality is split between kafka-create-topic.sh and kafka-list-topic.sh, so use the admin command bin/kafka-create-topic.sh instead.

$ bin/kafka-topics.sh --zookeeper localhost:2181 \
 --create --topic iqubal.kafka.topic --partitions 3 --replication-factor 2

Here Kafka will create three logical partitions and two replicas per partition for the topic. For each partition it will pick two brokers to host those replicas, and from those it will elect one “leader” broker per partition.

Note: For Kafka 0.8.0 release you must use the command: 

$ bin/kafka-create-topic.sh --zookeeper localhost:2181 --partition 3 --replica 2 --topic iqubal.kafka.topic
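Once the topic exists, you can check how the partitions, replicas, and elected leaders were assigned across the three brokers with the describe option (available in kafka-topics.sh from 0.8.1 onward):

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic iqubal.kafka.topic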

To get a list of topics, we can use the --list option:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Kafkatopic iqubal.kafka.topic

Note: For Kafka 0.8.0 release you must use the command

$ bin/kafka-list-topic.sh --zookeeper localhost:2181 --topic iqubal.kafka.topic

Start the Producer - Sending Messages

The producer client accepts input from the command line and publishes each line as a message to the Kafka cluster. By default, every new line entered is a new message, as shown below.

$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic iqubal.kafka.topic

Iqubal Mustafa Kaki’s Blog on Apache Kafka.

Internal Kafka Multiple Broker Cluster

Start the Consumer - Consuming Messages

The console consumer client consumes messages; here we read the topic from the beginning.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic iqubal.kafka.topic

At the end of the output you will see the following messages:

Iqubal Mustafa Kaki’s Blog on Apache Kafka.

Internal Kafka Multiple Broker Cluster

Hope you have enjoyed the article.

Author: Iqubal Mustafa Kaki, Technical Specialist.

Want to connect with me
If you would like to connect, please reach me by email - 
iqubal.kaki@gmail.com

Friday, January 8, 2016

Solr Indexing - MariaDB Table Data into Apache Solr



In one of my assignments I was asked to index MariaDB data in Apache Solr. Integrating MariaDB with Apache Solr turned out to be very interesting. Here I will explain, step by step, how to achieve this.

First of all, we need to download and install the artifacts below.
We will first create a database named contactdb in MariaDB, create a table inside this database, and then insert some raw data to work with.

MariaDB [(none)]> create database contactdb;

MariaDB [(none)]> use contactdb;

MariaDB [contactdb]> create table EMPLOYEE (first_name varchar(30), last_name varchar(30), company varchar(30), gender varchar(10), contact varchar(10));

MariaDB [contactdb]> insert into EMPLOYEE(first_name, last_name, company, gender, contact) values('Iqubal M', 'Kaki', 'TechM', 'M', '839090000');

MariaDB [contactdb]> select * from employee;
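With the single row inserted above, the select should return something like:

+------------+-----------+---------+--------+-----------+
| first_name | last_name | company | gender | contact   |
+------------+-----------+---------+--------+-----------+
| Iqubal M   | Kaki      | TechM   | M      | 839090000 |
+------------+-----------+---------+--------+-----------+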

Download Solr from Apache (Solr 5.0.0) and extract the solr-5.0.0.zip archive into your preferred directory. Here I am referring to D:/ApacheSolr/solr-5.0.0 as $SOLR_HOME.

Place the MariaDB connector mariadb-java-client-1.2.0.jar file into
"$SOLR_HOME/contrib/dataimporthandler/lib". Create the "lib" subdirectory if it is not present there.

Now start Solr by executing the commands below from the command prompt.
>> cd solr-5.0.0/bin
>> solr start

It will start Solr on port 8983. Direct your web browser to http://localhost:8983/solr to visit the Solr Admin UI.

  1. Create a core in Solr by executing the command below from $SOLR_HOME/bin.
# bin/solr create_core -c employees -d basic_configs

  2. Create a db-data-config.xml file inside the solr-5.0.0/server/solr/employees/conf folder,
and put your MariaDB connection information in it. From this file Solr knows what data to pull in for the search.

<dataConfig>
<dataSource type="JdbcDataSource" driver="org.mariadb.jdbc.Driver" url="jdbc:mysql://localhost:3306/contactdb" user="root" password="root"/>
<document>
            <entity name="id" query="select first_name as 'id' , first_name, last_name, company, gender, contact from EMPLOYEE"/>
</document>
</dataConfig>
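Note: the delta-import command used in the indexing step below only picks up incremental changes if the entity also defines delta queries. A rough sketch, assuming a hypothetical last_modified timestamp column on EMPLOYEE (the table created above does not have one), would look like this:

<entity name="id"
        query="select first_name as 'id', first_name, last_name, company, gender, contact from EMPLOYEE"
        deltaQuery="select first_name as 'id' from EMPLOYEE where last_modified &gt; '${dataimporter.last_index_time}'"
        deltaImportQuery="select first_name as 'id', first_name, last_name, company, gender, contact from EMPLOYEE where first_name = '${dataimporter.delta.id}'"/>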

  3. Now make the changes below in solr-5.0.0/server/solr/employees/conf/solrconfig.xml and save the file.
Add the following lines within the <config> tag.

<lib dir="D:/ApacheSolr/solr-5.0.0/contrib/dataimporthandler/lib/" regex=".*\.jar" />
<lib dir="D:/ApacheSolr/solr-5.0.0/dist/" regex="solr-dataimporthandler-\d.*\.jar" />
<requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">
             <lst name="defaults">
               <str name="config">db-data-config.xml</str>
              </lst>
  </requestHandler>

  4. Now open schema.xml in the same conf directory (i.e. solr-5.0.0/server/solr/employees/conf/schema.xml) and add the following lines. Here I am configuring what Solr should index from the MariaDB table.
<dynamicField name="*_name" type="text_general" multiValued="false" indexed="true" stored="true" />
<dynamicField name="*company" type="text_general" multiValued="false" indexed="true" stored="true" />
<dynamicField name="*gender" type="text_general" multiValued="false" indexed="true" stored="true" />
<dynamicField name="*contact" type="text_general" multiValued="false" indexed="true" stored="true" />

  5. Now stop and restart the Solr server.
>> cd solr-5.0.0/bin
>> solr stop -all
>> solr start
Now index the MariaDB table data into Apache Solr by hitting the URLs below; the response shows how many documents were indexed. The first URL (full-import) re-indexes all of the table data whether or not anything has changed in the database, while the second (delta-import) picks up only the later database changes (provided delta queries are configured, as sketched above).

http://localhost:8983/solr/employees/dataimport?command=full-import
http://localhost:8983/solr/employees/dataimport?command=delta-import

Solr Full Import

Solr Delta Import

Now you can see the result details on the Solr Admin Console.


Now you can query Solr by using the URL below.

http://localhost:8983/solr/employees/select?q=*&wt=json&qf=first_name%20last_name&defType=edismax&indent=true
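For comparison, a simpler field-specific query against the data indexed above (just an illustration) could be:

http://localhost:8983/solr/employees/select?q=last_name:Kaki&wt=json&indent=true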

Hope you have enjoyed the article.

Author: Iqubal Mustafa Kaki, Technical Specialist.

Want to connect with me
If you would like to connect, please reach me by email - 
iqubal.kaki@gmail.com