Prerequisites
Install three nodes with CentOS 7, each with at least 20 GB of disk, 2 GB of RAM and two CPU cores.
Install JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel net-tools
Set JAVA_HOME in ~/.bashrc
# Set JAVA_HOME
export JAVA_HOME="/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-5.b12.el7_4.x86_64/jre"
export PATH=$JAVA_HOME/bin:$PATH
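To make sure the JDK and the environment variables are picked up, reload the profile and check the Java version (the exact JDK build path may differ on your system):

source ~/.bashrc
echo $JAVA_HOME
java -version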
Disable SELinux, Firewall and IPv6
systemctl disable firewalld
systemctl stop firewalld
echo "net.ipv6.conf.all.disable_ipv6 = 1" >> /etc/sysctl.conf
[root@kafka3 ~]# cat /etc/selinux/config | grep "^SELINUX="
SELINUX=permissive
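If you want the IPv6 and SELinux changes to take effect in the running system as well, you can apply them immediately before the reboot:

sysctl -p
setenforce 0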
Reboot the servers.
Installing Kafka
Download Kafka and unpack it under /opt
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz
tar zxvf kafka_2.11-0.11.0.2.tgz
Starting Zookeeper
On each node create a zookeeper directory and a file ‘myid’ with a unique number:
mkdir /zookeeper
echo '1' > /zookeeper/myid
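On the second and third node the only difference is the number written to myid; it has to match the server.N entries in the ZooKeeper configuration below (assuming the kafka1/kafka2/kafka3 host naming used throughout this guide):

# on kafka2
echo '2' > /zookeeper/myid
# on kafka3
echo '3' > /zookeeper/myid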
On all three servers, go to the Kafka home folder /opt/kafka_2.11-0.11.0.2 and set up ZooKeeper like this:
vi config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/zookeeper
# the port at which the clients will connect
clientPort=2181
clientPortAddress=192.168.2.56
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# zoo servers
server.1=kafka1.fritz.box:2888:3888
server.2=kafka2.fritz.box:2888:3888
server.3=kafka3.fritz.box:2888:3888
# add more servers here if you want
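Note that clientPortAddress is the address of the local node (192.168.2.56 in this example). On the other two servers set it to their own addresses, or leave the line out so ZooKeeper listens on all interfaces, for example:

# on kafka2, assuming its address is 192.168.2.57 (adjust to your network)
clientPortAddress=192.168.2.57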
Start Zookeeper on all three servers
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
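Once ZooKeeper is running on all three nodes, you can check that each instance is up and that one of them has been elected leader, for example with the srvr four-letter command (assuming nc is installed); you should see one "Mode: leader" and two "Mode: follower":

echo srvr | nc kafka1.fritz.box 2181 | grep Mode
echo srvr | nc kafka2.fritz.box 2181 | grep Mode
echo srvr | nc kafka3.fritz.box 2181 | grep Mode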
Change the Kafka server.properties on all three servers (set a unique broker id on each server)
vi config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=kafka1.fritz.box:2181,kafka2.fritz.box:2181,kafka3.fritz.box:2181
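Only broker.id has to be unique across the cluster; since each broker runs on its own host, the listener port 9093 can stay the same everywhere (the producer example later in this guide assumes port 9093 on all three hosts). A possible assignment:

# kafka1
broker.id=1
# kafka2
broker.id=2
# kafka3
broker.id=3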
Start Kafka on all three nodes:
./bin/kafka-server-start.sh -daemon config/server.properties
Verify Kafka and ZooKeeper are running:
jps
4150 Jps
2365 QuorumPeerMain
1743 Kafka
Also verify that all brokers are registered in ZooKeeper:
# ./bin/zookeeper-shell.sh kafka1:2181 ls /brokers/ids
Connecting to kafka1:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[1, 2, 3]
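If you want to see the details a broker has registered, you can read the corresponding znode, which contains a JSON document with the broker's endpoints, e.g. for broker 1:

# ./bin/zookeeper-shell.sh kafka1:2181 get /brokers/ids/1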
Create an example topic with three partitions and a replication factor of 3:
# ./bin/kafka-topics.sh --create --zookeeper kafka1:2181 --topic example-topic --partitions 3 --replication-factor 3
Created topic "example-topic".
# ./bin/kafka-topics.sh --list --zookeeper kafka1:2181 --topic example-topic
example-topic
# ./bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic example-topic
Topic:example-topic   PartitionCount:3   ReplicationFactor:3   Configs:
    Topic: example-topic   Partition: 0   Leader: 2   Replicas: 2,3,1   Isr: 2,3,1
    Topic: example-topic   Partition: 1   Leader: 3   Replicas: 3,1,2   Isr: 3,2,1
    Topic: example-topic   Partition: 2   Leader: 1   Replicas: 1,2,3   Isr: 1,2,3
Test the Topic
Start a Producer on one node:
# ./bin/kafka-console-producer.sh --broker-list kafka1:9093,kafka2:9093,kafka3:9093 --topic example-topic
Also start a consumer on a different node:
# ./bin/kafka-console-consumer.sh --zookeeper kafka1:2181 --topic example-topic --from-beginning
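Kafka 0.11 also lets the console consumer connect directly to the brokers (new consumer API) instead of going through ZooKeeper; an equivalent call would be:

# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9093 --topic example-topic --from-beginning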
Write some text in the producer console. You should then see the text in the consumer console.
Stop one node and write some more messages in the producer console to verify that high availability is working.
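While one broker is stopped, you can also describe the topic again: the stopped broker should disappear from the Isr lists and leadership for its partitions should move to the remaining brokers.

# ./bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic example-topic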