S3 Multipart uploads with InputStream

The AWS documentation provides an example of uploading a file using the S3 Multipart Upload feature.

In one of my projects, I had a system that used an InputStream to talk to S3. While upgrading it to use the S3 Multipart feature, I was happy to see that UploadPartRequest takes an InputStream, which meant I could easily create the request as follows:

UploadPartRequest uploadRequest = new UploadPartRequest()
        .withUploadId(uploadId)
        .withBucketName(s3Bucket)
        .withKey(s3Key)
        .withInputStream(in)
        .withPartNumber(partNumber)
        .withPartSize(partSize)
        .withLastPart(lastPart);
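
For context, the uploadId above comes from initiating the multipart upload first; a minimal sketch, reusing the getS3Client() helper that appears later in this post:

InitiateMultipartUploadResult initResult = getS3Client()
        .initiateMultipartUpload(new InitiateMultipartUploadRequest(s3Bucket, s3Key));
String uploadId = initResult.getUploadId();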

The code compiles fine but, interestingly, it would not upload any object with more than one part. The reason is that AmazonS3Client contains the following in its uploadPart() method:

finally {
    if (inputStream != null) {
        try { inputStream.close(); }
        catch (Exception e) {}
    }
}

i.e. the client closes the stream after every part, which is pretty interesting behavior from the AWS SDK. Taking a deeper look at how file-based uploads work with the SDK reveals the secret sauce:

InputStream inputStream = null;
if (uploadPartRequest.getInputStream() != null) {
    inputStream = uploadPartRequest.getInputStream();
} else if (uploadPartRequest.getFile() != null) {
    try {
        inputStream = new InputSubstream(new RepeatableFileInputStream(uploadPartRequest.getFile()),
                uploadPartRequest.getFileOffset(), partSize, true);
    } catch (FileNotFoundException e) {
        throw new IllegalArgumentException("The specified file doesn't exist", e);
    }
} else {
    throw new IllegalArgumentException("A File or InputStream must be specified when uploading part");
}

i.e. for file-based uploads, the SDK creates an InputSubstream for each part to be uploaded and closes it after the part is uploaded successfully. To make this work with a provided InputStream, it is your responsibility to supply a stream that can be closed after each part.

My first hack was to make it so that the client could not close the stream. A very simple way of achieving this:

/**
 * The caller must explicitly close() the original stream
 */
public class NonCloseableInputStream extends FilterInputStream {

    public NonCloseableInputStream(InputStream inputStream) {
        super(inputStream);
    }

    @Override
    public void close() {
        // do nothing
    }

}

By providing an InputStream wrapped in a NonCloseableInputStream, the uploadPart() call wouldn't be able to close the stream, and the same stream could be passed to all the UploadPartRequests.
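
In code, that's just one wrap before the upload loop (a sketch reusing the request from earlier):

InputStream wrapped = new NonCloseableInputStream(in);
// the SDK's per-part close() is now a no-op, so the same stream serves every part
UploadPartRequest uploadRequest = new UploadPartRequest()
        .withUploadId(uploadId)
        .withBucketName(s3Bucket)
        .withKey(s3Key)
        .withInputStream(wrapped)
        .withPartNumber(partNumber)
        .withPartSize(partSize)
        .withLastPart(lastPart);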

The code ran fine for a while; however, we saw a larger number of failed uploads relative to the previous upload scheme. This was confusing, since the client was configured with a RetryPolicy to retry individual parts the same number of times. Scanning through the logs, I found the problem with the hack:

private void resetRequestAfterError(Request<?> request, Exception cause) throws AmazonClientException {
    if (request.getContent() == null) {
        return; // no reset needed
    }
    if (!request.getContent().markSupported()) {
        throw new AmazonClientException("Encountered an exception and stream is not resettable", cause);
    }
    try {
        request.getContent().reset();
    } catch (IOException e) {
        // This exception comes from being unable to reset the input stream,
        // so throw the original, more meaningful exception
        throw new AmazonClientException(
                "Encountered an exception and couldn't reset the stream to retry", cause);
    }
}

The expectation that every part is provided with its own InputStream is built into the client's retry logic. When an error occurred while uploading a part, the resetRequestAfterError() method would reset the stream to the beginning. Normally this would lead to silently uploading corrupted data; however, since my stream couldn't be reset to the beginning, it failed with the error message “Encountered an exception and couldn’t reset the stream to retry”.
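
In other words, the retry path needs a stream whose reset() lands at the start of the current part, not at the beginning of the whole upload. A plain ByteArrayInputStream behaves exactly that way out of the box, which hints at the fix (an illustrative snippet, not SDK code):

byte[] partData = new byte[5 * 1024 * 1024];  // one part's bytes (illustrative size)
ByteArrayInputStream bais = new ByteArrayInputStream(partData);
bais.markSupported();       // true, so the SDK's resettability check passes
bais.read(new byte[1024]);  // simulate a partially sent part
bais.reset();               // rewinds to the start of this part's data only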

What's the workaround?

I ended up reading each part into a byte[] and wrapping it in a ByteArrayInputStream for the UploadPartRequest. This increases the memory requirements of the app but works like a charm.

byte[] part;
List<PartETag> partETags = new ArrayList<>();

long uploaded = 0;
int partNumber = 1;

for (; partNumber < numParts; partNumber++) {
    // read exactly partSize bytes for the part; a single InputStream.read()
    // may return less data than asked for (IOUtils is org.apache.commons.io.IOUtils)
    part = IOUtils.toByteArray(in, partSize);
    ByteArrayInputStream bais = new ByteArrayInputStream(part);

    UploadPartRequest uploadRequest = createUploadPartRequest(uploadId, s3Bucket, s3Key, bais, partNumber, partSize, false);
    UploadPartResult result = getS3Client().uploadPart(uploadRequest);
    partETags.add(result.getPartETag());
    uploaded += partSize;
}

long remaining = size - uploaded;

// read the remaining data into the buffer for the last part
part = IOUtils.toByteArray(in, remaining);
ByteArrayInputStream bais = new ByteArrayInputStream(part);

UploadPartRequest uploadRequest = createUploadPartRequest(uploadId, s3Bucket, s3Key, bais, partNumber, remaining, true);
UploadPartResult result = getS3Client().uploadPart(uploadRequest);
partETags.add(result.getPartETag());
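
With all the parts uploaded, the collected part ETags are what S3 needs to finish the upload:

// complete the multipart upload with the accumulated part ETags
CompleteMultipartUploadRequest completeRequest =
        new CompleteMultipartUploadRequest(s3Bucket, s3Key, uploadId, partETags);
getS3Client().completeMultipartUpload(completeRequest);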

If memory is a big concern, then you should create a SlicedInputStream for the range of each part. Note that in this case a retry would need to reset to the start of the slice, which could mean skipping over the underlying stream from its beginning to the start of the slice, depending on the type of stream in your application.
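
There's no such class in the SDK, so here is a minimal sketch of the idea, under the assumption that the underlying stream can be re-opened from the start via a supplier. reset() re-opens and skips back to the slice start, which is exactly what the SDK's retry logic needs:

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: serves bytes [offset, offset + length) of a stream
 * that can be re-opened via the supplier.
 */
public class SlicedInputStream extends InputStream {

    private final Supplier<InputStream> reopen; // re-opens the underlying stream from its start
    private final long offset;
    private final long length;
    private InputStream in;
    private long position; // bytes read within the slice

    public SlicedInputStream(Supplier<InputStream> reopen, long offset, long length) throws IOException {
        this.reopen = reopen;
        this.offset = offset;
        this.length = length;
        openAndSkip();
    }

    private void openAndSkip() throws IOException {
        in = reopen.get();
        long toSkip = offset;
        while (toSkip > 0) { // skip() may skip fewer bytes than requested
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                throw new IOException("Could not skip to slice offset " + offset);
            }
            toSkip -= skipped;
        }
        position = 0;
    }

    @Override
    public int read() throws IOException {
        if (position >= length) {
            return -1; // end of the slice
        }
        int b = in.read();
        if (b != -1) {
            position++;
        }
        return b;
    }

    @Override
    public boolean markSupported() {
        return true; // satisfies the SDK's "stream is not resettable" check
    }

    @Override
    public synchronized void reset() throws IOException {
        // a retry re-reads the underlying stream up to the slice start
        in.close();
        openAndSkip();
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}

Each part would then get its own slice, e.g. new SlicedInputStream(this::openSourceStream, (partNumber - 1) * partSize, partSize), where openSourceStream() is whatever re-opens your source (a hypothetical helper); the cost is re-reading everything before the slice on every retry.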