Add S3 upload integrity check #26

Merged: 8 commits, Apr 1, 2020
2 changes: 1 addition & 1 deletion pom.xml
@@ -49,7 +49,7 @@
<dependency>
<groupId>org.gaul</groupId>
<artifactId>s3proxy</artifactId>
-<version>1.4.0</version>
+<version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
17 changes: 14 additions & 3 deletions src/main/java/alex/mojaki/s3upload/ConvertibleOutputStream.java
@@ -3,7 +3,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;

/**
* A ByteArrayOutputStream with some useful additional functionality.
@@ -19,14 +23,14 @@ public ConvertibleOutputStream(int initialCapacity) {
/**
* Creates an InputStream sharing the same underlying byte array, reducing memory usage and copying time.
*/
-public InputStream toInputStream(){
+public InputStream toInputStream() {
return new ByteArrayInputStream(buf, 0, count);
}

/**
* Truncates this stream to a given size and returns a new stream containing a copy of the remaining data.
*
-* @param countToKeep number of bytes to keep in this stream, starting from the first written byte.
+* @param countToKeep number of bytes to keep in this stream, starting from the first written byte.
* @param initialCapacityForNewStream buffer capacity to construct the new stream (NOT the number of bytes
* that the new stream will take from this one)
* @return a new stream containing all the bytes previously contained in this one, i.e. from countToKeep + 1 onwards.
@@ -53,4 +57,11 @@ public void append(ConvertibleOutputStream otherStream) {
throw new AssertionError(e);
}
}

public byte[] getMD5Digest() {
MessageDigest md = Utils.md5();
md.update(buf, 0, count);
return md.digest();
}

}
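The new getMD5Digest() digests only the first count bytes of buf, because the backing array of a ByteArrayOutputStream is usually larger than the data written so far. A minimal, self-contained sketch of that idea follows; the class DigestingStreamSketch and its helper methods are hypothetical names used for illustration and are not part of this PR.

```java
import java.io.ByteArrayOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical stand-in for ConvertibleOutputStream, for illustration only.
final class DigestingStreamSketch extends ByteArrayOutputStream {

    DigestingStreamSketch(int initialCapacity) {
        super(initialCapacity);
    }

    // Digest only the bytes actually written: buf[0..count), not the whole backing array.
    synchronized byte[] md5OfWrittenBytes() {
        return md5(buf, 0, count);
    }

    // Size of the backing array, which may exceed the number of bytes written.
    synchronized int capacity() {
        return buf.length;
    }

    // MD5 of data[offset..offset+length), with the checked exception wrapped.
    static byte[] md5(byte[] data, int offset, int length) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(data, offset, length);
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            // Thrown only if the runtime provides no MD5 implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DigestingStreamSketch stream = new DigestingStreamSketch(16);
        stream.write(new byte[]{1, 2, 3}, 0, 3);
        // 3 bytes written, but the backing array still has its initial capacity.
        System.out.println(stream.size() + " of " + stream.capacity() + " bytes used");
    }
}
```

Digesting the whole backing array instead would mix unused trailing bytes into the hash, so the result would not match what S3 computes for the part body.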
12 changes: 12 additions & 0 deletions src/main/java/alex/mojaki/s3upload/IntegrityCheckException.java
@@ -0,0 +1,12 @@
package alex.mojaki.s3upload;

/**
* Thrown when final integrity check fails. It suggests that the multipart upload failed
* due to data corruption. See {@link StreamTransferManager#checkIntegrity(boolean)} for details.
*/
public class IntegrityCheckException extends RuntimeException {

public IntegrityCheckException(String message) {
super(message);
}
}
6 changes: 6 additions & 0 deletions src/main/java/alex/mojaki/s3upload/StreamPart.java
@@ -1,5 +1,7 @@
package alex.mojaki.s3upload;

import com.amazonaws.util.Base64;

import java.io.InputStream;

/**
@@ -37,6 +39,10 @@ public long size() {
return stream.size();
}

public String getMD5Digest() {
return Base64.encodeAsString(stream.getMD5Digest());
}

@Override
public String toString() {
return String.format("[Part number %d %s]", partNumber,
86 changes: 84 additions & 2 deletions src/main/java/alex/mojaki/s3upload/StreamTransferManager.java
@@ -2,12 +2,16 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.util.BinaryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;

@@ -109,6 +113,7 @@ public class StreamTransferManager {
protected int numUploadThreads = 1;
protected int queueCapacity = 1;
protected int partSize = 5 * MB;
protected boolean checkIntegrity = false;
private final List<PartETag> partETags = Collections.synchronizedList(new ArrayList<PartETag>());
private List<MultiPartOutputStream> multiPartOutputStreams;
private ExecutorServiceResultsHandler<Void> executorServiceResultsHandler;
@@ -241,6 +246,39 @@ public StreamTransferManager partSize(long partSize) {
return this;
}

/**
* Sets whether data integrity check should be performed during upload.
Owner

Please add some documentation explaining the two ways that integrity is checked, and noting that the final check relies on undocumented S3 behaviour that may change, so users may get a false-alarm exception. Also note how enabling the check may lead to exceptions in general.

Contributor Author

Done.

I added IntegrityCheckException to improve error handling. However, I also noticed that when a part upload fails, the logs contain no information about the failure. Sample logs from when I deliberately made the request fail:

12:09:25.340 [main] INFO  o.g.s.org.eclipse.jetty.util.log - Logging initialized @1232ms
12:09:25.384 [main] INFO  o.g.s.o.eclipse.jetty.server.Server - jetty-9.2.z-SNAPSHOT
12:09:25.421 [main] INFO  o.g.s.o.e.j.server.ServerConnector - Started ServerConnector@9e1cb34{HTTP/1.1}{127.0.0.1:53212}
12:09:25.422 [main] INFO  o.g.s.o.eclipse.jetty.server.Server - Started @1316ms
12:09:26.365 [main] INFO  a.m.s3upload.StreamTransferManager - Initiated multipart upload to s3proxy-1209536669/stuff with full ID f8d39bbe-b92e-47ab-bb16-9debbe56ec47
12:09:28.818 [pool-3-thread-2] INFO  a.m.s3upload.MultiPartOutputStream - Called close() on [MultipartOutputStream for parts 5001 - 10000]
12:09:28.823 [pool-3-thread-1] INFO  a.m.s3upload.MultiPartOutputStream - Called close() on [MultipartOutputStream for parts 1 - 5000]
12:09:31.401 [main] INFO  o.g.s.o.e.j.server.ServerConnector - Stopped ServerConnector@9e1cb34{HTTP/1.1}{127.0.0.1:0}

java.lang.NullPointerException
	at alex.mojaki.s3upload.ExecutorServiceResultsHandler$1.next(ExecutorServiceResultsHandler.java:64)
	at alex.mojaki.s3upload.ExecutorServiceResultsHandler.awaitCompletion(ExecutorServiceResultsHandler.java:96)
	at alex.mojaki.s3upload.StreamTransferManager.complete(StreamTransferManager.java:358)
	at alex.mojaki.s3upload.test.StreamTransferManagerTest.testTransferManager(StreamTransferManagerTest.java:211)
	at alex.mojaki.s3upload.test.StreamTransferManagerTest.testTransferManager(StreamTransferManagerTest.java:156)

Owner

Can you elaborate? What did you do in this case to make the request fail?

Contributor Author

.withPartSize(-1) or .setMd5Digest("123");

Contributor Author

I mean that I modified the UploadPartRequest in alex.mojaki.s3upload.StreamTransferManager#uploadStreamPart.

Owner

Thanks, I reproduced this and ensured that a message shows in d3e4e20

Contributor Author

Cool! Thanks!
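The first of the two checks discussed in this thread relies on the Content-MD5 header, whose value is the base64-encoded 128-bit MD5 digest of the request body. Below is a minimal sketch of how such a value can be computed with the standard library alone. It assumes Java 8+ for java.util.Base64 (the PR itself uses the AWS SDK's Base64 helper and targets Java 1.6), and ContentMd5Sketch is a hypothetical name, not code from this PR.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, for illustration only: computes a Content-MD5 header value.
final class ContentMd5Sketch {

    // Returns the base64-encoded 128-bit MD5 digest of the given request body.
    static String contentMd5(byte[] body) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md.digest(body));
        } catch (NoSuchAlgorithmException e) {
            // Thrown only if the runtime provides no MD5 implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] part = "hello".getBytes(StandardCharsets.UTF_8);
        // The resulting string is what would be sent as the Content-MD5 header for this body.
        System.out.println(contentMd5(part));
    }
}
```

If the body is altered in transit, the digest computed by the server no longer matches the header, and the part upload is rejected rather than silently stored.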

* <p>
* By default integrity check is disabled.
* <p>
* Essentially, data integrity check consists of two steps. First, each upload part integrity
* is verified. To ensure that data is not corrupted traversing the network, <b>Content-MD5</b>
* header is used. When the header is provided, Amazon S3 checks the object against
* the provided MD5 value and, if they do not match, returns an error. The header value is the
* base64-encoded 128-bit MD5 digest of the request body.
* <p>
* The second step is to ensure integrity of the final object merged from the uploaded parts.
* This is achieved by comparing the expected ETag value with the actual returned by S3.
* However, the ETag value is not a MD5 hash. When S3 combines the parts of a multipart upload
* into the final object, the ETag value is set to the hex-encoded MD5 hash of the concatenated
* binary-encoded MD5 hashes of each part followed by "-" and the number of parts, for instance:
* <pre>57f456164b0e5f365aaf9bb549731f32-95</pre>
* <b>Please note that the final check is based on undocumented behaviour of S3.</b>
*
* @param checkIntegrity <code>true</code> if data integrity should be checked
* @return this {@code StreamTransferManager} for chaining.
* @throws IllegalStateException if {@link StreamTransferManager#getMultiPartOutputStreams} has already
* been called, initiating the upload.
*/
public StreamTransferManager checkIntegrity(boolean checkIntegrity) {
ensureCanSet();
if (checkIntegrity) {
Utils.md5(); // check that algorithm is available
}
this.checkIntegrity = checkIntegrity;
return this;
}

private void ensureCanSet() {
if (queue != null) {
abort();
@@ -322,7 +360,7 @@ public void complete() {
executorServiceResultsHandler.awaitCompletion();
log.debug("{}: Pool terminated", this);
if (leftoverStreamPart != null) {
-log.info("{}: Uploading leftover stream {}", leftoverStreamPart);
+log.info("{}: Uploading leftover stream {}", this, leftoverStreamPart);
uploadStreamPart(leftoverStreamPart);
log.debug("{}: Leftover uploaded", this);
}
@@ -343,14 +381,43 @@ public void complete() {
uploadId,
partETags);
customiseCompleteRequest(completeRequest);
-s3Client.completeMultipartUpload(completeRequest);
+CompleteMultipartUploadResult completeMultipartUploadResult = s3Client.completeMultipartUpload(completeRequest);
if (checkIntegrity) {
checkCompleteFileIntegrity(completeMultipartUploadResult.getETag());
}
}
log.info("{}: Completed", this);
} catch (IntegrityCheckException e) {
// Nothing to abort. Upload has already finished.
throw e;
} catch (Throwable e) {
throw abort(e);
}
}

private void checkCompleteFileIntegrity(String s3ObjectETag) {
List<PartETag> parts = new ArrayList<PartETag>(partETags);
Collections.sort(parts, new PartNumberComparator());
String expectedETag = computeCompleteFileETag(parts);
if (!expectedETag.equals(s3ObjectETag)) {
throw new IntegrityCheckException(String.format(
"File upload completed, but integrity check failed. Expected ETag: %s but actual is %s",
expectedETag, s3ObjectETag));
}
}

private String computeCompleteFileETag(List<PartETag> parts) {
// When S3 combines the parts of a multipart upload into the final object, the ETag value is set to the
// hex-encoded MD5 hash of the concatenated binary-encoded (raw bytes) MD5 hashes of each part followed by
// "-" and the number of parts.
MessageDigest md = Utils.md5();
for (PartETag partETag : parts) {
md.update(BinaryUtils.fromHex(partETag.getETag()));
}
// Represent byte array as a 32-digit number hexadecimal format followed by "-<partCount>".
return String.format("%032x-%d", new BigInteger(1, md.digest()), parts.size());
}
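The ETag computation described in the comment above can be tried without the AWS SDK. The sketch below is a hedged, standalone version that takes part ETags as plain hex strings, already sorted by part number. MultipartETagSketch, fromHex and expectedETag are hypothetical names; in the PR the same steps use PartETag and BinaryUtils.fromHex. As the Javadoc in this PR notes, the format is undocumented S3 behaviour, so treat it as an observation rather than a guarantee.

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch of the multipart ETag computation, for illustration only.
final class MultipartETagSketch {

    // Decodes a hex string (two characters per byte) into raw bytes.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // MD5 over the concatenated raw part digests, hex-encoded, followed by "-<partCount>".
    static String expectedETag(List<String> partETagsInOrder) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            for (String partETag : partETagsInOrder) {
                md.update(fromHex(partETag));
            }
            // %032x keeps leading zeros that BigInteger would otherwise drop.
            return String.format("%032x-%d", new BigInteger(1, md.digest()), partETagsInOrder.size());
        } catch (NoSuchAlgorithmException e) {
            // Thrown only if the runtime provides no MD5 implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList(
                "5d41402abc4b2a76b9719d911017c592",  // MD5 of "hello"
                "d41d8cd98f00b204e9800998ecf8427e"); // MD5 of the empty string
        System.out.println(expectedETag(parts));
    }
}
```

Because the digests are concatenated, the result depends on part order, which is why the PR sorts partETags by part number before hashing.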

/**
* Aborts the upload and rethrows the argument, wrapped in a RuntimeException if necessary.
* Write {@code throw abort(e)} to make it clear to the compiler and readers that the code
@@ -470,6 +537,9 @@ private void uploadStreamPart(StreamPart part) {
.withUploadId(uploadId).withPartNumber(part.getPartNumber())
.withInputStream(part.getInputStream())
.withPartSize(part.size());
if (checkIntegrity) {
uploadRequest.setMd5Digest(part.getMD5Digest());
}
customiseUploadPartRequest(uploadRequest);

UploadPartResult uploadPartResult = s3Client.uploadPart(uploadRequest);
@@ -502,4 +572,16 @@ public void customiseCompleteRequest(CompleteMultipartUploadRequest request) {
public void customisePutEmptyObjectRequest(PutObjectRequest request) {
}

private static class PartNumberComparator implements Comparator<PartETag> {
@Override
public int compare(PartETag o1, PartETag o2) {
int partNumber1 = o1.getPartNumber();
int partNumber2 = o2.getPartNumber();

if (partNumber1 == partNumber2) {
return 0;
}
return partNumber1 > partNumber2 ? 1 : -1;

Replace body with:

return Integer.compare(o1.getPartNumber(), o2.getPartNumber());

Contributor Author

Integer.compare is only available since Java 1.7. This project targets 1.6.
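For reference, an int comparison that works on Java 1.6 and returns the same values as Integer.compare can be written in one expression. The sketch below uses hypothetical names (CompareIntsSketch, compareInts, BY_VALUE) and is not code from this PR. It also shows why plain subtraction is not a safe shortcut.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical illustration of a Java 1.6-compatible int comparison.
final class CompareIntsSketch {

    // Same result as Integer.compare(a, b), without requiring Java 1.7.
    static int compareInts(int a, int b) {
        return a < b ? -1 : (a == b ? 0 : 1);
    }

    // Anonymous-class comparator, since lambdas are not available on Java 1.6 either.
    static final Comparator<Integer> BY_VALUE = new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            return compareInts(o1.intValue(), o2.intValue());
        }
    };

    public static void main(String[] args) {
        Integer[] partNumbers = {3, 1, 2};
        Arrays.sort(partNumbers, BY_VALUE);
        System.out.println(Arrays.toString(partNumbers));

        // Subtraction overflows for extreme values and reports the wrong sign.
        System.out.println(Integer.MIN_VALUE - 1 > 0);
    }
}
```

Another option on Java 1.6 is Integer.valueOf(a).compareTo(b), at the cost of boxing. For S3 part numbers the overflow case cannot occur in practice, so this is a matter of style more than correctness.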

}
}
}
12 changes: 12 additions & 0 deletions src/main/java/alex/mojaki/s3upload/Utils.java
@@ -3,6 +3,8 @@
import java.lang.InterruptedException;
import java.lang.RuntimeException;
import java.lang.Thread;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
* Miscellaneous useful functions.
@@ -36,4 +38,14 @@ public static String skipMiddle(String string, int length) {
builder.append(string, inputLength - sideLength, inputLength);
return builder.toString();
}

public static MessageDigest md5() {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.reset();
return md;
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -14,11 +14,13 @@
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.util.AwsHostNameUtils;
import com.amazonaws.util.IOUtils;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import com.google.inject.Module;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.gaul.s3proxy.AuthenticationType;
import org.gaul.s3proxy.S3Proxy;
import org.gaul.s3proxy.S3ProxyConstants;
import org.jclouds.Constants;
@@ -79,6 +81,10 @@ public void setUp() throws Exception {
Constants.PROPERTY_CREDENTIAL);
String endpoint = s3ProxyProperties.getProperty(
Constants.PROPERTY_ENDPOINT);
AuthenticationType s3Authorization = AuthenticationType.valueOf(
CaseFormat.LOWER_HYPHEN.to(CaseFormat.UPPER_UNDERSCORE,
s3ProxyProperties.getProperty(S3ProxyConstants.PROPERTY_AUTHORIZATION))
);
String s3Identity = s3ProxyProperties.getProperty(
S3ProxyConstants.PROPERTY_IDENTITY);
String s3Credential = s3ProxyProperties.getProperty(
@@ -112,7 +118,7 @@ public void setUp() throws Exception {
.endpoint(s3Endpoint);
//noinspection ConstantConditions
if (s3Identity != null || s3Credential != null) {
-s3ProxyBuilder.awsAuthentication(s3Identity, s3Credential);
+s3ProxyBuilder.awsAuthentication(s3Authorization, s3Identity, s3Credential);
}
if (keyStorePath != null || keyStorePassword != null) {
s3ProxyBuilder.keyStore(
@@ -176,7 +182,8 @@ public void customiseUploadPartRequest(UploadPartRequest request) {
}.numStreams(numStreams)
.numUploadThreads(2)
.queueCapacity(2)
-.partSize(10);
+.partSize(10)
+.checkIntegrity(true);

final List<MultiPartOutputStream> streams = manager.getMultiPartOutputStreams();
List<StringBuilder> builders = new ArrayList<StringBuilder>(numStreams);
2 changes: 1 addition & 1 deletion src/test/resources/s3proxy.conf
@@ -1,4 +1,4 @@
-s3proxy.endpoint=https://127.0.0.1:0
+s3proxy.endpoint=http://127.0.0.1:0
Contributor Author

When I leave https, I get com.amazonaws.SdkClientException: Unable to execute HTTP request: Unrecognized SSL message, plaintext connection? What am I doing wrong? It fails with s3proxy:1.7.0 even without my changes.

# authorization must be aws-v2 or none
s3proxy.authorization=aws-v2
s3proxy.identity=local-identity