Kafka is a powerful tool for big data that has brought processing speed to a near real-time level: many gigabytes of data can be processed on the fly, with very little delay. This capability has made Kafka a leader in the big data ecosystem. Let's look at the power of Apache Kafka for big data.
Apache Kafka is a popular distributed message broker for handling large volumes of real-time data. A Kafka cluster has a much higher throughput than other message brokers. Though it is generally used as a pub/sub messaging system, many organizations also use it for log aggregation, because it offers persistent storage for published messages.
Kafka provides both pub-sub and queue-based messaging in a fast, reliable, persistent, fault-tolerant, zero-downtime manner. In both cases, producers simply send messages to a topic, and each consumer can choose either style of messaging depending on its needs.
Working of Pub-Sub Messaging
The pub-sub messaging flow works step by step as follows –
- Producers send messages to a topic at regular intervals.
- The Kafka broker stores all messages in the partitions configured for that topic, so that messages are shared equally between partitions.
- A consumer subscribes to a specific topic.
- Once the consumer subscribes to a topic, Kafka provides the topic's current offset to the consumer and also saves that offset in the ZooKeeper ensemble.
- The consumer requests Kafka for new messages at regular intervals.
- Once Kafka receives messages from producers, it forwards them to the consumers.
- The consumer receives the messages and processes them.
- Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.
- When Kafka receives the acknowledgement, it advances the offset to the new value and updates it in ZooKeeper.
- This flow repeats until the consumer stops requesting.
- The consumer also has the option to rewind or skip to any desired offset of a topic.
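The flow above can be sketched with a toy in-memory model. This is purely illustrative Java, not real Kafka; the class and method names are invented for the sketch, and one list stands in for a single partition while one counter stands in for the offset stored in ZooKeeper:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the offset flow: one "partition", one stored offset.
// Not real Kafka; all names here are invented for the sketch.
public class OffsetFlowSketch {
    private final List<String> partition = new ArrayList<>(); // broker's message log
    private long committedOffset = 0;                         // what ZooKeeper would store

    // A producer appends a message to the partition.
    public void produce(String message) {
        partition.add(message);
    }

    // The consumer polls from its committed offset, and the acknowledgement
    // advances the stored offset past the returned batch.
    public List<String> pollAndCommit() {
        List<String> batch =
                new ArrayList<>(partition.subList((int) committedOffset, partition.size()));
        committedOffset = partition.size();
        return batch;
    }

    public long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        OffsetFlowSketch sketch = new OffsetFlowSketch();
        sketch.produce("m1");
        sketch.produce("m2");
        System.out.println(sketch.pollAndCommit());   // prints [m1, m2]
        System.out.println(sketch.committedOffset()); // prints 2
    }
}
```

Rewinding, in this toy model, would simply mean setting the counter back to an earlier value, after which the same messages are returned again.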
Apache Kafka Installation
Let us continue with the following steps to install Kafka on your machine.
– Download Kafka
To install Kafka on your machine, click the link below −
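Alternatively, you can fetch a release tarball from the Apache archive directly. The version and Scala build below are only examples; pick the release you need:

```shell
# Download a Kafka release tarball (version shown is only an example)
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
```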
– Extract the tar file
Extract the tar file using the following command −
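Assuming the tarball is named kafka_2.11-0.9.0.1.tgz (adjust to your version), extraction looks like this:

```shell
# Unpack the release and move into the Kafka directory
tar -xzf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1
```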
– Start Server
You can start the server by giving the following command −
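For this generation of Kafka, the usual sequence is to start ZooKeeper first and then the broker, using the scripts bundled in the extracted Kafka directory:

```shell
# Start ZooKeeper first (this Kafka generation depends on it)
bin/zookeeper-server-start.sh config/zookeeper.properties &
# Then start the Kafka broker
bin/kafka-server-start.sh config/server.properties
```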
After the server starts, you will see a response like the one below on your screen −
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
– Stop the Server
After performing all the operations, you can stop the server using the following command −
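From the Kafka directory, the bundled scripts stop the broker and, if you started it, ZooKeeper:

```shell
# Stop the Kafka broker first, then ZooKeeper
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
```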
Producer and consumer – Java implementation
Connecting and authenticating
To connect to Message Hub, the Kafka API uses the kafka_brokers_sasl credentials, and the user and password from the VCAP_SERVICES environment variable.
Therefore you need to set the sasl.jaas.config property (0.10.2 clients or higher), with your own credentials in place of the USER and PASSWORD placeholders. For example:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="USER" password="PASSWORD";
This configuration will be used by the producer and consumer.
Now it's time for some real coding.
Producer properties configuration
To start working with producers, you need to set up a configuration containing the required properties, which will be passed as a parameter to KafkaProducer. Adjust these properties to your needs.
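As a minimal sketch, a typical producer configuration might look like the following. The broker address, the acks/retries values, and the choice of string serializers are assumptions for the example:

```java
import java.util.Properties;

public class ProducerConfigExample {
    // Minimal producer configuration; values are examples, adjust to your setup.
    public static Properties producerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("acks", "all");    // wait until the write is fully acknowledged
        props.put("retries", "0");   // do not retry failed sends
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProperties().getProperty("bootstrap.servers"));
    }
}
```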
Producer Java API
It's important to define whether the producer is going to publish to a topic (no matter what the partition is) or to a specific topic/partition. The first option is covered next:
Publish to a specific topic
Pass the topic name and message that will be published as a parameter of ProducerRecord:
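A minimal sketch, assuming a local broker and a topic named "my-topic" (both placeholders), and a running Kafka cluster plus the kafka-clients dependency:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PublishToTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Only topic and value are given, so Kafka chooses the partition.
        producer.send(new ProducerRecord<>("my-topic", "hello kafka"));
        producer.close();
    }
}
```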
Publish to a specific topic/partition
Pass the topic name, partition, message key and message value that will be published as a parameter of ProducerRecord:
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-topic", 0, "my-key", "my-value"); // topic, partition, key, value (placeholder values)
Note that the only difference lies in the arguments passed to ProducerRecord.
For more information, see the official producer API documentation.
Consumer properties configuration
In order to start working with consumers, you need to set up a configuration containing the required properties, which will be passed as a parameter to KafkaConsumer. Adjust these properties to your needs.
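As with the producer, here is a minimal sketch of a typical consumer configuration. The broker address, group id, commit settings, and string deserializers are assumptions for the example:

```java
import java.util.Properties;

public class ConsumerConfigExample {
    // Minimal consumer configuration; values are examples, adjust to your setup.
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "my-group");                // placeholder consumer group
        props.put("enable.auto.commit", "true");          // commit offsets in the background
        props.put("auto.commit.interval.ms", "1000");     // how often to auto-commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("group.id"));
    }
}
```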
Consumer Java API
Subscribe or assign
It's important to define which method will be used for message consumption. Below is a summary of the key points of each, and when to use it:
Subscribe:
- Use when you want to consume all messages from the desired topic, no matter which partitions the messages are in.
- After subscribing, a rebalance (re-assigning partitions to group members) occurs: the consumer coordinates with its group to assign partitions. This happens automatically when data consumption starts.
- Define a List of topic-name strings, and KafkaConsumer will subscribe to that list.

Assign:
- Use when you want to define the exact topic/partition that messages will be consumed from.
- This can be considered manual consumption, as the partition is programmatically defined for the desired topic.
- Define a List of TopicPartition, and KafkaConsumer will assign to that list.
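A minimal assign sketch, assuming kafkaConsumer is an already-configured KafkaConsumer<String, String> and that the topic name and partition number are placeholders:

```java
// Requires: org.apache.kafka.common.TopicPartition, java.util.Arrays
// kafkaConsumer is an already-configured KafkaConsumer<String, String>;
// "my-topic" and partition 0 are placeholders for this sketch.
TopicPartition partition0 = new TopicPartition("my-topic", 0);
kafkaConsumer.assign(Arrays.asList(partition0));
```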
It is not possible to subscribe and assign at the same time using the same consumer instance.
Consume from topic
In this code example kafkaConsumer will subscribe to a topic as shown below:
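For example (the topic name and group id are placeholders, and the configuration is trimmed to the essentials; a running broker and the kafka-clients dependency are assumed):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // placeholder consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("my-topic")); // placeholder topic
    }
}
```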
Retrieve messages from topic
In order to retrieve messages, kafkaConsumer uses the "poll" method, which fetches data from the topics or partitions specified through the subscribe/assign APIs. The call returns ConsumerRecords, which can be iterated to obtain each individual ConsumerRecord. Here is a code snippet:
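A sketch of the poll loop, assuming kafkaConsumer is an already-subscribed KafkaConsumer<String, String>:

```java
// Requires: org.apache.kafka.clients.consumer.ConsumerRecord,
//           org.apache.kafka.clients.consumer.ConsumerRecords
// kafkaConsumer is an already-subscribed KafkaConsumer<String, String>.
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // timeout in ms
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d, key=%s, value=%s%n",
                record.offset(), record.key(), record.value());
    }
}
```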
There are two possible ways to mark all received messages as committed.
Use the kafkaConsumer API: after all messages have been iterated, call "commitSync". This commits the offset even if the consumer crashes right after the call. If you choose this option, set enable.auto.commit to false. It is useful when you want to handle commits manually.
Alternatively, set "enable.auto.commit=true" and "auto.commit.interval.ms=1000" to regulate the frequency. Use this when the consumer's offset should be committed in the background periodically.
Note: there is a risk with the second option: if the consumer crashes before the offset for already-processed messages is committed, a new consumer will end up repeating the work, which means those messages will be processed twice.
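The first (manual) option looks roughly like this; process is a hypothetical message handler, and enable.auto.commit must be set to false in the consumer configuration:

```java
// enable.auto.commit is assumed to be "false" in the consumer configuration;
// process(...) is a hypothetical message handler.
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    process(record);
}
kafkaConsumer.commitSync(); // commit offsets for everything returned so far
```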
When all messages have been processed and you want to end the consumer instance, you need to close it as follows:
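For example:

```java
kafkaConsumer.close(); // releases network resources held by the consumer
```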
At this point you should be able to write your own producers and consumers, and to set the minimum configuration required to start publishing and reading messages from the broker.
In this article we have covered the very basics of Kafka, gathered from several sources.
Soon we will publish one more article that will be purely technical, walking through a real-time Kafka project. Please keep an eye out for it, as we are going to concentrate seriously on code in the coming days, now that a first round of theory and basics is complete.