Local Kafka setup on Mac OS X

Local Kafka set up guide is available at: http://kafka.apache.org/documentation.html#quickstart

However, this out of the box set up on Mac OS X (Yosemite) did not work for me directly.  When trying to publish a message to a newly created topic, it would fail as follows

kafka08.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(my-topic)
kafka08.producer.BrokerPartitionInfo - Getting broker partition info for topic my-topic
kafka08.producer.BrokerPartitionInfo  - Partition [my-topic,0] has leader 0
kafka08.producer.async.DefaultEventHandler - Broker partitions registered for topic: my-topic are 0
kafka08.producer.async.DefaultEventHandler - Sending 1 messages with compression codec 2 to [my-topic,0]
kafka08.producer.async.DefaultEventHandler - Producer sending messages with correlation id 7 for topics [my-topic,0] to broker 0 on developer-mbp:9092
kafka08.producer.SyncProducer - Connected to developer-mbp:9092 for producing
kafka08.producer.SyncProducer - Disconnecting from developer-mbp:9092
kafka08.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 7 to broker 0 with data for partitions [my-topic,0]
java.nio.channels.ClosedChannelException
	at kafka08.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka08.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
	at kafka08.producer.SyncProducer.kafka08$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka08.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka08.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka08.producer.SyncProducer.send(SyncProducer.scala:101)
	at kafka08.producer.async.DefaultEventHandler.kafka08$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
	at kafka08.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka08.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)

Banging my head against the wall for a few hours, I couldn’t figure out why if my application  wasn’t able to talk to kafka or kafka wasn’t able to talk to zookeeper. netstat claimed that both kakfa and zookeeper were running fine, listening on the default ports without any errors in the logs.

SOLUTION

It turns out that even if localhost is provided to Kafka client (running with java 8), it tries to resolve the server via my machine name which in this case was developer-mbp:9092. This should not be a problem since my machine should be accessible via the machine name, however, because machine name was changed on my macbook pro, it had an empty /etc/hosts file.

This was the default file

##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost
255.255.255.255	broadcasthost
::1             localhost

Notice how the machine name is not configured here. Pointing your 127.0.0.1 to your machine name fixed the problem. This was the updated /etc/hosts file

##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost developer-mbp
255.255.255.255	broadcasthost
::1             localhost

Noe Kakfa client was able to resolve the developer-mbp and publish to my-topic.

Jersey Filters – ContainerRequestFilter and ContainerResponseFilter

Jersey Filters allow a certain functionality to the performed on every request/response. They are typically used to modify request or response parameters like headers. Jersey user guide provides a good description of what filters can do. This blog however focusses on how to set up filters in Jersey based on different jersey versions

Jersey 1.x

The core of setting up is configuring the appropriate init parameter. In Jersey 1.x, these were

setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS, RequestResponseLoggingFilter.class.getName());
setInitParameter(ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS, RequestResponseLoggingFilter.class.getName());

where the parameter keys were

public static final String PROPERTY_CONTAINER_REQUEST_FILTERS =  "com.sun.jersey.spi.container.ContainerRequestFilters";
public static final String PROPERTY_CONTAINER_RESPONSE_FILTERS =  "com.sun.jersey.spi.container.ContainerResponseFilters";

These parameters can also be set using web.xml

<servlet>  
    <servlet-name>Jersey REST Service</servlet-name>  
    <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>  
    <init-param>
        <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
        <param-value>com.company.org.jersey.filters.RequestResponseLogginFilter</param-value>
    </init-param>
</servlet>

Jersey 2.x

In Jersey 2.x the parameters changed to the following

setInitParameter("javax.ws.rs.container.ContainerRequestFilter", RequestResponseLoggingFilter.class.getName());
setInitParameter( "javax.ws.rs.container.ContainerResponseFilter", RequestResponseLoggingFilter.class.getName());