Power of Apache Kafka for Bigdata

Kafka is a powerful tool for Big Data that has pushed processing speed to a near real-time level: many gigabytes of data can be processed on the fly with very little delay. This capability has made Kafka a leader in the Big Data ecosystem. Let us see the “Power of Apache Kafka for Bigdata”.

Apache Kafka is a popular distributed message broker designed for handling large volumes of real-time data. A Kafka cluster has a much higher throughput than other message brokers. Though it is generally used as a pub/sub messaging system, many organizations also use it for log aggregation because it offers persistent storage for published messages.

Kafka provides both pub-sub and queue-based messaging in a fast, reliable, persistent, fault-tolerant, zero-downtime manner. In both cases, producers simply send messages to a topic, and consumers choose the messaging model that fits their needs.
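
Which of the two models applies is determined mostly by the consumer’s group.id setting. As a rough sketch (the group names and Properties variables below are made up for illustration; the full consumer configuration is shown later in this article):

// Queue semantics: consumers that share a group.id split the topic's partitions,
// so each message is processed by exactly one consumer in the group.
queueConsumerProps.put("group.id", "order-processors");

// Pub-sub semantics: consumers placed in different groups each receive
// every message published to the topic.
billingConsumerProps.put("group.id", "billing");
analyticsConsumerProps.put("group.id", "analytics");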

Working of Pub-Sub Messaging

Following is the step-wise working of Pub-Sub Messaging –

  1. Producers send messages to a topic at regular intervals.
  2. The Kafka broker stores all messages in the partitions configured for that particular topic, ensuring the messages are shared equally between partitions.
  3. Consumers subscribe to a specific topic.
  4. Once the consumer subscribes to a topic, Kafka provides the current offset of the topic to the consumer and also saves the offset in the ZooKeeper ensemble.
  5. The consumer requests new messages from Kafka at regular intervals.
  6. Once Kafka receives messages from producers, it forwards them to the consumers.
  7. The consumer receives the messages and processes them.
  8. Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.
  9. Once Kafka receives the acknowledgement, it advances the offset to the new value and updates it in ZooKeeper.
  10. This flow repeats until the consumer stops requesting messages.
  11. The consumer can also rewind or skip to a desired offset of a topic (see the sketch after this list).
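
Step 11 deserves a quick illustration. Below is a minimal sketch of rewinding, assuming a consumer that has already been assigned the partition in question (as in the consumer examples later in this article); the topic name and offset are made up for illustration.

// Hypothetical example: rewind to offset 42 of partition 0 of "mytopic",
// then continue polling from there.
TopicPartition tp = new TopicPartition("mytopic", 0);
kafkaConsumer.seek(tp, 42L);
ConsumerRecords<byte[], byte[]> rewound = kafkaConsumer.poll(1000);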

 

Apache Kafka Installation

Let us continue with the following steps to install Kafka on your machine.

– Download Kafka

To install Kafka on your machine, download the release from the following link −

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

– Extract the tar file

Extract the tar file using the following command −

$ cd opt/

$ tar -zxf kafka_2.11-0.9.0.0.tgz

$ cd kafka_2.11-0.9.0.0

– Start Server

Kafka depends on ZooKeeper, so make sure a ZooKeeper server is running first (the bundled bin/zookeeper-server-start.sh config/zookeeper.properties script starts a single-node instance). You can then start the Kafka broker with the following command −

$ bin/kafka-server-start.sh config/server.properties

After the server starts, you would see the below response on your screen −

[2016-01-02 15:37:30,410] INFO KafkaConfig values:

request.timeout.ms = 30000

log.roll.hours = 168

inter.broker.protocol.version = 0.9.0.X

log.preallocate = false

security.inter.broker.protocol = PLAINTEXT

…………………………………………….

…………………………………………….

– Stop the Server

After performing all the operations, you can stop the server using the following command −

$ bin/kafka-server-stop.sh

Producer and consumer – Java implementation

Connecting and authenticating

To connect to Message Hub, the Kafka API uses the kafka_brokers_sasl credentials, and the user and password from the VCAP_SERVICES environment variable.

Therefore you need to set the sasl.jaas.config property (0.10.2 clients or higher). For example:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";

This configuration will be used by the producer and consumer.
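
If you prefer to keep everything in code, the same value can also be supplied through the client Properties (a sketch; the user and password variables here are assumed to have been read from VCAP_SERVICES beforehand):

// Sketch: supply the SASL/PLAIN credentials directly in the client properties.
// "user" and "password" are assumed to come from VCAP_SERVICES.
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"" + user + "\" password=\"" + password + "\";");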

It’s time for real coding.

Writing producers

Producer properties configuration

To start working with producers, it’s necessary to set up a configuration containing the required properties, which is then passed to the KafkaProducer constructor. These properties must be changed based on user needs.

props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("bootstrap.servers", "kafka05-broker.us-south.bluemix.net:9094");
props.put("acks", "all");
props.put("block.on.buffer.full", "true");
props.put("batch.size", "1");
props.put("security.protocol", "SASL_SSL");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.truststore.location", "/usr/lib/j2re1.7-ibm/jre/lib/security/cacerts");
props.put("ssl.truststore.password", "password");
props.put("ssl.truststore.type", "JKS");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");

 

Producer Java API

 

It’s important to define whether the producer is going to publish into a topic (no matter what the partition is) or a specific topic/partition. For instance, using the first option, see the code implementation below:

Publish to a specific topic

Pass the topic name and message that will be published as a parameter of ProducerRecord:

ProducerRecord producerRecord = new ProducerRecord(topic, message.getBytes("UTF-8"));

Then send to the broker:

kafkaProducer.send(producerRecord);

Putting it all together:

private void publishToTopic() {
    String topic = "mytopic13";
    String message = "publish message to a topic";
    try {
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props);
        ProducerRecord<byte[], byte[]> producerRecord =
                new ProducerRecord<>(topic, message.getBytes("UTF-8"));
        RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
        // From the returned RecordMetadata it is possible to validate topic, partition and offset
        System.out.println("topic where message is published : " + recordMetadata.topic());
        System.out.println("partition where message is published : " + recordMetadata.partition());
        System.out.println("message offset # : " + recordMetadata.offset());
        kafkaProducer.close();
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Publish to a specific topic/partition

Pass the topic name, partition, message key and message value that will be published as a parameter of ProducerRecord:

ProducerRecord producerRecord = new ProducerRecord(topic, partition, key.getBytes("UTF-8"), message.getBytes("UTF-8"));

Note that the difference just resides in arguments added to ProducerRecord.
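
Put together, a sketch of the partition-specific variant might look like the following (the method name, key and partition number are illustrative, and the chosen partition must actually exist on the topic):

private void publishToTopicPartition() {
    String topic = "mytopic13";
    int partition = 5; // must be a valid partition of the topic
    String key = "mykey";
    String message = "publish message to a topic/partition";
    try {
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props);
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                topic, partition, key.getBytes("UTF-8"), message.getBytes("UTF-8"));
        RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
        System.out.println("published to partition : " + recordMetadata.partition());
        kafkaProducer.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}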

For more information, refer to the Kafka producer API documentation.

Writing consumers

Consumer properties configuration

In order to start working with consumers, it’s necessary to set up a configuration containing the required properties, which is then passed to the KafkaConsumer constructor. These properties must be changed based on user needs.

props = new Properties();
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("bootstrap.servers", "kafka05-broker.us-south.bluemix.net:9094");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("security.protocol", "SASL_SSL");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.truststore.location", "/usr/lib/j2re1.7-ibm/jre/lib/security/cacerts");
props.put("ssl.truststore.password", "password");
props.put("ssl.truststore.type", "JKS");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");

Consumer Java API

 

 

Subscribe or assign

It is important to define which method will be used for message consumption. Below is a summary of the key points of each and when to use it:

Subscribe

  • Use when you want to consume all messages from the desired topic.
  • Messages are consumed from the topic automatically, no matter which partitions they are in.
  • After subscribing, a re-balancing (re-assigning partitions to group members) occurs. The consumer coordinates with its group to assign partitions; this happens automatically when consumption starts.
  • Define a List of topic name strings and KafkaConsumer will subscribe to that list.

Assign

  • Use when you want to define the topic/partition that messages will be consumed from.
  • This can be considered manual consumption, as the partition is programmatically defined for the desired topic.
  • Define a List of TopicPartition and KafkaConsumer will assign to that list.

It is not possible to subscribe and assign at the same time using the same consumer instance.

Consume from topic

Subscribing

In this code example kafkaConsumer will subscribe to a topic as shown below:

kafkaConsumer = new KafkaConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
String topic = "mytopic13";
List<String> topicList = new ArrayList<>();
topicList.add(topic);
kafkaConsumer.subscribe(topicList);

Assigning

kafkaConsumer = new KafkaConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
String topic = "mytopic13";
List<TopicPartition> topicPartitionList = new ArrayList<>();
int partition = 5;
topicPartitionList.add(new TopicPartition(topic, partition));
kafkaConsumer.assign(topicPartitionList);

Retrieve messages from topic

In order to retrieve messages, kafkaConsumer uses the poll method, which fetches data from the topics or partitions specified using one of the subscribe/assign APIs. poll returns a ConsumerRecords collection that can be iterated to obtain each ConsumerRecord. Here is a code snippet:

Iterator<ConsumerRecord<byte[], byte[]>> it = kafkaConsumer.poll(1000).iterator();
while (it.hasNext()) {
    ConsumerRecord<byte[], byte[]> record = it.next();
    final String message = new String(record.value(), Charset.forName("UTF-8"));
    if (record.key() != null) {
        final String key = new String(record.key(), Charset.forName("UTF-8"));
        System.out.println("messageKey : " + key);
    }
    System.out.println("topic : " + record.topic());
    System.out.println("partition : " + record.partition());
    System.out.println("offset : " + record.offset());
    System.out.println("value : " + message);
}

 

Committing messages

There are two possible ways to mark all received messages as committed.

Use the kafkaConsumer API: after all messages have been iterated, call commitSync. This ensures the offset is committed even if the consumer crashes after the call. If you decide to use this option, set enable.auto.commit to false. This option is useful for handling commits manually.

kafkaConsumer.commitSync();

Alternatively, set enable.auto.commit=true and auto.commit.interval.ms=1000 to regulate the frequency. Use this when you want the consumer’s offsets to be committed in the background periodically.
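
As a minimal sketch, this second option only changes the consumer configuration shown earlier:

// Option 2: let the consumer commit offsets in the background.
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000"); // commit roughly once per second
// With auto-commit enabled there is no need to call kafkaConsumer.commitSync().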

Note: there is a risk with the second option: if the consumer crashes before committing the offsets for messages it has already processed, a new consumer will end up repeating the process, which means some messages will be retrieved twice.

Close consumer

When all messages have been processed and you want to end the consumer instance, close it as follows:

kafkaConsumer.close();

Putting it all together:

public void run() {
    kafkaConsumer = new KafkaConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    String topic = "mytopic13";
    if (isSubscribe) {
        System.out.println("KafkaConsumer will subscribe");
        List<String> topicList = new ArrayList<>();
        topicList.add(topic);
        kafkaConsumer.subscribe(topicList);
    } else {
        System.out.println("KafkaConsumer will assign");
        List<TopicPartition> topicPartitionList = new ArrayList<>();
        int partition = 5;
        topicPartitionList.add(new TopicPartition(topic, partition));
        kafkaConsumer.assign(topicPartitionList);
    }
    while (!closing) {
        try {
            Iterator<ConsumerRecord<byte[], byte[]>> it = kafkaConsumer.poll(1000).iterator();
            while (it.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = it.next();
                final String message = new String(record.value(), Charset.forName("UTF-8"));
                if (record.key() != null) {
                    final String key = new String(record.key(), Charset.forName("UTF-8"));
                    System.out.println("messageKey : " + key);
                }
                System.out.println("topic : " + record.topic());
                System.out.println("partition : " + record.partition());
                System.out.println("offset : " + record.offset());
                System.out.println("value : " + message);
            }
            kafkaConsumer.commitSync();
        } catch (final Exception e) {
            System.out.println("Consumer loop has been unexpectedly interrupted");
            shutdown();
        }
    }
    kafkaConsumer.close();
}

 

At this point you should be able to write your own producers and consumers and to set the minimum configuration required to start publishing and reading messages from the broker.

In this article we have covered the very basics of Kafka, gathered from several sources.

Soon we will publish one more article that is purely technical, where a real-time Kafka project will be explained. Please keep track of this, as we are going to concentrate very seriously on code in the upcoming days once this round of theory and basics is completed.
