From 51c924bc7c3810fb426e106a16aba5898edb9e49 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 2 Nov 2017 22:09:29 +0100 Subject: [PATCH 1/3] Remove S3 output stream --- docs/plugins/repository-s3.asciidoc | 7 +- .../s3/DefaultS3OutputStream.java | 223 ----------- .../repositories/s3/S3BlobContainer.java | 193 +++++++++- .../repositories/s3/S3BlobStore.java | 4 +- .../repositories/s3/S3OutputStream.java | 119 ------ .../repositories/s3/S3Repository.java | 2 +- .../s3/MockDefaultS3OutputStream.java | 101 ----- .../s3/S3BlobStoreContainerTests.java | 360 +++++++++++++++++- .../repositories/s3/S3OutputStreamTests.java | 143 ------- 9 files changed, 541 insertions(+), 611 deletions(-) delete mode 100644 plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java delete mode 100644 plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java delete mode 100644 plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java delete mode 100644 plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java diff --git a/docs/plugins/repository-s3.asciidoc b/docs/plugins/repository-s3.asciidoc index cb7cc67ddbce9..565c94f5a7d0d 100644 --- a/docs/plugins/repository-s3.asciidoc +++ b/docs/plugins/repository-s3.asciidoc @@ -175,9 +175,10 @@ The following settings are supported: http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html[AWS Multipart Upload API] to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that setting a buffer - size lower than `5mb` is not allowed since it will prevents the use of the - Multipart API and may result in upload errors. Defaults to the minimum - between `100mb` and `5%` of the heap size. + size lower than `5mb` is not allowed since it will prevent the use of the + Multipart API and may result in upload errors. 
It is also not possible to + set a buffer size greater than `5gb` as it is the maximum upload size + allowed by S3. Defaults to the minimum between `100mb` and `5%` of the heap size. `canned_acl`:: diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java deleted file mode 100644 index 811f6e7214146..0000000000000 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.repositories.s3; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; -import com.amazonaws.util.Base64; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.security.DigestInputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.List; - -/** - * DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: single and multi part. - *

- * When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request. - * Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size). - *

- * Quick facts about S3: - *

- * Maximum object size: 5 TB - * Maximum number of parts per upload: 10,000 - * Part numbers: 1 to 10,000 (inclusive) - * Part size: 5 MB to 5 GB, last part can be < 5 MB - *

- * See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html - * See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html - */ -class DefaultS3OutputStream extends S3OutputStream { - - private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); - private static final Logger logger = Loggers.getLogger("cloud.aws"); - /** - * Multipart Upload API data - */ - private String multipartId; - private int multipartChunks; - private List multiparts; - - DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) { - super(blobStore, bucketName, blobName, bufferSizeInBytes, serverSideEncryption); - } - - @Override - public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException { - SocketAccess.doPrivilegedIOException(() -> { - flushPrivileged(bytes, off, len, closing); - return null; - }); - } - - private void flushPrivileged(byte[] bytes, int off, int len, boolean closing) throws IOException { - if (len > MULTIPART_MAX_SIZE.getBytes()) { - throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3"); - } - - if (!closing) { - if (len < getBufferSize()) { - upload(bytes, off, len); - } else { - if (getFlushCount() == 0) { - initializeMultipart(); - } - uploadMultipart(bytes, off, len, false); - } - } else { - if (multipartId != null) { - uploadMultipart(bytes, off, len, true); - completeMultipart(); - } else { - upload(bytes, off, len); - } - } - } - - /** - * Upload data using a single request. 
- */ - private void upload(byte[] bytes, int off, int len) throws IOException { - try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) { - try { - doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption()); - } catch (AmazonClientException e) { - throw new IOException("Unable to upload object " + getBlobName(), e); - } - } - } - - protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, - boolean serverSideEncryption) throws AmazonS3Exception { - ObjectMetadata md = new ObjectMetadata(); - if (serverSideEncryption) { - md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); - } - md.setContentLength(length); - - PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, is, md) - .withStorageClass(blobStore.getStorageClass()) - .withCannedAcl(blobStore.getCannedACL()); - blobStore.client().putObject(putRequest); - - } - - private void initializeMultipart() { - while (multipartId == null) { - multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption()); - if (multipartId != null) { - multipartChunks = 1; - multiparts = new ArrayList<>(); - } - } - } - - protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) { - InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName) - .withCannedACL(blobStore.getCannedACL()) - .withStorageClass(blobStore.getStorageClass()); - - if (serverSideEncryption) { - ObjectMetadata md = new ObjectMetadata(); - md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); - request.setObjectMetadata(md); - } - - return blobStore.client().initiateMultipartUpload(request).getUploadId(); - } - - private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException { - try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) 
{ - try { - PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart); - multiparts.add(partETag); - multipartChunks++; - } catch (AmazonClientException e) { - abortMultipart(); - throw e; - } - } - } - - protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, - int length, boolean lastPart) throws AmazonS3Exception { - UploadPartRequest request = new UploadPartRequest() - .withBucketName(bucketName) - .withKey(blobName) - .withUploadId(uploadId) - .withPartNumber(multipartChunks) - .withInputStream(is) - .withPartSize(length) - .withLastPart(lastPart); - - UploadPartResult response = blobStore.client().uploadPart(request); - return response.getPartETag(); - - } - - private void completeMultipart() { - try { - doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts); - multipartId = null; - return; - } catch (AmazonClientException e) { - abortMultipart(); - throw e; - } - } - - protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List parts) - throws AmazonS3Exception { - CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts); - blobStore.client().completeMultipartUpload(request); - } - - private void abortMultipart() { - if (multipartId != null) { - try { - doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId); - } finally { - multipartId = null; - } - } - } - - protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) - throws AmazonS3Exception { - blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId)); - } -} diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java 
b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index f49f4b348f02b..7eeaa53dacb45 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -21,35 +21,50 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import 
java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.List; import java.util.Map; class S3BlobContainer extends AbstractBlobContainer { - protected final S3BlobStore blobStore; + static final ByteSizeValue MIN_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB); + static final ByteSizeValue MAX_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.TB); + static final ByteSizeValue MAX_UPLOAD_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); - protected final String keyPath; + private final S3BlobStore blobStore; + private final String keyPath; S3BlobContainer(BlobPath path, S3BlobStore blobStore) { super(path); @@ -91,9 +106,12 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t if (blobExists(blobName)) { throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); } - try (OutputStream stream = createOutput(blobName)) { - Streams.copy(inputStream, stream); - } + + final UploadMethod method = (blobSize <= blobStore.bufferSizeInBytes()) ? 
this::executeSingleUpload : this::executeMultipartUpload; + SocketAccess.doPrivilegedIOException(() -> { + method.upload(blobStore, buildKey(blobName), inputStream, blobSize); + return null; + }); } @Override @@ -109,12 +127,6 @@ public void deleteBlob(String blobName) throws IOException { } } - private OutputStream createOutput(final String blobName) throws IOException { - // UploadS3OutputStream does buffering & retry logic internally - return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), - blobStore.bufferSizeInBytes(), blobStore.serverSideEncryption()); - } - @Override public Map listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { return AccessController.doPrivileged((PrivilegedAction>) () -> { @@ -175,7 +187,160 @@ public Map listBlobs() throws IOException { return listBlobsByPrefix(null); } - protected String buildKey(String blobName) { + private String buildKey(String blobName) { return keyPath + blobName; } + + @FunctionalInterface + interface UploadMethod { + void upload(S3BlobStore blobStore, String blobName, InputStream input, long blobSize) throws IOException; + } + + /** + * Uploads a blob using a single upload request + */ + void executeSingleUpload(final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize) throws IOException { + + if (blobSize > MAX_UPLOAD_SIZE.getBytes()) { + throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_UPLOAD_SIZE); + } + if (blobSize > blobStore.bufferSizeInBytes()) { + throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size"); + } + + try { + final ObjectMetadata md = new ObjectMetadata(); + md.setContentLength(blobSize); + if (blobStore.serverSideEncryption()) { + md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + } + + final PutObjectRequest putRequest = new PutObjectRequest(blobStore.bucket(), blobName, 
input, md); + putRequest.setStorageClass(blobStore.getStorageClass()); + putRequest.setCannedAcl(blobStore.getCannedACL()); + + blobStore.client().putObject(putRequest); + } catch (AmazonClientException e) { + throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e); + } + } + + /** + * Uploads a blob using multipart upload requests. + */ + void executeMultipartUpload(final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize) throws IOException { + + if (blobSize > MAX_FILE_SIZE.getBytes()) { + throw new IllegalArgumentException("Multipart upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE); + } + if (blobSize < MIN_FILE_SIZE.getBytes()) { + throw new IllegalArgumentException("Multipart upload request size [" + blobSize + "] can't be smaller than " + MIN_FILE_SIZE); + } + + final long partSize = blobStore.bufferSizeInBytes(); + final Tuple multiparts = numberOfMultiparts(blobSize, partSize); + + if (multiparts.v1() > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger buffer size?"); + } + + final int nbParts = multiparts.v1().intValue(); + final long lastPartSize = multiparts.v2(); + assert blobSize == (nbParts - 1) * partSize + lastPartSize : "blobSize does not match multipart sizes"; + + final SetOnce uploadId = new SetOnce<>(); + final String bucketName = blobStore.bucket(); + boolean success = false; + + try { + final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, blobName); + initRequest.setStorageClass(blobStore.getStorageClass()); + initRequest.setCannedACL(blobStore.getCannedACL()); + if (blobStore.serverSideEncryption()) { + final ObjectMetadata md = new ObjectMetadata(); + md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + initRequest.setObjectMetadata(md); + } + + 
uploadId.set(blobStore.client().initiateMultipartUpload(initRequest).getUploadId()); + if (Strings.isEmpty(uploadId.get())) { + throw new IOException("Failed to initialize multipart upload " + blobName); + } + + final List parts = new ArrayList<>(); + + long bytesCount = 0; + for (int i = 1; i <= nbParts; i++) { + final UploadPartRequest uploadRequest = new UploadPartRequest(); + uploadRequest.setBucketName(bucketName); + uploadRequest.setKey(blobName); + uploadRequest.setUploadId(uploadId.get()); + uploadRequest.setPartNumber(i); + uploadRequest.setInputStream(input); + + if (i < nbParts) { + uploadRequest.setPartSize(partSize); + uploadRequest.setLastPart(false); + } else { + uploadRequest.setPartSize(lastPartSize); + uploadRequest.setLastPart(true); + } + bytesCount += uploadRequest.getPartSize(); + + final UploadPartResult uploadResponse = blobStore.client().uploadPart(uploadRequest); + parts.add(uploadResponse.getPartETag()); + } + + if (bytesCount != blobSize) { + throw new IOException("Failed to execute multipart upload for [" + blobName + "], expected " + blobSize + + "bytes sent but got " + bytesCount); + } + + CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId.get(), parts); + blobStore.client().completeMultipartUpload(complRequest); + success = true; + + } catch (AmazonClientException e) { + throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e); + } finally { + if (success == false && Strings.hasLength(uploadId.get())) { + final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get()); + blobStore.client().abortMultipartUpload(abortRequest); + } + } + } + + /** + * Returns the number parts of size of {@code partSize} needed to reach {@code totalSize}, + * along with the size of the last (or unique) part. 
+ * + * @param totalSize the total size + * @param partSize the part size + * @return a {@link Tuple} containing the number of parts to fill {@code totalSize} and + * the size of the last part + */ + static Tuple numberOfMultiparts(final long totalSize, final long partSize) { + if (partSize <= 0) { + throw new IllegalArgumentException("Part size must be greater than zero"); + } + + if (totalSize == 0L || totalSize <= partSize) { + return Tuple.tuple(1L, totalSize); + } + + final long parts = totalSize / partSize; + final long remaining = totalSize % partSize; + + if (remaining == 0) { + return Tuple.tuple(parts, partSize); + } else { + return Tuple.tuple(parts + 1, remaining); + } + } } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index d951b31c07d67..27349f12135ed 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -93,8 +93,8 @@ public boolean serverSideEncryption() { return serverSideEncryption; } - public int bufferSizeInBytes() { - return bufferSize.bytesAsInt(); + public long bufferSizeInBytes() { + return bufferSize.getBytes(); } @Override diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java deleted file mode 100644 index 46c9108f1b585..0000000000000 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. 
Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.s3; - -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * S3OutputStream buffers data before flushing it to an underlying S3OutputStream. - */ -abstract class S3OutputStream extends OutputStream { - - /** - * Limit of upload allowed by AWS S3. 
- */ - protected static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); - protected static final ByteSizeValue MULTIPART_MIN_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB); - - private S3BlobStore blobStore; - private String bucketName; - private String blobName; - private boolean serverSideEncryption; - - private byte[] buffer; - private int count; - private long length; - - private int flushCount = 0; - - S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) { - this.blobStore = blobStore; - this.bucketName = bucketName; - this.blobName = blobName; - this.serverSideEncryption = serverSideEncryption; - - if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) { - throw new IllegalArgumentException("Buffer size can't be smaller than " + MULTIPART_MIN_SIZE); - } - if (bufferSizeInBytes > MULTIPART_MAX_SIZE.getBytes()) { - throw new IllegalArgumentException("Buffer size can't be larger than " + MULTIPART_MAX_SIZE); - } - - this.buffer = new byte[bufferSizeInBytes]; - } - - public abstract void flush(byte[] bytes, int off, int len, boolean closing) throws IOException; - - private void flushBuffer(boolean closing) throws IOException { - flush(buffer, 0, count, closing); - flushCount++; - count = 0; - } - - @Override - public void write(int b) throws IOException { - if (count >= buffer.length) { - flushBuffer(false); - } - - buffer[count++] = (byte) b; - length++; - } - - @Override - public void close() throws IOException { - if (count > 0) { - flushBuffer(true); - count = 0; - } - } - - public S3BlobStore getBlobStore() { - return blobStore; - } - - public String getBucketName() { - return bucketName; - } - - public String getBlobName() { - return blobName; - } - - public int getBufferSize() { - return buffer.length; - } - - public boolean isServerSideEncryption() { - return serverSideEncryption; - } - - public long getLength() { - return length; - } - - public int 
getFlushCount() { - return flushCount; - } -} diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index eeca906ff4998..9ee59ee9456f9 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -87,7 +87,7 @@ class S3Repository extends BlobStoreRepository { * use of the Multipart API and may result in upload errors. Defaults to the minimum between 100MB and 5% of the heap size. */ static final Setting BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE, - new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB)); + new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.GB)); /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g. diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java deleted file mode 100644 index 3a48b70e307c3..0000000000000 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.s3; - -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.PartETag; -import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.repositories.s3.DefaultS3OutputStream; -import org.elasticsearch.repositories.s3.S3BlobStore; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; - -public class MockDefaultS3OutputStream extends DefaultS3OutputStream { - - private ByteArrayOutputStream out = new ByteArrayOutputStream(); - - private boolean initialized = false; - private boolean completed = false; - private boolean aborted = false; - - private int numberOfUploadRequests = 0; - - public MockDefaultS3OutputStream(int bufferSizeInBytes) { - super(null, "test-bucket", "test-blobname", bufferSizeInBytes, false); - } - - @Override - protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception { - try { - long copied = Streams.copy(is, out); - if (copied != length) { - throw new AmazonS3Exception("Not all the bytes were copied"); - } - numberOfUploadRequests++; - } catch (IOException e) { - throw new AmazonS3Exception(e.getMessage()); - } - } - - @Override - protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) { - initialized = true; - return 
RandomizedTest.randomAsciiOfLength(50); - } - - @Override - protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, int length, boolean lastPart) throws AmazonS3Exception { - try { - long copied = Streams.copy(is, out); - if (copied != length) { - throw new AmazonS3Exception("Not all the bytes were copied"); - } - return new PartETag(numberOfUploadRequests++, RandomizedTest.randomAsciiOfLength(50)); - } catch (IOException e) { - throw new AmazonS3Exception(e.getMessage()); - } - } - - @Override - protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List parts) throws AmazonS3Exception { - completed = true; - } - - @Override - protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) throws AmazonS3Exception { - aborted = true; - } - - public int getNumberOfUploadRequests() { - return numberOfUploadRequests; - } - - public boolean isMultipart() { - return (numberOfUploadRequests > 1) && initialized && completed && !aborted; - } - - public byte[] toByteArray() { - return out.toByteArray(); - } -} diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java index 45ffac30aa7fb..c8d546b0974b6 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java @@ -19,10 +19,24 @@ package org.elasticsearch.repositories.s3; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.Logger; +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import 
com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.StorageClass; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -30,15 +44,28 @@ import org.elasticsearch.repositories.ESBlobStoreContainerTestCase; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.mockito.ArgumentCaptor; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.stream.Collectors; +import java.util.stream.IntStream; -public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase { +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; - private static final Logger logger = 
Loggers.getLogger(S3BlobStoreContainerTests.class); +public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase { private static ServerSocket mockS3ServerSocket; @@ -69,6 +96,329 @@ protected BlobStore newBlobStore() throws IOException { new ByteSizeValue(10, ByteSizeUnit.MB), "public-read-write", "standard"); } + public void testExecuteSingleUploadBlobSizeTooLarge() throws IOException { + final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(6, 10)); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)); + assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage()); + } + + public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() throws IOException { + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bufferSizeInBytes()).thenReturn(ByteSizeUnit.MB.toBytes(1)); + + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + final String blobName = randomAlphaOfLengthBetween(1, 10); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeSingleUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), ByteSizeUnit.MB.toBytes(2))); + assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage()); + } + + public void testExecuteSingleUpload() throws IOException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + + final BlobPath blobPath = new BlobPath(); + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); /* NOTE(review): IntStream.of(n) is a single-element stream, so the lambda runs once (not n times), and the value returned by add() is discarded. If BlobPath.add returns a new path instead of mutating this one (TODO confirm), blobPath is always empty here and the key assertion in this test never sees a path prefix. */ + } + + final int bufferSize = 
randomIntBetween(1024, 2048); + final int blobSize = randomIntBetween(0, bufferSize); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + final boolean serverSideEncryption = randomBoolean(); + when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); + + final StorageClass storageClass = randomFrom(StorageClass.values()); + when(blobStore.getStorageClass()).thenReturn(storageClass); + + final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null; + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + + final AmazonS3 client = mock(AmazonS3.class); + when(blobStore.client()).thenReturn(client); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + when(client.putObject(argumentCaptor.capture())).thenReturn(new PutObjectResult()); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]); + blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize); + + final PutObjectRequest request = argumentCaptor.getValue(); + assertEquals(bucketName, request.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, request.getKey()); + assertEquals(inputStream, request.getInputStream()); + assertEquals(blobSize, request.getMetadata().getContentLength()); + assertEquals(storageClass.toString(), request.getStorageClass()); + assertEquals(cannedAccessControlList, request.getCannedAcl()); + if (serverSideEncryption) { + assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, request.getMetadata().getSSEAlgorithm()); + } + } + + public void testExecuteMultipartUploadBlobSizeTooLarge() throws IOException { + final long blobSize 
= ByteSizeUnit.TB.toBytes(randomIntBetween(6, 10)); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + ); + assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage()); + } + + public void testExecuteMultipartUploadBlobSizeTooSmall() throws IOException { + final long blobSize = ByteSizeUnit.MB.toBytes(randomIntBetween(1, 4)); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + ); + assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage()); + } + + public void testExecuteMultipartUpload() throws IOException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + + final BlobPath blobPath = new BlobPath(); + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); /* NOTE(review): IntStream.of(n) is a single-element stream, so the lambda runs once (not n times), and the value returned by add() is discarded. If BlobPath.add returns a new path instead of mutating this one (TODO confirm), blobPath is always empty here and the key assertions in this test never see a path prefix. */ + } + + final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(1, 1024)); + final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024)); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + + final boolean serverSideEncryption = randomBoolean(); + when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); + + final StorageClass storageClass = 
randomFrom(StorageClass.values()); + when(blobStore.getStorageClass()).thenReturn(storageClass); + + final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null; + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + + final AmazonS3 client = mock(AmazonS3.class); + when(blobStore.client()).thenReturn(client); + + final ArgumentCaptor initArgCaptor = ArgumentCaptor.forClass(InitiateMultipartUploadRequest.class); + final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult(); + initResult.setUploadId(randomAlphaOfLength(10)); + when(client.initiateMultipartUpload(initArgCaptor.capture())).thenReturn(initResult); + + final ArgumentCaptor uploadArgCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); + + final List expectedEtags = new ArrayList<>(); + long partSize = Math.min(bufferSize, blobSize); + long totalBytes = 0; + do { + expectedEtags.add(randomAlphaOfLength(50)); + totalBytes += partSize; + } while (totalBytes < blobSize); + + when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> { + final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0]; + final UploadPartResult response = new UploadPartResult(); + response.setPartNumber(request.getPartNumber()); + response.setETag(expectedEtags.get(request.getPartNumber() - 1)); + return response; + }); + + final ArgumentCaptor compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); + when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult()); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize); + + final InitiateMultipartUploadRequest 
initRequest = initArgCaptor.getValue(); + assertEquals(bucketName, initRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, initRequest.getKey()); + assertEquals(storageClass, initRequest.getStorageClass()); + assertEquals(cannedAccessControlList, initRequest.getCannedACL()); + if (serverSideEncryption) { + assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, initRequest.getObjectMetadata().getSSEAlgorithm()); + } + + final Tuple numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, bufferSize); + + final List uploadRequests = uploadArgCaptor.getAllValues(); + assertEquals(numberOfParts.v1().intValue(), uploadRequests.size()); + + for (int i = 0; i < uploadRequests.size(); i++) { + UploadPartRequest uploadRequest = uploadRequests.get(i); + + assertEquals(bucketName, uploadRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey()); + assertEquals(initResult.getUploadId(), uploadRequest.getUploadId()); + assertEquals(i + 1, uploadRequest.getPartNumber()); + assertEquals(inputStream, uploadRequest.getInputStream()); + + if (i == (uploadRequests.size() -1)) { + assertTrue(uploadRequest.isLastPart()); + assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize()); + } else { + assertFalse(uploadRequest.isLastPart()); + assertEquals(bufferSize, uploadRequest.getPartSize()); + } + } + + final CompleteMultipartUploadRequest compRequest = compArgCaptor.getValue(); + assertEquals(bucketName, compRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, compRequest.getKey()); + assertEquals(initResult.getUploadId(), compRequest.getUploadId()); + + List actualETags = compRequest.getPartETags().stream().map(PartETag::getETag).collect(Collectors.toList()); + assertEquals(expectedEtags, actualETags); + } + + public void testExecuteMultipartUploadAborted() throws IOException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = 
randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final long blobSize = ByteSizeUnit.MB.toBytes(765); + final long bufferSize = ByteSizeUnit.MB.toBytes(150); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + when(blobStore.getStorageClass()).thenReturn(randomFrom(StorageClass.values())); + + final AmazonS3 client = mock(AmazonS3.class); + when(blobStore.client()).thenReturn(client); + + final String uploadId = randomAlphaOfLength(25); + + final int stage = randomInt(2); + final List exceptions = Arrays.asList( + new AmazonClientException("Expected initialization request to fail"), + new AmazonClientException("Expected upload part request to fail"), + new AmazonClientException("Expected completion request to fail") + ); + + if (stage == 0) { + // Fail the initialization request + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))) + .thenThrow(exceptions.get(stage)); + + } else if (stage == 1) { + final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult(); + initResult.setUploadId(uploadId); + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initResult); + + // Fail the upload part request + when(client.uploadPart(any(UploadPartRequest.class))) + .thenThrow(exceptions.get(stage)); + + } else { + final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult(); + initResult.setUploadId(uploadId); + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initResult); + + when(client.uploadPart(any(UploadPartRequest.class))).thenAnswer(invocationOnMock -> { + final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0]; + final UploadPartResult response = new UploadPartResult(); + response.setPartNumber(request.getPartNumber()); + 
response.setETag(randomAlphaOfLength(20)); + return response; + }); + + // Fail the completion request + when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) + .thenThrow(exceptions.get(stage)); + } + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AbortMultipartUploadRequest.class); + doNothing().when(client).abortMultipartUpload(argumentCaptor.capture()); + + final IOException e = expectThrows(IOException.class, () -> { + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize); + }); + + assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage()); + assertThat(e.getCause(), instanceOf(AmazonClientException.class)); + assertEquals(exceptions.get(stage).getMessage(), e.getCause().getMessage()); + + if (stage == 0) { + verify(client, times(1)).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)); + verify(client, times(0)).uploadPart(any(UploadPartRequest.class)); + verify(client, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); + verify(client, times(0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + + } else { + verify(client, times(1)).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)); + + if (stage == 1) { + verify(client, times(1)).uploadPart(any(UploadPartRequest.class)); + verify(client, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); + } else { + verify(client, times(6)).uploadPart(any(UploadPartRequest.class)); + verify(client, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); + } + + verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + + final AbortMultipartUploadRequest abortRequest = argumentCaptor.getValue(); + assertEquals(bucketName, abortRequest.getBucketName()); + 
assertEquals(blobName, abortRequest.getKey()); + assertEquals(uploadId, abortRequest.getUploadId()); + } + } + + public void testNumberOfMultipartsWithZeroPartSize() { + IllegalArgumentException e = + expectThrows(IllegalArgumentException.class, () -> S3BlobContainer.numberOfMultiparts(randomNonNegativeLong(), 0L)); + assertEquals("Part size must be greater than zero", e.getMessage()); + } + + /** + * Checks the (number of parts, size of the last part) tuple returned by numberOfMultiparts + * for an empty blob, for sizes that are an exact multiple of the part size, and for sizes that are not. + */ + public void testNumberOfMultiparts() { + final ByteSizeUnit unit = randomFrom(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB, ByteSizeUnit.GB); + // At least 2 units: with a size of 1 byte the [1, size - 1] range used for "remaining" below would be empty + final long size = unit.toBytes(randomIntBetween(2, 10)); + final int factor = randomIntBetween(2, 10); + + // Fits in 1 empty part + assertNumberOfMultiparts(1, 0L, 0L, size); + + // Fits in 1 part exactly + assertNumberOfMultiparts(1, size, size, size); + assertNumberOfMultiparts(1, size, size, size * factor); + + // Fits in N parts exactly + assertNumberOfMultiparts(factor, size, size * factor, size); + + // Fits in N parts plus a bit more + final long remaining = randomIntBetween(1, (size > Integer.MAX_VALUE) ? 
Integer.MAX_VALUE : (int) size - 1); + assertNumberOfMultiparts(factor + 1, remaining, size * factor + remaining, size); + } + + private static void assertNumberOfMultiparts(final int expectedParts, final long expectedRemaining, long totalSize, long partSize) { + final Tuple result = S3BlobContainer.numberOfMultiparts(totalSize, partSize); + + assertEquals("Expected number of parts [" + expectedParts + "] but got [" + result.v1() + "]", expectedParts, (long) result.v1()); + assertEquals("Expected remaining [" + expectedRemaining + "] but got [" + result.v2() + "]", expectedRemaining, (long) result.v2()); + } + @AfterClass public static void closeMockSocket() throws IOException, InterruptedException { mockS3ServerSocket.close(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java deleted file mode 100644 index 8f4c7daea7edf..0000000000000 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.repositories.s3; - -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.test.ESTestCase; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Arrays; - -import static org.elasticsearch.common.io.Streams.copy; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; - -/** - * Unit test for {@link S3OutputStream}. - */ -public class S3OutputStreamTests extends ESTestCase { - private static final int BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytesAsInt(); - - public void testWriteLessDataThanBufferSize() throws IOException { - MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE); - byte[] content = randomUnicodeOfLengthBetween(1, 512).getBytes("UTF-8"); - copy(content, out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) content.length)); - assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true)); - - // Checks single/multi part upload - assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE)); - assertThat(out.getFlushCount(), equalTo(1)); - assertThat(out.getNumberOfUploadRequests(), equalTo(1)); - assertFalse(out.isMultipart()); - - } - - public void testWriteSameDataThanBufferSize() throws IOException { - int size = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE); - MockDefaultS3OutputStream out = newS3OutputStream(size); - - ByteArrayOutputStream content = new ByteArrayOutputStream(size); - for (int i = 0; i < size; i++) { - content.write(randomByte()); - } - copy(content.toByteArray(), out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) size)); - assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true)); - - // Checks single/multi part upload - assertThat(out.getBufferSize(), equalTo(size)); - assertThat(out.getFlushCount(), equalTo(1)); - 
assertThat(out.getNumberOfUploadRequests(), equalTo(1)); - assertFalse(out.isMultipart()); - - } - - public void testWriteExactlyNTimesMoreDataThanBufferSize() throws IOException { - int n = randomIntBetween(2, 3); - int length = n * BUFFER_SIZE; - ByteArrayOutputStream content = new ByteArrayOutputStream(length); - - for (int i = 0; i < length; i++) { - content.write(randomByte()); - } - - MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE); - copy(content.toByteArray(), out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) length)); - assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true)); - - // Checks single/multi part upload - assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE)); - assertThat(out.getFlushCount(), equalTo(n)); - - assertThat(out.getNumberOfUploadRequests(), equalTo(n)); - assertTrue(out.isMultipart()); - } - - public void testWriteRandomNumberOfBytes() throws IOException { - Integer randomBufferSize = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE); - MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize); - - Integer randomLength = randomIntBetween(1, 2 * BUFFER_SIZE); - ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength); - for (int i = 0; i < randomLength; i++) { - content.write(randomByte()); - } - - copy(content.toByteArray(), out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) randomLength)); - assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true)); - - assertThat(out.getBufferSize(), equalTo(randomBufferSize)); - int times = (int) Math.ceil(randomLength.doubleValue() / randomBufferSize.doubleValue()); - assertThat(out.getFlushCount(), equalTo(times)); - if (times > 1) { - assertTrue(out.isMultipart()); - } else { - assertFalse(out.isMultipart()); - } - } - - public void testWrongBufferSize() throws IOException { - Integer randomBufferSize = randomIntBetween(1, 4 * 1024 * 1024); - 
try { - newS3OutputStream(randomBufferSize); - fail("Buffer size can't be smaller than 5mb"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), is("Buffer size can't be smaller than 5mb")); - } - } - - private MockDefaultS3OutputStream newS3OutputStream(int bufferSizeInBytes) { - return new MockDefaultS3OutputStream(bufferSizeInBytes); - } - -} From aed3045592bab3080112715db613a9416b41f2f4 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 9 Nov 2017 09:50:32 +0100 Subject: [PATCH 2/3] Remove functional interface Close #26969 Related #26993 --- .../repositories/s3/S3BlobContainer.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 7eeaa53dacb45..fe6808ae56aeb 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -107,9 +107,12 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); } - final UploadMethod method = (blobSize <= blobStore.bufferSizeInBytes()) ? 
this::executeSingleUpload : this::executeMultipartUpload; SocketAccess.doPrivilegedIOException(() -> { - method.upload(blobStore, buildKey(blobName), inputStream, blobSize); + if (blobSize <= blobStore.bufferSizeInBytes()) { + executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize); + } else { + executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize); + } return null; }); } @@ -191,11 +194,6 @@ private String buildKey(String blobName) { return keyPath + blobName; } - @FunctionalInterface - interface UploadMethod { - void upload(S3BlobStore blobStore, String blobName, InputStream input, long blobSize) throws IOException; - } - /** * Uploads a blob using a single upload request */ From cc3c690ae9d1377cdf6ab68304e79617d3ff31ce Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 9 Nov 2017 14:11:48 +0100 Subject: [PATCH 3/3] Move constants and add doc --- .../repositories/s3/S3BlobContainer.java | 25 ++++++++-------- .../repositories/s3/S3Repository.java | 30 ++++++++++++++++--- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index fe6808ae56aeb..bb1130db42d9a 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -44,8 +44,6 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; import java.io.IOException; import java.io.InputStream; @@ -57,11 +55,11 @@ import java.util.List; import java.util.Map; -class S3BlobContainer extends AbstractBlobContainer { +import static 
org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE; +import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART; +import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART; - static final ByteSizeValue MIN_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB); - static final ByteSizeValue MAX_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.TB); - static final ByteSizeValue MAX_UPLOAD_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); +class S3BlobContainer extends AbstractBlobContainer { private final S3BlobStore blobStore; private final String keyPath; @@ -202,8 +200,9 @@ void executeSingleUpload(final S3BlobStore blobStore, final InputStream input, final long blobSize) throws IOException { - if (blobSize > MAX_UPLOAD_SIZE.getBytes()) { - throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_UPLOAD_SIZE); + // Extra safety checks + if (blobSize > MAX_FILE_SIZE.getBytes()) { + throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE); } if (blobSize > blobStore.bufferSizeInBytes()) { throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size"); @@ -234,11 +233,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final InputStream input, final long blobSize) throws IOException { - if (blobSize > MAX_FILE_SIZE.getBytes()) { - throw new IllegalArgumentException("Multipart upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE); + if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) { + throw new IllegalArgumentException("Multipart upload request size [" + blobSize + + "] can't be larger than " + MAX_FILE_SIZE_USING_MULTIPART); } - if (blobSize < MIN_FILE_SIZE.getBytes()) { - throw new IllegalArgumentException("Multipart upload request size [" + blobSize + "] can't be smaller than " + MIN_FILE_SIZE); + if (blobSize < 
MIN_PART_SIZE_USING_MULTIPART.getBytes()) { + throw new IllegalArgumentException("Multipart upload request size [" + blobSize + + "] can't be smaller than " + MIN_PART_SIZE_USING_MULTIPART); } final long partSize = blobStore.bufferSizeInBytes(); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 9ee59ee9456f9..51bb6f2024cd4 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -19,8 +19,6 @@ package org.elasticsearch.repositories.s3; -import java.io.IOException; - import com.amazonaws.services.s3.AmazonS3; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; @@ -37,6 +35,8 @@ import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import java.io.IOException; + /** * Shared file system implementation of the BlobStoreRepository *

@@ -80,14 +80,36 @@ class S3Repository extends BlobStoreRepository { */ static final Setting SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("server_side_encryption", false); + /** + * Maximum size of files that can be uploaded using a single upload request. + */ + static final ByteSizeValue MAX_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); + + /** + * Minimum size of parts that can be uploaded using the Multipart Upload API. + * (see http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html) + */ + static final ByteSizeValue MIN_PART_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.MB); + + /** + * Maximum size of parts that can be uploaded using the Multipart Upload API. + * (see http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html) + */ + static final ByteSizeValue MAX_PART_SIZE_USING_MULTIPART = MAX_FILE_SIZE; + + /** + * Maximum size of files that can be uploaded using the Multipart Upload API. + */ + static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB); + /** * Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, * the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and * to upload each part in its own request. Note that setting a buffer size lower than 5mb is not allowed since it will prevent the * use of the Multipart API and may result in upload errors. It is also not possible to set a buffer size greater than 5gb as it is * the maximum upload size allowed by S3. Defaults to the minimum between 100MB and 5% of the heap size. */ - static final Setting BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE, - new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.GB)); + static final Setting BUFFER_SIZE_SETTING = + Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE, MIN_PART_SIZE_USING_MULTIPART, MAX_PART_SIZE_USING_MULTIPART); /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.