Kafka Streams

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system.

Originally developed as part of LinkedIn's data platform, Kafka is used in mission-critical deployments across many industries and organizations. Confluent was formed around Apache™ Kafka to accelerate its adoption. Confluent sells commercial subscriptions for Confluent Platform, which includes enterprise features such as security, 24×7 monitoring, guaranteed uptime, and certified production readiness. Confluent Platform enhances Apache Kafka by adding operational capabilities for the deployment and operations teams responsible for maintaining high-volume transaction systems. Confluent delivers additional features on top of the open source version, including geo-replication across data centers, multitenant clusters, and security using TLS/SSL and Kerberos integration. It also bundles Kafka Connect (a service for moving data between Apache Kafka and other systems) and Kafka Streams (a client library for distributed stream processing; SQL-style queries come from the separate KSQL project), both of which are part of Apache Kafka itself.

Kafka provides a unified, high-throughput, low-latency platform for handling real-time data feeds. It is well suited to many mission-critical uses, including online advertising, financial services, and telecommunications.

Databases are good at updating records in place. They scale less well when a change on one user's profile page has to fan out to all users, or when data from several sources has to be aggregated before the master database is updated. Those are the cases where a commit log such as Kafka helps.

Step-by-Step Guide on Batch Processing with Apache Kafka

1. Download Kafka

Download the correct version of Apache Kafka for your operating system from here:


Here I am using Confluent Platform, which ships with both the source code and binaries, so that is what I am installing. To run Kafka or Confluent Platform on Linux you need a Java JDK and, for the Kafka version used here, ZooKeeper. Kafka is itself the message broker and speaks its own binary protocol over TCP, so no separate JMS (Java Message Service) or AMQP (Advanced Message Queueing Protocol) broker is required. Applications talk to the broker through a Kafka client library for their language.

Either download the binary tarball and unpack it in an empty directory, or clone the Kafka git repository at https://github.com/apache/kafka:

$ git clone https://github.com/apache/kafka

Install this version of Apache Kafka on a machine where you can run multiple Java processes under different user accounts. In production, use a separate machine if possible, so that brokers you start while testing in development or staging environments do not consume CPU that production needs.

$ cd kafka_2.10-0.9.0

No installer has to be run after unpacking: the scripts under bin/ work directly from this directory. The ZooKeeper address (localhost:2181), the broker address (localhost:9092), the topic name (test) and the replication factor are passed to those scripts later, when the broker is started and the topic is created.

Or download the tarball from


Unpack it in an empty directory, then run bin/kafka-server-start.sh config/server.properties to start Kafka. If you already have an Apache Kafka installation, you can instead point the new binaries at your existing configuration and data directories and start the broker the same way; back up the data first. Make sure you have at least 2GB RAM available on your machine before running Kafka.

If you are using Confluent Platform, simply follow the instructions here: https://www.confluent.io/download-linux-package/

If you are not using Confluent Platform, start by installing ZooKeeper: https://zookeeper.apache.org/doc/r3.4.5/zookeeperAdmin.html

Then download Kafka from http://kafka.apache.org/downloads . Make sure that Java is available in your PATH before starting up ZooKeeper and Kafka!

The exact procedure depends on your platform, but here are some guidelines – check out the respective documentation if unsure:

2. Start Kafka

Note: If you are using Confluent Platform, skip this section and proceed to the next one!

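$ cd kafka_2.10-0.9.0
$ bin/kafka-server-start.sh config/server.properties

The broker reads its settings from the properties file given on the command line, not from a JSON document. To run three brokers, start three broker processes, each with its own properties file that sets a distinct broker.id, listening port and log directory.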

Start ZooKeeper in a separate terminal window before you start the broker:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Use netcat ( nc ) to verify that ZooKeeper is up:

$ echo ruok | nc localhost 2181

A healthy ZooKeeper server answers imok.
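The same health check can be scripted. The sketch below is a minimal example, assuming ZooKeeper's four-letter command ruok is enabled (it is by default in the 3.4 line; newer releases may require it to be whitelisted). The function name zookeeper_is_ok and its default arguments are illustrative and not part of any Kafka or ZooKeeper tooling.

```python
import socket


def zookeeper_is_ok(host="localhost", port=2181, timeout=2.0):
    """Send ZooKeeper's 'ruok' command and report whether it answers 'imok'.

    Returns False when the server is unreachable, times out,
    or answers with anything else.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            reply = sock.recv(16)
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
    return reply == b"imok"
```

Calling zookeeper_is_ok() before starting the broker gives a quick yes/no answer that a start-up script can act on.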

3. Create a Kafka Topic

Topic data is stored under the directory configured as log.dirs in config/server.properties. If you want it to live in a directory of your own, create the directory and point log.dirs at it before starting the broker:

$ mkdir -p var/lib/kafka

Create a new Kafka topic with a replication factor of 2 by running the topic tool against the ZooKeeper instance you started earlier:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test

The replication factor cannot exceed the number of running brokers, so a replication factor of 2 needs at least two brokers. A new topic always starts empty, and it becomes available once a leader has been elected for each of its partitions. The broker listens on port 9092 by default; port 5672 belongs to AMQP brokers such as RabbitMQ and is not used by Kafka.

The default number of partitions is 1. Remember that the number of partitions multiplied by the replication factor gives the total number of partition replicas stored in the cluster. Use more partitions if your application needs more parallelism, and a higher replication factor if it needs greater durability and availability. Every message is written once on each replica, so a higher replication factor multiplies disk writes, which matters for the lifetime of flash storage such as SSDs.
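To make the arithmetic concrete, here is a small sketch that computes the replica count and assembles the topic-creation command line. It assumes the ZooKeeper-based kafka-topics.sh flags of the Kafka release used in this guide; the helper names replica_count and create_topic_command are made up for illustration.

```python
def replica_count(partitions, replication_factor):
    """Total number of partition replicas the cluster stores for a topic."""
    return partitions * replication_factor


def create_topic_command(topic, partitions=1, replication_factor=2,
                         zookeeper="localhost:2181", brokers_available=None):
    """Build the argument list for bin/kafka-topics.sh --create.

    If brokers_available is given, reject replication factors that
    the cluster could not satisfy.
    """
    if partitions < 1 or replication_factor < 1:
        raise ValueError("partitions and replication_factor must be >= 1")
    if brokers_available is not None and replication_factor > brokers_available:
        raise ValueError("replication factor exceeds the number of brokers")
    return [
        "bin/kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


if __name__ == "__main__":
    print(" ".join(create_topic_command("test")))
    print("replicas:", replica_count(partitions=1, replication_factor=2))
```

The returned list can be passed to subprocess.run from inside the Kafka directory if you prefer to script the step.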

4. Start Batch Job

Note: If you are using Apache Kafka, skip this section and proceed to the next one!


Download https://github.com/bobrik/kafka-postgres-poc into the /tmp directory.

Unpack it:

$ tar -xvzf kafka-postgres-poc*.tar.gz

Make sure the PostgreSQL server is running, then connect to it:

$ psql postgres

Let Bobrik know if this also works for you. 🙂

5. Read Messages from Kafka Topic

Ensure that the topic exists before doing any processing by listing the existing topics:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

To read messages from the topic, use:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TOPIC_NAME

Replace TOPIC_NAME with the name you gave the topic when you created it. By default the consumer waits for new messages and prints each one to STDOUT as it arrives; add --from-beginning to read the messages already in the topic as well, and --max-messages to stop after a fixed number. An offset is the position of a message within a partition, counted upwards from 0 at the oldest message. Starting at offset 0 with a limit of 10 therefore returns the first 10 messages of the partition, not the 10 most recent ones. Newer versions of the console consumer accept --partition and --offset to start reading at a specific position.
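Offsets are easier to reason about with a toy model. The sketch below is not Kafka code; it is an in-memory stand-in for a single partition, with a hypothetical PartitionLog class, that shows how an offset and a message limit select a slice of the log.

```python
class PartitionLog:
    """Toy model of one partition: an append-only list indexed by offset."""

    def __init__(self):
        self._messages = []

    def append(self, message):
        """Store a message and return the offset it was assigned."""
        self._messages.append(message)
        return len(self._messages) - 1

    def read(self, offset=0, max_messages=None):
        """Return (offset, message) pairs starting at the given offset."""
        if offset < 0:
            raise ValueError("offset must be non-negative")
        end = len(self._messages)
        if max_messages is not None:
            end = min(end, offset + max_messages)
        return [(i, self._messages[i]) for i in range(offset, end)]

    def latest(self, count):
        """Return the most recent `count` messages."""
        start = max(0, len(self._messages) - count)
        return self.read(start, count)


if __name__ == "__main__":
    log = PartitionLog()
    for n in range(20):
        log.append("m%d" % n)
    print(log.read(offset=0, max_messages=10))  # the 10 oldest messages
    print(log.latest(10))                       # the 10 newest messages
```

Reading from offset 0 yields the oldest messages; to get the newest ones you have to start near the end of the log, which is what latest does.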

PostgreSQL Table Setup

To store messages from Kafka topics in a PostgreSQL database, use the following SQL code.

INSERT INTO kafka_tables (id, topic, "offset", type, partition_number, timestamp, message) VALUES (1, 'test_topic', 0, 'f8a8ce64-e7ea-4ef8-8fd3-b7d5cd2cabdd', 0, 1375016504.679613, '{"partition":0,"maxMessagesPerPartition":"1","minNumberToLive":1,"replicationFactor":"3"}');
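The statement above needs a table to insert into, and the table definition is not shown in this guide. The following sketch shows one possible layout, with the column names taken from the INSERT statement and the column types guessed. It uses Python's built-in sqlite3 module purely as a stand-in so that it runs without a database server; with PostgreSQL you would use a driver such as psycopg2 and write ON CONFLICT DO NOTHING (PostgreSQL 9.5 or later) instead of INSERT OR IGNORE. The helper names open_store and store_message are hypothetical.

```python
import json
import sqlite3

# Column names follow the INSERT statement; the types are assumptions.
# "offset" is quoted because OFFSET is an SQL keyword.
SCHEMA = """
CREATE TABLE IF NOT EXISTS kafka_tables (
    id               INTEGER PRIMARY KEY,
    topic            TEXT    NOT NULL,
    "offset"         INTEGER NOT NULL,
    type             TEXT,
    partition_number INTEGER NOT NULL,
    timestamp        REAL,
    message          TEXT,
    UNIQUE (topic, partition_number, "offset")
)
"""


def open_store(path=":memory:"):
    """Open the database and make sure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def store_message(conn, topic, partition, offset, message,
                  type_=None, timestamp=None):
    """Insert one message; return False if that offset was already stored."""
    cur = conn.execute(
        'INSERT OR IGNORE INTO kafka_tables '
        '(topic, "offset", type, partition_number, timestamp, message) '
        'VALUES (?, ?, ?, ?, ?, ?)',
        (topic, offset, type_, partition, timestamp, json.dumps(message)),
    )
    conn.commit()
    return cur.rowcount == 1
```

The unique constraint on topic, partition and offset means a message that is delivered twice is stored only once, which keeps re-runs of a loader harmless.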

You can also pass some extra arguments as described in official documentation: https://kafka.apache.org/documentation/

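Read messages from the topic:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning

The console consumer only prints messages to STDOUT; writing them into the PostgreSQL table is the job of the Python script scheduled in the cron step at the end of this guide. PostgreSQL has no USE statement, so once rows have been stored, connect to the database with psql and run:

SELECT * FROM kafka_tables WHERE id = 1;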

The table schema works as follows: every message read from a Kafka topic is stored as a row in the `kafka_tables` table, together with its topic name, partition number and type. The row also holds the offset, which records where to resume reading, and a timestamp, which tells messages apart by time.

Using PostgreSQL as a store for your Apache Kafka data makes ad hoc queries easier, and you can back up the data or replicate it between hosts with PostgreSQL's replication features. Another advantage is that you can join your messages with other tables, which you cannot do with messages stored in Kafka directly.

If you want to replicate data from an Apache Kafka topic to an RDBMS (such as PostgreSQL) and keep it up to date, make sure that every earlier message has been read before replication starts; otherwise the copy can fail or end up with gaps. In my experience, this solution works:

SELECT * INTO out_table FROM kafka_tables WHERE id = (SELECT max(id) FROM kafka_tables) ORDER BY timestamp;

The previous command copies the latest message into out_table. Note that SELECT ... INTO creates out_table, so it fails if the table already exists; on later runs use INSERT INTO out_table SELECT ... instead. To run the script periodically and pick up new messages, edit your crontab:

$ crontab -e

Then add the following line at the end of the file to run the script every five minutes:

*/5 * * * * /usr/bin/python /home/azureuser/kafka_insert_to_postgres.py
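The script kafka_insert_to_postgres.py itself is not shown in this guide. As a rough idea of how a periodically run loader could avoid missing or duplicating messages, the sketch below resumes from the highest offset already stored. Everything in it is an assumption: sqlite3 stands in for PostgreSQL so the example runs anywhere, the reduced table has only the columns the sketch needs, and fetch is a hypothetical callable standing in for a real Kafka consumer that returns (offset, message) pairs from a given start offset.

```python
import sqlite3


def create_table(conn):
    """Create a reduced version of the kafka_tables table for this sketch."""
    conn.execute(
        'CREATE TABLE IF NOT EXISTS kafka_tables ('
        'id INTEGER PRIMARY KEY, topic TEXT NOT NULL, '
        'partition_number INTEGER NOT NULL, "offset" INTEGER NOT NULL, '
        'message TEXT, UNIQUE (topic, partition_number, "offset"))'
    )


def next_offset(conn, topic, partition):
    """Offset to resume from: one past the highest offset already stored."""
    row = conn.execute(
        'SELECT MAX("offset") FROM kafka_tables '
        'WHERE topic = ? AND partition_number = ?',
        (topic, partition),
    ).fetchone()
    return 0 if row[0] is None else row[0] + 1


def load_new_messages(conn, topic, partition, fetch):
    """Store every message from the resume point on; return how many."""
    start = next_offset(conn, topic, partition)
    rows = [(topic, partition, off, msg) for off, msg in fetch(start)]
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            'INSERT INTO kafka_tables '
            '(topic, partition_number, "offset", message) VALUES (?, ?, ?, ?)',
            rows,
        )
    return len(rows)
```

Because the resume point is read from the table on every run, a cron job that fires when nothing new has arrived simply inserts zero rows.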

Now you can start using Apache Kafka for data that requires low latency and high throughput! The example is written in Python, but it is easy to rewrite in any other programming language (e.g. Java or Scala). You can also use Kafka Connect, which imports data from external systems into Kafka topics and exports it from Kafka topics to external systems, often with sub-second latency, even if those systems cannot be modified. Other features, such as applying custom logic during import or export, make this framework a good fit for complex ETL processes too!

By Anurag Rathod

Anurag Rathod is an Editor of Appclonescript.com who is passionate about app-based startup solutions and on-demand business ideas. He believes in spreading tech trends. He is an avid reader and loves thinking out of the box to promote new technologies.