Skip to content

Commit

Permalink
Fix Bug in Azure Repo Exception Handling (#47968) (#48030)
Browse files Browse the repository at this point in the history
We were incorrectly handling `IOExceptions` thrown by
the `InputStream` side of the upload operation, resulting
in a `ClassCastException` as we expected to never get
`IOException` from the Azure SDK code but we do in practice.
This PR also sets an assertion on `markSupported` for the
streams used by the SDK as adding the test for this scenario
revealed that the SDK client would retry uploads for
non-mark-supporting streams on `IOException` in the `InputStream`.
  • Loading branch information
original-brownbear authored Oct 15, 2019
1 parent b858e19 commit 5caa101
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private boolean blobExists(String blobName) {
logger.trace("blobExists({})", blobName);
try {
return blobStore.blobExists(buildKey(blobName));
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | StorageException | IOException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
}
return false;
Expand Down Expand Up @@ -97,7 +97,6 @@ public InputStream readBlob(String blobName) throws IOException {
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);

try {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} catch (URISyntaxException|StorageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -88,11 +87,11 @@ public BlobContainer blobContainer(BlobPath path) {
public void close() {
}

public boolean blobExists(String blob) throws URISyntaxException, StorageException {
public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
return service.blobExists(clientName, container, blob);
}

public void deleteBlob(String blob) throws URISyntaxException, StorageException {
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
service.deleteBlob(clientName, container, blob);
}

Expand All @@ -106,17 +105,17 @@ public InputStream getInputStream(String blob) throws URISyntaxException, Storag
}

public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
throws URISyntaxException, StorageException {
throws URISyntaxException, StorageException, IOException {
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
throws URISyntaxException, StorageException, IOException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public InputStream getInputStream(String account, String container, String blob)
}

public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException {
throws URISyntaxException, StorageException, IOException {
// NOTE: this should be here: if (prefix == null) prefix = "";
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
Expand Down Expand Up @@ -296,7 +296,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
return blobsBuilder.immutableMap();
}

public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException {
final Set<String> blobsBuilder = new HashSet<>();
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
Expand All @@ -320,8 +320,9 @@ public Set<String> children(String account, String container, BlobPath path) thr
}

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException {
assert inputStream.markSupported()
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.SpecialPermission;

import java.io.IOException;
Expand All @@ -44,7 +45,9 @@ public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operati
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
Throwables.rethrow(e.getCause());
assert false : "always throws";
return null;
}
}

Expand All @@ -53,7 +56,9 @@ public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (StorageException) e.getCause();
Throwables.rethrow(e.getCause());
assert false : "always throws";
return null;
}
}

Expand All @@ -65,12 +70,7 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor
return null;
});
} catch (PrivilegedActionException e) {
Throwable cause = e.getCause();
if (cause instanceof StorageException) {
throw (StorageException) cause;
} else {
throw (URISyntaxException) cause;
}
Throwables.rethrow(e.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.junit.Before;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
Expand All @@ -63,6 +64,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -294,6 +296,44 @@ public void testWriteLargeBlob() throws Exception {
assertThat(blocks.isEmpty(), is(true));
}

public void testRetryUntilFail() throws IOException {
final AtomicBoolean requestReceived = new AtomicBoolean(false);
httpServer.createContext("/container/write_blob_max_retries", exchange -> {
try {
if (requestReceived.compareAndSet(false, true)) {
throw new AssertionError("Should not receive two requests");
} else {
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
}
} finally {
exchange.close();
}
});

final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5));
try (InputStream stream = new InputStream() {

@Override
public int read() throws IOException {
throw new IOException("foo");
}

@Override
public boolean markSupported() {
return true;
}

@Override
public void reset() {
throw new AssertionError("should not be called");
}
}) {
final IOException ioe = expectThrows(IOException.class, () ->
blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean()));
assertThat(ioe.getMessage(), is("foo"));
}
}

private static byte[] randomBlobContent() {
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
}
Expand Down

0 comments on commit 5caa101

Please sign in to comment.