diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 275d73386cb1a..e15ffaa1c783c 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -21,7 +21,6 @@ import com.google.api.gax.paging.Page; import com.google.cloud.BatchResult; -import com.google.cloud.ReadChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; @@ -34,7 +33,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; @@ -47,11 +45,8 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.nio.file.FileAlreadyExistsException; -import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -176,32 +171,7 @@ Map listChildren(BlobPath path) throws IOException { * @return the InputStream used to read the blob's content */ InputStream readBlob(String blobName) throws IOException { - final BlobId blobId = BlobId.of(bucketName, blobName); - final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId)); - return Channels.newInputStream(new ReadableByteChannel() { - @SuppressForbidden(reason = "Channel is based of a socket not a file") - @Override - public int read(ByteBuffer dst) throws IOException { - try { - return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst)); - } catch (StorageException e) { - if (e.getCode() == HTTP_NOT_FOUND) { - throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); - } - throw e; - } - } - - @Override - public boolean isOpen() { - return readChannel.isOpen(); - } - - @Override - public void close() throws IOException { - SocketAccess.doPrivilegedVoidIOException(readChannel::close); - } - }); + return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName)); } /** diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java new file mode 100644 index 0000000000000..880087def381f --- /dev/null +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.gcs; + +import com.google.cloud.ReadChannel; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.List; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; + +/** + * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred. + * This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing + * the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future. + */ +class GoogleCloudStorageRetryingInputStream extends InputStream { + + private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class); + + static final int MAX_SUPPRESSED_EXCEPTIONS = 10; + + private final Storage client; + + private final BlobId blobId; + + private final int maxRetries; + + private InputStream currentStream; + private int attempt = 1; + private List failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); + private long currentOffset; + private boolean closed; + + GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException { + this.client = client; + this.blobId = blobId; + this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1; + currentStream = openStream(); + } + + private InputStream openStream() throws IOException { + try { + final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId)); + if (currentOffset > 0L) { + readChannel.seek(currentOffset); + } + return Channels.newInputStream(new ReadableByteChannel() { + @SuppressForbidden(reason = "Channel is based of a socket not a file") + @Override + public int read(ByteBuffer dst) throws IOException { + try { + return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst)); + } catch (StorageException e) { + if (e.getCode() == HTTP_NOT_FOUND) { + throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist"); + } + throw e; + } + } + + @Override + public boolean isOpen() { + return readChannel.isOpen(); + } + + @Override + public void close() throws IOException { + SocketAccess.doPrivilegedVoidIOException(readChannel::close); + } + }); + } catch (StorageException e) { + throw addSuppressedExceptions(e); + } + } + + @Override + public int read() throws IOException { + ensureOpen(); + while (true) { + try { + final int result = currentStream.read(); + currentOffset += 1; + return result; + } catch (StorageException e) { + reopenStreamOrFail(e); + } + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + ensureOpen(); + while (true) { + try { + final int bytesRead = currentStream.read(b, off, len); + if (bytesRead == -1) { + return -1; + } + currentOffset += bytesRead; + return bytesRead; + } catch (StorageException e) { + reopenStreamOrFail(e); + } + } + } + + private void ensureOpen() { + if (closed) { + assert false : "using GoogleCloudStorageRetryingInputStream after close"; + throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close"); + } + } + + private void reopenStreamOrFail(StorageException e) throws IOException { + if (attempt >= maxRetries) { + throw addSuppressedExceptions(e); + } + logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying", + blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e); + attempt += 1; + if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { + failures.add(e); + } + IOUtils.closeWhileHandlingException(currentStream); + currentStream = openStream(); + } + + @Override + public void close() throws IOException { + currentStream.close(); + closed = true; + } + + @Override + public long skip(long n) { + throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking"); + } + + @Override + public void reset() { + throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking"); + } + + private T addSuppressedExceptions(T e) { + for (StorageException failure : failures) { + e.addSuppressed(failure); + } + return e; + } +} diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 1d72e9c36c395..867e636243683 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -200,6 +200,35 @@ public void testReadBlobWithRetries() throws Exception { } } + public void testReadLargeBlobWithRetries() throws Exception { + final int maxRetries = randomIntBetween(2, 10); + final CountDown countDown = new CountDown(maxRetries); + + // SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks + final byte[] bytes = randomBytes(1 << 22); + httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> { + Streams.readFully(exchange.getRequestBody()); + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-"); + final int offset = Integer.parseInt(range[0]); + final int end = Integer.parseInt(range[1]); + final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length)); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length); + if (randomBoolean() && countDown.countDown() == false) { + exchange.getResponseBody().write(chunk, 0, chunk.length - 1); + exchange.close(); + return; + } + exchange.getResponseBody().write(chunk); + exchange.close(); + }); + + final BlobContainer blobContainer = createBlobContainer(maxRetries, null); + try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) { + assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream))); + } + } + public void testReadBlobWithReadTimeouts() { final int maxRetries = randomIntBetween(1, 3); final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));