Local Kafka setup on Mac OS X

A local Kafka setup guide is available at: http://kafka.apache.org/documentation.html#quickstart
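
For reference, the quickstart boils down to roughly the commands below (paths are from the 0.8.x distribution; exact script names vary between Kafka versions, so treat this as a sketch):

# start zookeeper, then the kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
# create a topic and publish a test message from the console producer
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic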

However, this out-of-the-box setup on Mac OS X (Yosemite) did not work for me directly. When trying to publish a message to a newly created topic, it would fail as follows:

kafka08.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(my-topic)
kafka08.producer.BrokerPartitionInfo - Getting broker partition info for topic my-topic
kafka08.producer.BrokerPartitionInfo  - Partition [my-topic,0] has leader 0
kafka08.producer.async.DefaultEventHandler - Broker partitions registered for topic: my-topic are 0
kafka08.producer.async.DefaultEventHandler - Sending 1 messages with compression codec 2 to [my-topic,0]
kafka08.producer.async.DefaultEventHandler - Producer sending messages with correlation id 7 for topics [my-topic,0] to broker 0 on developer-mbp:9092
kafka08.producer.SyncProducer - Connected to developer-mbp:9092 for producing
kafka08.producer.SyncProducer - Disconnecting from developer-mbp:9092
kafka08.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 7 to broker 0 with data for partitions [my-topic,0]
java.nio.channels.ClosedChannelException
	at kafka08.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka08.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
	at kafka08.producer.SyncProducer.kafka08$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka08.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka08.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka08.producer.SyncProducer.send(SyncProducer.scala:101)
	at kafka08.producer.async.DefaultEventHandler.kafka08$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
	at kafka08.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka08.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)

After banging my head against the wall for a few hours, I couldn’t figure out whether my application wasn’t able to talk to Kafka or Kafka wasn’t able to talk to Zookeeper. netstat claimed that both Kafka and Zookeeper were running fine, listening on the default ports, and there were no errors in the logs.

SOLUTION

It turns out that even if localhost is provided to the Kafka client (running on Java 8), it tries to resolve the broker via the machine name, which in this case was developer-mbp:9092. This should not have been a problem since my machine should be reachable via its own name; however, because the machine name had been changed on my MacBook Pro, /etc/hosts had no entry for it.

This was the default file:

##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost
255.255.255.255	broadcasthost
::1             localhost

Notice that the machine name is not configured here. Pointing 127.0.0.1 to the machine name fixed the problem. This was the updated /etc/hosts file:

##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost developer-mbp
255.255.255.255	broadcasthost
::1             localhost

Now the Kafka client was able to resolve developer-mbp and publish to my-topic.
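
To verify the mapping without going through Kafka, a quick check from the terminal is enough (developer-mbp is my machine name here; substitute your own):

hostname                                     # the name the client will try to resolve
dscacheutil -q host -a name developer-mbp    # should now resolve to 127.0.0.1
ping -c 1 developer-mbp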

Zookeeper Leader election and timeouts

My cluster of 3 nodes had been running fine for a while until one of the nodes died. This node was the LEADER. I assumed the cluster would still be fine since 2/3 nodes were still healthy. However, it looked like it was unable to elect a leader and set up a quorum properly.

Here’s what I was getting:

2014-11-11 12:09:36,101 [myid:1] - WARN  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@89] - Exception when following the leader
java.net.ConnectException: Connection refused
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:382)
        at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:241)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:228)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:365)
        at java.net.Socket.connect(Socket.java:527)
        at org.apache.zookeeper.server.quorum.Learner.connectToLeader(Learner.java:225)
        at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:71)
        at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)

and

2014-11-11 12:09:36,102 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@166] - shutdown called
java.lang.Exception: shutdown Follower
        at org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:166)
        at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:790)

There’s a configuration option, initLimit, which defines the amount of time (in ticks) that the initial synchronization phase can take. This value defaults to 10 in Zookeeper. It turns out that my cluster had enough data to sync in the initial phase that the sync took longer than the initLimit allowed. Increasing initLimit to about 50 fixed the issue, although I do wonder about the side effects of a much higher initLimit value on the cluster.
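
For reference, this is roughly what the relevant part of my zoo.cfg looked like after the change (the values are illustrative; with a tickTime of 2000 ms, an initLimit of 50 gives followers about 100 seconds to connect and sync):

# length of a single tick, in milliseconds
tickTime=2000
# ticks the initial connect/sync phase with the leader may take
initLimit=50
# ticks a follower may lag behind the leader once it is in sync
syncLimit=5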

More details after searching the net:

What happened here was that the server being elected as leader did go through the leader election process successfully. It then started to send a snapshot of its state to the follower; however, before that process could complete and the follower could finish syncing to the leader’s state, the initLimit timeout was reached and the leader thread decided it had to give up. So increasing initLimit to a value that allowed the snapshot transfer to complete fixed this problem.

Zookeeper Error Guide: Part 1

The last few weeks with Zookeeper/Curator have been a good experience. I am going to maintain a running list of errors that come up with Zookeeper and how I fixed or worked around them.

Running out of connections
WARN [NIOServerCxn.Factory: 0.0.0.0/0.0.0.0:2181:NIOServerCnxn$Factory@352] - Too many connections from /ab.cd.ef.ghi - max is 60

This means the server is refusing new connections from a single client IP because that IP has exceeded the per-client connection limit (the default is 60). In your zookeeper.cfg, raise the limit:

maxClientCnxns = 500
Unable to load database – disk corruption
FATAL Unable to load database on disk !  java.io.IOException: Failed to process transaction type: 2 error: KeeperErrorCode = NoNode for  at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)!

This typically implies either disk corruption on your server or that the process was restarted while snapshotting. There are some bugs filed against Zookeeper in this area. The easiest fix, if the other nodes in your cluster are running, is to wipe out the version-2/ directory on the affected node; it will rebuild its state from the other nodes.
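
A sketch of the recovery steps, assuming dataDir points at /var/lib/zookeeper (adjust the path to whatever dataDir is in your config):

# stop the broken node, move its snapshots/transaction logs aside, and restart
bin/zkServer.sh stop
mv /var/lib/zookeeper/version-2 /var/lib/zookeeper/version-2.corrupt
bin/zkServer.sh start
# the node rejoins the quorum and pulls a fresh snapshot from the current leader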

Unable to load database – Unreasonable length
FATAL Unable to load database on disk java.io.IOException: Unreasonable length = 1048583 at org.apache.jute.BinaryInputArchive.readBuffer(BinaryInputArchive.java:100)

Some versions of Zookeeper allowed the client to write data larger than the maximum size the server can read back. Increasing the max buffer size via a JVM system property fixes the issue:

-Djute.maxbuffer=xxx
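
For example, to allow buffers up to 4 MB (the value here is an assumption; size it to your largest znode, and note that jute.maxbuffer should be set consistently on clients and servers), one way is to pass the flag through JVMFLAGS when starting the server:

export JVMFLAGS="-Djute.maxbuffer=4194304"
bin/zkServer.sh restart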
Failure to follow the leader
WARN org.apache.zookeeper.server.quorum.Learner: Exception when following the leader java.net.SocketTimeoutException: Read timed out

This is observed when the system is under stress, which might be caused by disk contention, network delays, etc. If you cannot reduce the load on the system, try increasing your hardware spec. On EC2, I switched over to High I/O instances and the response was much better.