Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Blob Download Retries to GCS Repository (#52479) #52521

Merged
merged 1 commit into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.api.gax.paging.Page;
import com.google.cloud.BatchResult;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
Expand All @@ -34,7 +33,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -47,11 +45,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -176,32 +171,7 @@ Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucketName, blobName);
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId));
return Channels.newInputStream(new ReadableByteChannel() {
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
try {
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
throw e;
}
}

@Override
public boolean isOpen() {
return readChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
});
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

/**
* Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
* This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
* the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future.
*/
class GoogleCloudStorageRetryingInputStream extends InputStream {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class);

static final int MAX_SUPPRESSED_EXCEPTIONS = 10;

private final Storage client;

private final BlobId blobId;

private final int maxRetries;

private InputStream currentStream;
private int attempt = 1;
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;

GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
this.client = client;
this.blobId = blobId;
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
currentStream = openStream();
}

private InputStream openStream() throws IOException {
try {
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
if (currentOffset > 0L) {
readChannel.seek(currentOffset);
}
return Channels.newInputStream(new ReadableByteChannel() {
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
try {
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
}
throw e;
}
}

@Override
public boolean isOpen() {
return readChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
});
} catch (StorageException e) {
throw addSuppressedExceptions(e);
}
}

@Override
public int read() throws IOException {
ensureOpen();
while (true) {
try {
final int result = currentStream.read();
currentOffset += 1;
return result;
} catch (StorageException e) {
reopenStreamOrFail(e);
}
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
while (true) {
try {
final int bytesRead = currentStream.read(b, off, len);
if (bytesRead == -1) {
return -1;
}
currentOffset += bytesRead;
return bytesRead;
} catch (StorageException e) {
reopenStreamOrFail(e);
}
}
}

private void ensureOpen() {
if (closed) {
assert false : "using GoogleCloudStorageRetryingInputStream after close";
throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close");
}
}

private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxRetries) {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);
}
IOUtils.closeWhileHandlingException(currentStream);
currentStream = openStream();
}

@Override
public void close() throws IOException {
currentStream.close();
closed = true;
}

@Override
public long skip(long n) {
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
}

@Override
public void reset() {
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
}

private <T extends Exception> T addSuppressedExceptions(T e) {
for (StorageException failure : failures) {
e.addSuppressed(failure);
}
return e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,35 @@ public void testReadBlobWithRetries() throws Exception {
}
}

public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);

// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
final byte[] bytes = randomBytes(1 << 22);
httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-");
final int offset = Integer.parseInt(range[0]);
final int end = Integer.parseInt(range[1]);
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length);
if (randomBoolean() && countDown.countDown() == false) {
exchange.getResponseBody().write(chunk, 0, chunk.length - 1);
exchange.close();
return;
}
exchange.getResponseBody().write(chunk);
exchange.close();
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
}
}

public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomIntBetween(1, 3);
final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));
Expand Down