diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index aaf7dc6391b65..15afaada84c83 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -62,7 +62,7 @@ private boolean blobExists(String blobName) { logger.trace("blobExists({})", blobName); try { return blobStore.blobExists(buildKey(blobName)); - } catch (URISyntaxException | StorageException e) { + } catch (URISyntaxException | StorageException | IOException e) { logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage()); } return false; @@ -97,7 +97,6 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); - try { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } catch (URISyntaxException|StorageException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 968d8396f7eed..714e29edea29d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; -import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Executor; @@ -88,11 +87,11 @@ public BlobContainer blobContainer(BlobPath path) { public void close() { } - public boolean blobExists(String blob) throws URISyntaxException, StorageException { + public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException { return service.blobExists(clientName, container, blob); } - public void deleteBlob(String blob) throws URISyntaxException, StorageException { + public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { service.deleteBlob(clientName, container, blob); } @@ -106,17 +105,17 @@ public InputStream getInputStream(String blob) throws URISyntaxException, Storag } public Map listBlobsByPrefix(String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } - public Map children(BlobPath path) throws URISyntaxException, StorageException { + public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); } public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + throws URISyntaxException, StorageException, IOException { service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index fbdac39b9f692..ca7cb3475b445 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -268,7 +268,7 @@ public InputStream getInputStream(String account, String container, String blob) } public Map listBlobsByPrefix(String account, String container, String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { // NOTE: this should be here: if (prefix == null) prefix = ""; // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix! @@ -296,7 +296,7 @@ public Map listBlobsByPrefix(String account, String contai return blobsBuilder.immutableMap(); } - public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException { + public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException { final Set blobsBuilder = new HashSet<>(); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -320,8 +320,9 @@ public Set children(String account, String container, BlobPath path) thr } public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, - boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException { + assert inputStream.markSupported() + : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken"; logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize)); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java index 1400cc5b06627..18acf088cdb32 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.StorageException; +import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.SpecialPermission; import java.io.IOException; @@ -44,7 +45,9 @@ public static T doPrivilegedIOException(PrivilegedExceptionAction operati try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } @@ -53,7 +56,9 @@ public static T doPrivilegedException(PrivilegedExceptionAction operation try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (StorageException) e.getCause(); + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } @@ -65,12 +70,7 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor return null; }); } catch (PrivilegedActionException e) { - Throwable cause = e.getCause(); - if (cause instanceof StorageException) { - throw (StorageException) cause; - } else { - throw (URISyntaxException) cause; - } + Throwables.rethrow(e.getCause()); } } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 0aa7a3b0922f3..daf4e9ad57ba7 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -49,6 +49,7 @@ import org.junit.Before; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetAddress; @@ -63,6 +64,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -294,6 +296,44 @@ public void testWriteLargeBlob() throws Exception { assertThat(blocks.isEmpty(), is(true)); } + public void testRetryUntilFail() throws IOException { + final AtomicBoolean requestReceived = new AtomicBoolean(false); + httpServer.createContext("/container/write_blob_max_retries", exchange -> { + try { + if (requestReceived.compareAndSet(false, true)) { + throw new AssertionError("Should not receive two requests"); + } else { + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + } + } finally { + exchange.close(); + } + }); + + final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5)); + try (InputStream stream = new InputStream() { + + @Override + public int read() throws IOException { + throw new IOException("foo"); + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() { + throw new AssertionError("should not be called"); + } + }) { + final IOException ioe = expectThrows(IOException.class, () -> + blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean())); + assertThat(ioe.getMessage(), is("foo")); + } + } + private static byte[] randomBlobContent() { return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb }