diff --git a/docs/plugins/repository-s3.asciidoc b/docs/plugins/repository-s3.asciidoc
index cb7cc67ddbce9..565c94f5a7d0d 100644
--- a/docs/plugins/repository-s3.asciidoc
+++ b/docs/plugins/repository-s3.asciidoc
@@ -175,9 +175,10 @@ The following settings are supported:
http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html[AWS Multipart Upload API]
to split the chunk into several parts, each of `buffer_size` length, and
to upload each part in its own request. Note that setting a buffer
- size lower than `5mb` is not allowed since it will prevents the use of the
- Multipart API and may result in upload errors. Defaults to the minimum
- between `100mb` and `5%` of the heap size.
+ size lower than `5mb` is not allowed since it will prevent the use of the
+ Multipart API and may result in upload errors. It is also not possible to
+ set a buffer size greater than `5gb` since that is the maximum upload size
+ allowed by S3. Defaults to the minimum of `100mb` and `5%` of the heap size.
`canned_acl`::
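
The default described above (the minimum of `100mb` and `5%` of the heap) can be
made concrete with a small Java sketch. The class and variable names are
illustrative only and assume the Elasticsearch core classes are on the
classpath; this is not the plugin's actual code:

    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;

    // Sketch only: how "minimum of 100mb and 5% of the heap size" could be derived.
    public class DefaultBufferSizeSketch {
        public static void main(String[] args) {
            long heapBytes = Runtime.getRuntime().maxMemory();                 // JVM heap ceiling
            long hundredMb = new ByteSizeValue(100, ByteSizeUnit.MB).getBytes();
            long defaultBufferSize = Math.min(hundredMb, heapBytes / 20);      // 5% of heap
            System.out.println(new ByteSizeValue(defaultBufferSize));
        }
    }

On a node with an 8gb heap, 5% is about 410mb, so the default stays at the
`100mb` cap; only heaps below 2gb pull the default under `100mb`.
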
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java
deleted file mode 100644
index 811f6e7214146..0000000000000
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.repositories.s3;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
-import com.amazonaws.util.Base64;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.DigestInputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: single and multi part.
- *
- * When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request.
- * Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size).
- *
- * Quick facts about S3:
- *
- * Maximum object size: 5 TB
- * Maximum number of parts per upload: 10,000
- * Part numbers: 1 to 10,000 (inclusive)
- * Part size: 5 MB to 5 GB, last part can be < 5 MB
- *
- * See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
- * See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html
- */
-class DefaultS3OutputStream extends S3OutputStream {
-
- private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
- private static final Logger logger = Loggers.getLogger("cloud.aws");
- /**
- * Multipart Upload API data
- */
- private String multipartId;
- private int multipartChunks;
-    private List<PartETag> multiparts;
-
- DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
- super(blobStore, bucketName, blobName, bufferSizeInBytes, serverSideEncryption);
- }
-
- @Override
- public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
- SocketAccess.doPrivilegedIOException(() -> {
- flushPrivileged(bytes, off, len, closing);
- return null;
- });
- }
-
- private void flushPrivileged(byte[] bytes, int off, int len, boolean closing) throws IOException {
- if (len > MULTIPART_MAX_SIZE.getBytes()) {
- throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
- }
-
- if (!closing) {
- if (len < getBufferSize()) {
- upload(bytes, off, len);
- } else {
- if (getFlushCount() == 0) {
- initializeMultipart();
- }
- uploadMultipart(bytes, off, len, false);
- }
- } else {
- if (multipartId != null) {
- uploadMultipart(bytes, off, len, true);
- completeMultipart();
- } else {
- upload(bytes, off, len);
- }
- }
- }
-
- /**
- * Upload data using a single request.
- */
- private void upload(byte[] bytes, int off, int len) throws IOException {
- try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
- try {
- doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
- } catch (AmazonClientException e) {
- throw new IOException("Unable to upload object " + getBlobName(), e);
- }
- }
- }
-
- protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length,
- boolean serverSideEncryption) throws AmazonS3Exception {
- ObjectMetadata md = new ObjectMetadata();
- if (serverSideEncryption) {
- md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
- }
- md.setContentLength(length);
-
- PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, is, md)
- .withStorageClass(blobStore.getStorageClass())
- .withCannedAcl(blobStore.getCannedACL());
- blobStore.client().putObject(putRequest);
-
- }
-
- private void initializeMultipart() {
- while (multipartId == null) {
- multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
- if (multipartId != null) {
- multipartChunks = 1;
- multiparts = new ArrayList<>();
- }
- }
- }
-
- protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
- InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName)
- .withCannedACL(blobStore.getCannedACL())
- .withStorageClass(blobStore.getStorageClass());
-
- if (serverSideEncryption) {
- ObjectMetadata md = new ObjectMetadata();
- md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
- request.setObjectMetadata(md);
- }
-
- return blobStore.client().initiateMultipartUpload(request).getUploadId();
- }
-
- private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
- try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
- try {
- PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
- multiparts.add(partETag);
- multipartChunks++;
- } catch (AmazonClientException e) {
- abortMultipart();
- throw e;
- }
- }
- }
-
- protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is,
- int length, boolean lastPart) throws AmazonS3Exception {
- UploadPartRequest request = new UploadPartRequest()
- .withBucketName(bucketName)
- .withKey(blobName)
- .withUploadId(uploadId)
- .withPartNumber(multipartChunks)
- .withInputStream(is)
- .withPartSize(length)
- .withLastPart(lastPart);
-
- UploadPartResult response = blobStore.client().uploadPart(request);
- return response.getPartETag();
-
- }
-
- private void completeMultipart() {
- try {
- doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
- multipartId = null;
- return;
- } catch (AmazonClientException e) {
- abortMultipart();
- throw e;
- }
- }
-
-    protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts)
- throws AmazonS3Exception {
- CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts);
- blobStore.client().completeMultipartUpload(request);
- }
-
- private void abortMultipart() {
- if (multipartId != null) {
- try {
- doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId);
- } finally {
- multipartId = null;
- }
- }
- }
-
- protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId)
- throws AmazonS3Exception {
- blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
- }
-}
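
The class-level javadoc in the file removed above summarizes the S3 limits that
drove its single-versus-multipart logic. As a standalone illustration of those
limits, here is a minimal sketch; the constants mirror the javadoc's figures
and the class and method names are hypothetical:

    // Sketch of the S3 limits quoted in the deleted javadoc; names are illustrative.
    public class S3UploadLimitsSketch {
        static final long MIN_PART_BYTES = 5L * 1024 * 1024;                     // 5 MB
        static final long MAX_PART_BYTES = 5L * 1024 * 1024 * 1024;              // 5 GB
        static final long MAX_OBJECT_BYTES = 5L * 1024 * 1024 * 1024 * 1024;     // 5 TB
        static final int MAX_PARTS = 10_000;                                     // part numbers 1..10,000

        /** True if totalSize can be uploaded as multipart with regular parts of partSize. */
        static boolean fitsMultipart(long totalSize, long partSize) {
            if (partSize < MIN_PART_BYTES || partSize > MAX_PART_BYTES) {
                return false; // only the last part may be smaller than 5 MB
            }
            long parts = (totalSize + partSize - 1) / partSize;                  // ceiling division
            return totalSize <= MAX_OBJECT_BYTES && parts <= MAX_PARTS;
        }
    }
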
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
index f49f4b348f02b..bb1130db42d9a 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -21,35 +21,48 @@
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
-import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.collect.Tuple;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-class S3BlobContainer extends AbstractBlobContainer {
+import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
+import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
+import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
- protected final S3BlobStore blobStore;
+class S3BlobContainer extends AbstractBlobContainer {
- protected final String keyPath;
+ private final S3BlobStore blobStore;
+ private final String keyPath;
S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path);
@@ -91,9 +104,15 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
- try (OutputStream stream = createOutput(blobName)) {
- Streams.copy(inputStream, stream);
- }
+
+ SocketAccess.doPrivilegedIOException(() -> {
+ if (blobSize <= blobStore.bufferSizeInBytes()) {
+ executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
+ } else {
+ executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
+ }
+ return null;
+ });
}
@Override
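
The new writeBlob above dispatches on size: blobs that fit in the buffer go
through a single PUT, larger ones through the multipart API. The part
arithmetic needed by the multipart path is outside this hunk; a plausible
sketch of it, assuming a helper in the spirit of the `Tuple` import added
above (the method name is hypothetical):

    import org.elasticsearch.common.collect.Tuple;

    // Sketch: split totalSize into multipart parts of partSize each;
    // v1 of the tuple is the number of parts, v2 the size of the last part.
    static Tuple<Long, Long> numberOfParts(final long totalSize, final long partSize) {
        final long fullParts = totalSize / partSize;
        final long remaining = totalSize % partSize;
        if (remaining == 0) {
            return Tuple.tuple(fullParts, partSize);       // last part is a full part
        }
        return Tuple.tuple(fullParts + 1, remaining);      // last part holds the remainder
    }

For example, with a 100mb buffer as the part size, a 1gb blob splits into 11
parts, the last of about 24mb.
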
@@ -109,12 +128,6 @@ public void deleteBlob(String blobName) throws IOException {
}
}
- private OutputStream createOutput(final String blobName) throws IOException {
- // UploadS3OutputStream does buffering & retry logic internally
- return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName),
- blobStore.bufferSizeInBytes(), blobStore.serverSideEncryption());
- }
-
@Override
     public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
         return AccessController.doPrivileged((PrivilegedAction<Map<String, BlobMetaData>>) () -> {