From 778a07e188eaf6062872fe81053f7ef8048fddfc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 12 Oct 2019 22:07:50 +0200 Subject: [PATCH 1/2] Fix Bug in Azure Repo Exception Handling We were incorrectly handling `IOExceptions` thrown by the `InputStream` side of the upload operation, resulting in a `ClassCastException` as we expected to never get `IOException` from the Azure SDK code but we do in practice. This PR also sets an assertion on `markSupported` for the streams used by the SDK as adding the test for this scenario revealed that the SDK client would retry uploads for non-mark-supporting streams on `IOException` in the `InputStream`. --- .../azure/AzureBlobContainer.java | 3 +- .../repositories/azure/AzureBlobStore.java | 11 +++-- .../azure/AzureStorageService.java | 13 +++--- .../repositories/azure/SocketAccess.java | 38 +++++++++++++----- .../azure/AzureBlobContainerRetriesTests.java | 40 +++++++++++++++++++ 5 files changed, 81 insertions(+), 24 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index aaf7dc6391b65..15afaada84c83 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -62,7 +62,7 @@ private boolean blobExists(String blobName) { logger.trace("blobExists({})", blobName); try { return blobStore.blobExists(buildKey(blobName)); - } catch (URISyntaxException | StorageException e) { + } catch (URISyntaxException | StorageException | IOException e) { logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage()); } return false; @@ -97,7 +97,6 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); - try { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } catch (URISyntaxException|StorageException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 968d8396f7eed..714e29edea29d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; -import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Executor; @@ -88,11 +87,11 @@ public BlobContainer blobContainer(BlobPath path) { public void close() { } - public boolean blobExists(String blob) throws URISyntaxException, StorageException { + public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException { return service.blobExists(clientName, container, blob); } - public void deleteBlob(String blob) throws URISyntaxException, StorageException { + public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { service.deleteBlob(clientName, container, blob); } @@ -106,17 +105,17 @@ public InputStream getInputStream(String blob) throws URISyntaxException, Storag } public Map listBlobsByPrefix(String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } - public Map children(BlobPath path) throws URISyntaxException, StorageException { + public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); } public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + throws URISyntaxException, StorageException, IOException { service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 3e5638893c8ac..1f19c16a1e185 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -177,7 +177,7 @@ static String blobNameFromUri(URI uri) { return splits[2]; } - public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException { + public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException, IOException { // Container name must be lower case. final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -187,7 +187,7 @@ public boolean blobExists(String account, String container, String blob) throws }); } - public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException { + public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException, IOException { final Tuple> client = client(account); // Container name must be lower case. final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -267,7 +267,7 @@ public InputStream getInputStream(String account, String container, String blob) } public Map listBlobsByPrefix(String account, String container, String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { // NOTE: this should be here: if (prefix == null) prefix = ""; // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix! @@ -295,7 +295,7 @@ public Map listBlobsByPrefix(String account, String contai return Map.copyOf(blobsBuilder); } - public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException { + public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException { final var blobsBuilder = new HashSet(); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -319,8 +319,9 @@ public Set children(String account, String container, BlobPath path) thr } public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, - boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException { + assert inputStream.markSupported() + : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken"; logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize)); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java index 1400cc5b06627..afad71c274494 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java @@ -44,20 +44,28 @@ public static T doPrivilegedIOException(PrivilegedExceptionAction operati try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IOException(cause); + } } } - public static T doPrivilegedException(PrivilegedExceptionAction operation) throws StorageException { + public static T doPrivilegedException(PrivilegedExceptionAction operation) + throws StorageException, IOException, URISyntaxException { SpecialPermission.check(); try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (StorageException) e.getCause(); + safeRethrowCause(e); + assert false : "always throws"; + return null; } } - public static void doPrivilegedVoidException(StorageRunnable action) throws StorageException, URISyntaxException { + public static void doPrivilegedVoidException(StorageRunnable action) throws StorageException, URISyntaxException, IOException { SpecialPermission.check(); try { AccessController.doPrivileged((PrivilegedExceptionAction) () -> { @@ -65,12 +73,22 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor return null; }); } catch (PrivilegedActionException e) { - Throwable cause = e.getCause(); - if (cause instanceof StorageException) { - throw (StorageException) cause; - } else { - throw (URISyntaxException) cause; - } + safeRethrowCause(e); + } + } + + private static void safeRethrowCause(PrivilegedActionException e) throws IOException, URISyntaxException, StorageException { + Throwable cause = e.getCause(); + if (cause instanceof StorageException) { + throw (StorageException) cause; + } else if (cause instanceof URISyntaxException) { + throw (URISyntaxException) cause; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new IOException(cause); } } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 0aa7a3b0922f3..daf4e9ad57ba7 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -49,6 +49,7 @@ import org.junit.Before; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetAddress; @@ -63,6 +64,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -294,6 +296,44 @@ public void testWriteLargeBlob() throws Exception { assertThat(blocks.isEmpty(), is(true)); } + public void testRetryUntilFail() throws IOException { + final AtomicBoolean requestReceived = new AtomicBoolean(false); + httpServer.createContext("/container/write_blob_max_retries", exchange -> { + try { + if (requestReceived.compareAndSet(false, true)) { + throw new AssertionError("Should not receive two requests"); + } else { + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + } + } finally { + exchange.close(); + } + }); + + final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5)); + try (InputStream stream = new InputStream() { + + @Override + public int read() throws IOException { + throw new IOException("foo"); + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() { + throw new AssertionError("should not be called"); + } + }) { + final IOException ioe = expectThrows(IOException.class, () -> + blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean())); + assertThat(ioe.getMessage(), is("foo")); + } + } + private static byte[] randomBlobContent() { return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb } From 6cf65fc93e46233dd8b1374abd6197312e250f9b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 14 Oct 2019 14:45:56 +0200 Subject: [PATCH 2/2] throwables it is :) --- .../azure/AzureStorageService.java | 4 +-- .../repositories/azure/SocketAccess.java | 34 +++++-------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 1f19c16a1e185..26ade5bdec624 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -177,7 +177,7 @@ static String blobNameFromUri(URI uri) { return splits[2]; } - public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException, IOException { + public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException { // Container name must be lower case. final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -187,7 +187,7 @@ public boolean blobExists(String account, String container, String blob) throws }); } - public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException, IOException { + public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException { final Tuple> client = client(account); // Container name must be lower case. final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java index afad71c274494..18acf088cdb32 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.StorageException; +import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.SpecialPermission; import java.io.IOException; @@ -44,28 +45,24 @@ public static T doPrivilegedIOException(PrivilegedExceptionAction operati try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw new IOException(cause); - } + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } - public static T doPrivilegedException(PrivilegedExceptionAction operation) - throws StorageException, IOException, URISyntaxException { + public static T doPrivilegedException(PrivilegedExceptionAction operation) throws StorageException { SpecialPermission.check(); try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - safeRethrowCause(e); + Throwables.rethrow(e.getCause()); assert false : "always throws"; return null; } } - public static void doPrivilegedVoidException(StorageRunnable action) throws StorageException, URISyntaxException, IOException { + public static void doPrivilegedVoidException(StorageRunnable action) throws StorageException, URISyntaxException { SpecialPermission.check(); try { AccessController.doPrivileged((PrivilegedExceptionAction) () -> { @@ -73,22 +70,7 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor return null; }); } catch (PrivilegedActionException e) { - safeRethrowCause(e); - } - } - - private static void safeRethrowCause(PrivilegedActionException e) throws IOException, URISyntaxException, StorageException { - Throwable cause = e.getCause(); - if (cause instanceof StorageException) { - throw (StorageException) cause; - } else if (cause instanceof URISyntaxException) { - throw (URISyntaxException) cause; - } else if (cause instanceof IOException) { - throw (IOException) cause; - } else if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else { - throw new IOException(cause); + Throwables.rethrow(e.getCause()); } }