Local Kafka setup on Mac OS X

The local Kafka setup guide is available at: http://kafka.apache.org/documentation.html#quickstart

However, this out-of-the-box setup did not work for me directly on Mac OS X (Yosemite). When trying to publish a message to a newly created topic, it would fail as follows:

kafka08.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(my-topic)
kafka08.producer.BrokerPartitionInfo - Getting broker partition info for topic my-topic
kafka08.producer.BrokerPartitionInfo  - Partition [my-topic,0] has leader 0
kafka08.producer.async.DefaultEventHandler - Broker partitions registered for topic: my-topic are 0
kafka08.producer.async.DefaultEventHandler - Sending 1 messages with compression codec 2 to [my-topic,0]
kafka08.producer.async.DefaultEventHandler - Producer sending messages with correlation id 7 for topics [my-topic,0] to broker 0 on developer-mbp:9092
kafka08.producer.SyncProducer - Connected to developer-mbp:9092 for producing
kafka08.producer.SyncProducer - Disconnecting from developer-mbp:9092
kafka08.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 7 to broker 0 with data for partitions [my-topic,0]
java.nio.channels.ClosedChannelException
	at kafka08.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka08.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
	at kafka08.producer.SyncProducer.kafka08$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka08.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka08.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka08.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka08.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka08.producer.SyncProducer.send(SyncProducer.scala:101)
	at kafka08.producer.async.DefaultEventHandler.kafka08$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
	at kafka08.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka08.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)

After banging my head against the wall for a few hours, I couldn't figure out whether my application wasn't able to talk to Kafka or Kafka wasn't able to talk to Zookeeper. netstat claimed that both Kafka and Zookeeper were running fine, listening on the default ports, and there were no errors in the logs.

SOLUTION

It turns out that even when localhost is provided to the Kafka client (running with Java 8), the client ends up connecting to the broker via my machine name, which in this case was developer-mbp:9092. This should not be a problem, since my machine should be reachable by its own name; however, because the machine name had been changed on my MacBook Pro, /etc/hosts had no entry for it.
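
To make the behavior concrete, here is a minimal sketch of a Kafka 0.8 producer (the logs above come from a repackaged kafka08 client, but the plain API behaves the same; the topic and serializer here are assumptions). Even with localhost in metadata.broker.list, the client only uses that address to fetch metadata; the produce request itself goes to the host name the broker advertises, developer-mbp in my case.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
// localhost is only used for the initial metadata request; produce requests
// go to the host name returned by the broker (developer-mbp in my case)
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
producer.send(new KeyedMessage<String, String>("my-topic", "test message"));
producer.close();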

This was the default /etc/hosts file:

##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost
255.255.255.255	broadcasthost
::1             localhost

Notice that the machine name is not configured here. Mapping the machine name to 127.0.0.1 fixed the problem. This was the updated /etc/hosts file:

##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost developer-mbp
255.255.255.255	broadcasthost
::1             localhost

Now the Kafka client was able to resolve developer-mbp and publish to my-topic.
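
A quick way to check how the JVM resolves the broker's advertised host name is a small check like the sketch below (the class is just an illustration, not part of the original setup).

import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostCheck {
    public static void main(String[] args) throws UnknownHostException {
        // the name the broker advertised in the producer logs above
        String brokerHost = "developer-mbp";
        // with the updated /etc/hosts entry this prints 127.0.0.1
        System.out.println(brokerHost + " -> " + InetAddress.getByName(brokerHost).getHostAddress());
    }
}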

Using Apache HTTPClient 4.x for MultiPart uploads with Jersey 1.x Server

You can easily find a lot of articles on the web describing how to do multipart uploads from a Jersey client to a Jersey 1.x server. However, trying to use the Apache HTTP client instead uncovers a bug in Jersey that causes a NullPointerException – https://java.net/jira/browse/JERSEY-1658

SEVERE: The RuntimeException could not be mapped to a response, re-throwing to the HTTP container
java.lang.NullPointerException
    at com.sun.jersey.multipart.impl.MultiPartReaderClientSide.unquoteMediaTypeParameters(MultiPartReaderClientSide.java:227)
    at com.sun.jersey.multipart.impl.MultiPartReaderClientSide.readMultiPart(MultiPartReaderClientSide.java:154)
    at com.sun.jersey.multipart.impl.MultiPartReaderServerSide.readMultiPart(MultiPartReaderServerSide.java:80)
    at com.sun.jersey.multipart.impl.MultiPartReaderClientSide.readFrom(MultiPartReaderClientSide.java:144)
    at com.sun.jersey.multipart.impl.MultiPartReaderClientSide.readFrom(MultiPartReaderClientSide.java:82)
    at com.sun.jersey.spi.container.ContainerRequest.getEntity(ContainerRequest.java:488)
    at com.sun.jersey.server.impl.model.method.dispatch.EntityParamDispatchProvider$EntityInjectable.getValue(EntityParamDispatchProvider.java:123)
    at com.sun.jersey.server.impl.inject.InjectableValuesProvider.getInjectableValues(InjectableValuesProvider.java:46)
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$EntityParamInInvoker.getParams(AbstractResourceMethodDispatchProvider.java:153)
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:203)
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)

Here’s the relevant piece of code from Jersey:

protected static MediaType unquoteMediaTypeParameters(final MediaType mediaType, final String... parameters) {
    if (parameters == null || parameters.length == 0) {
        return mediaType;
    }

    final HashMap<String, String> unquotedParams = new HashMap<String, String>(mediaType.getParameters());

    for (final String parameterName : parameters) {
        String parameterValue = mediaType.getParameters().get(parameterName);

        if (parameterValue.startsWith("\"")) {
            parameterValue = parameterValue.substring(1, parameterValue.length() - 1);
            unquotedParams.put(parameterName, parameterValue);
        }
    }

    return new MediaType(mediaType.getType(), mediaType.getSubtype(), unquotedParams);
}

The error occurs because the Jersey server expects the boundary parameter to be set as part of the Content-Type header, which the Apache HTTP client does not do. This can be verified by comparing the request made by the Jersey client with the one made by the Apache client.

Jersey Client

Content-Type=multipart/form-data;boundary=Boundary-1234567890

Apache HTTP Client

Content-Type=multipart/form-data

And since the boundary parameter is missing, mediaType.getParameters().get("boundary") returns null and the startsWith() call above ends up throwing an NPE.

SOLUTION

I was able to manually hack the boundary parameter into the Content-Type header of the request, making it available to the Jersey parser and thus avoiding the NPE. The catch with this fix is that the class MultipartFormEntity is package-private, so the utility class described below needs to live in the package org.apache.http.entity.mime.

package org.apache.http.entity.mime;

import org.apache.commons.lang3.Validate;
import org.apache.http.HttpEntity;

public class MultiPartEntityUtil {
	
	public static String getBoundaryValue(HttpEntity entity) {
		Validate.notNull(entity);
		
		if( entity instanceof MultipartFormEntity ) {
			MultipartFormEntity formEntity = (MultipartFormEntity)entity;

			AbstractMultipartForm form =  formEntity.getMultipart();
			Validate.notNull(form);
			
			return form.getBoundary();
		}
		
		throw new IllegalArgumentException("Provided entity is of type: " + entity.getClass() + " instead of expected: MultipartFormEntity");
	}

}

With this utility class, we can simply set the Content-Type header as follows:

MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);

for (File file : files) {
    builder.addBinaryBody(file.getName(), file, ContentType.DEFAULT_BINARY, file.getName());
}

HttpEntity entity = builder.build();
String boundary = MultiPartEntityUtil.getBoundaryValue(entity);

...

request.addHeader(HttpHeaders.CONTENT_TYPE, "multipart/form-data;boundary=" + boundary);

This hack makes sure that the Jersey server finds the boundary parameter it expects. Now you can successfully do multipart uploads with the Apache client against a Jersey 1.x server.
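
For reference, the receiving side in this setup is just a standard Jersey 1.x multipart resource; a rough sketch (the resource path and part name are assumptions, not the actual service):

import java.io.InputStream;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.multipart.FormDataParam;

@Path("/files")
public class FileUploadResource {

    @POST
    @Consumes(MediaType.MULTIPART_FORM_DATA)
    public Response upload(@FormDataParam("file") InputStream stream,
                           @FormDataParam("file") FormDataContentDisposition disposition) {
        // read the uploaded part from the stream and persist it; return 200 on success
        return Response.ok().build();
    }
}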

Zookeeper Leader election and timeouts

My cluster of 3 nodes had been running fine for a while until one of the nodes died. This node was the LEADER. I assumed the cluster would still be fine since 2/3 nodes were still healthy. However, it was unable to elect a leader and set up a quorum properly.

Here’s what I was getting:

2014-11-11 12:09:36,101 [myid:1] - WARN  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@89] - Exception when following the leader
java.net.ConnectException: Connection refused
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:382)
        at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:241)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:228)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:365)
        at java.net.Socket.connect(Socket.java:527)
        at org.apache.zookeeper.server.quorum.Learner.connectToLeader(Learner.java:225)
        at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:71)
        at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)

and

2014-11-11 12:09:36,102 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@166] - shutdown called
java.lang.Exception: shutdown Follower
        at org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:166)
        at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:790)

There's a configuration setting, initLimit, which defines the amount of time (in ticks) that the initial synchronization phase can take; it defaults to 10 in Zookeeper. It turns out that my cluster had enough data that this initial sync took longer than the initLimit allowed. Increasing initLimit to about 50 fixed the issue, though I do wonder about the side effects of a much higher value on the cluster. For scale: with a typical tickTime of 2000 ms, initLimit=10 allows only 20 seconds for the initial sync, while 50 allows 100 seconds.
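
For reference, initLimit lives in conf/zoo.cfg next to tickTime and syncLimit; a sketch of the relevant settings with the value I ended up with (data dir and server addresses are placeholders):

tickTime=2000
# ticks a follower may take to connect and sync to the leader (the initial snapshot transfer)
initLimit=50
# ticks a follower may lag behind the leader once synced
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888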

More details after searching the net:

What happened here is that the server being elected as leader did go through the leader election process successfully. It then started to send a snapshot of its state to the follower; however, before that transfer could complete and the follower could finish syncing to the leader's state, the initLimit timeout was reached and the leader thread gave up. Increasing initLimit to a value that allowed the snapshot transfer to complete fixed the problem.

S3 Multipart uploads with InputStream

The AWS documentation provides an example of uploading a file using the S3 multipart upload feature; it is available here.

In one of my projects, I had a system that used an InputStream to talk to S3. While upgrading it to use the S3 multipart feature, I was happy to see that UploadPartRequest takes an InputStream, which meant I could easily create the request as follows:

UploadPartRequest uploadRequest = new UploadPartRequest().withUploadId(uploadId)
        .withBucketName(s3Bucket)
        .withKey(s3Key)
        .withInputStream(in)
        .withPartNumber(partNumber)
        .withPartSize(partSize)
        .withLastPart(lastPart);

The code compiled fine but, interestingly, it would not upload any object with more than one part. The AmazonS3Client contains the following in its uploadPart() method:

finally {
    if (inputStream != null) {
        try { inputStream.close(); }
        catch (Exception e) {}
    }
}

i.e. the client closes the stream after every part. This is pretty interesting behavior from the AWS SDK. Taking a deeper look at how file-based uploads work with the SDK reveals the secret sauce:

        InputStream inputStream = null;
        if (uploadPartRequest.getInputStream() != null) {
            inputStream = uploadPartRequest.getInputStream();
        } else if (uploadPartRequest.getFile() != null) {
            try {
                inputStream = new InputSubstream(new RepeatableFileInputStream(uploadPartRequest.getFile()),
                        uploadPartRequest.getFileOffset(), partSize, true);
            } catch (FileNotFoundException e) {
                throw new IllegalArgumentException("The specified file doesn't exist", e);
            }
        } else {
            throw new IllegalArgumentException("A File or InputStream must be specified when uploading part");
        }

i.e. for file-based uploads, the SDK creates an InputSubstream for each part to be uploaded and closes it after the part is uploaded successfully. To make this work with a provided InputStream, it is your responsibility to provide an InputStream that can be closed for each part.

My first hack was to make it so that the client could not close the stream. A very simple way of achieving this is

import java.io.FilterInputStream;
import java.io.InputStream;

/**
 * The caller must explicitly close() the original stream
 */
public class NonCloseableInputStream extends FilterInputStream {

    public NonCloseableInputStream(InputStream inputStream) {
        super(inputStream);
    }

    @Override
    public void close() {
        // do nothing - keep the underlying stream open across parts
    }

}

By wrapping the InputStream in a NonCloseableInputStream, the uploadPart() call isn't able to close the stream, and the same stream can be passed to all the UploadPartRequests.

The code ran fine for a while; however, we saw a larger number of failed uploads relative to the previous upload scheme. This was confusing, since the client was configured with a RetryPolicy to retry individual parts the same number of times as before. Scanning through the logs, I found the problem with the hack:

private void resetRequestAfterError(Request request, Exception cause) throws AmazonClientException {
        if ( request.getContent() == null ) {
            return; // no reset needed
        }
        if ( ! request.getContent().markSupported() ) {
            throw new AmazonClientException("Encountered an exception and stream is not resettable", cause);
        }
        try {
            request.getContent().reset();
        } catch ( IOException e ) {
            // This exception comes from being unable to reset the input stream,
            // so throw the original, more meaningful exception
            throw new AmazonClientException(
                    "Encountered an exception and couldn't reset the stream to retry", cause);
        }
    }

The expectation that every upload part comes with its own InputStream is built into the client's retry logic. When an error occurred while uploading a part, the resetRequestAfterError() method would reset the stream to the beginning. Normally this would lead to silently corrupted uploads; however, since my stream couldn't be reset to the beginning, it failed with the error message "Encountered an exception and couldn't reset the stream to retry".

What's the workaround?

I ended up reading each part into a byte[] and wrapping it in a ByteArrayInputStream for the UploadPartRequest. This increases the memory requirements of the app but works like a charm.

List<PartETag> partETags = new ArrayList<PartETag>();

long uploaded = 0;
int partNumber = 1;
byte[] part;

for ( ; partNumber < numParts; partNumber++ ) {
   // read exactly partSize bytes for this part - a plain InputStream.read() may return less data than asked for
   part = IOUtils.toByteArray(in, partSize);
   ByteArrayInputStream bais = new ByteArrayInputStream(part);

   UploadPartRequest uploadRequest = createUploadPartRequest(uploadId, s3Bucket, s3Key, bais, partNumber, partSize, false);
   UploadPartResult result = getS3Client().uploadPart(uploadRequest);
   partETags.add(result.getPartETag());
   uploaded += partSize;
}

long remaining = size - uploaded;

// read the remaining data into the buffer and upload it as the last part
part = IOUtils.toByteArray(in, remaining);
ByteArrayInputStream bais = new ByteArrayInputStream(part);

UploadPartRequest uploadRequest = createUploadPartRequest(uploadId, s3Bucket, s3Key, bais, partNumber, remaining, true);
UploadPartResult result = getS3Client().uploadPart(uploadRequest);
partETags.add(result.getPartETag());
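
For completeness, the uploadId and partETags used above come from the usual initiate/complete bookends of a multipart upload; a sketch using the same bucket and key variables:

// start the multipart upload and keep the upload id for the part requests
InitiateMultipartUploadResult init = getS3Client().initiateMultipartUpload(
        new InitiateMultipartUploadRequest(s3Bucket, s3Key));
String uploadId = init.getUploadId();

// ... upload the parts as shown above, collecting partETags ...

// stitch the parts together; S3 only exposes the final object after this call
getS3Client().completeMultipartUpload(
        new CompleteMultipartUploadRequest(s3Bucket, s3Key, uploadId, partETags));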

If memory is a big concern, you could instead create a SlicedInputStream over the range of each part. Note that in this case a retry would need to reset to the start of the slice, which could mean skipping through the input stream from its start to the start of the slice, depending on the underlying stream in your application. A minimal sketch of such a wrapper follows.
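
There is no SlicedInputStream in the SDK; it is a wrapper you would write yourself. The sketch below limits each part to its range and swallows close() so the shared stream survives each uploadPart() call; note that it does not support mark/reset, so SDK-level retries of a part would still need additional buffering.

import java.io.IOException;
import java.io.InputStream;

/** Exposes at most `length` bytes of a shared stream as a single part. */
public class SlicedInputStream extends InputStream {

    private final InputStream in;
    private long remaining;

    public SlicedInputStream(InputStream in, long length) {
        this.in = in;
        this.remaining = length;
    }

    @Override
    public int read() throws IOException {
        if (remaining <= 0) {
            return -1;
        }
        int b = in.read();
        if (b >= 0) {
            remaining--;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (remaining <= 0) {
            return -1;
        }
        int n = in.read(buf, off, (int) Math.min(len, remaining));
        if (n > 0) {
            remaining -= n;
        }
        return n;
    }

    @Override
    public void close() {
        // leave the shared stream open; the caller closes it after the last part
    }
}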

Java Garbage Collection Statistics

Anyone who has done even a moderate-sized project in Java knows about the GC hell that comes with it. It's simple enough to have GC statistics from the beginning rather than adding them later and waiting for the process to run into GC issues. Here's the piece that I typically add to a process; I then bump up the Log4j levels when the process shows signs of GC trouble.


import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); // single thread for logging

// in your initialization method - schedule the stats logger to run every minute
executor.scheduleWithFixedDelay(new GCStatLogger(), 60000, 60000, TimeUnit.MILLISECONDS);

private class GCStatLogger implements Runnable {

    @Override
    public void run() {
        logGCStats();
    }

    private void logGCStats() {
        long gcCount = 0;
        long gcTime = 0;
        // sum collection counts and times (ms) across all collectors reported by the JVM
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount();
            if (count >= 0) {
                gcCount += count;
            }

            long time = gc.getCollectionTime();
            if (time >= 0) {
                gcTime += time;
            }
        }

        log.debug("Total Garbage Collections: " + gcCount);
        log.debug("Total Garbage Collection Time (ms): " + gcTime);
    }
}