From 529b0cb545842237cf540c8c2d23edb783f18407 Mon Sep 17 00:00:00 2001
From: Andriy Redko <andriy.redko@aiven.io>
Date: Thu, 16 Dec 2021 15:10:14 -0500
Subject: [PATCH] [backport] [1.2] [plugin] repository-azure is not working
 properly: hangs on basic operations (#1745)

* [plugin] repository-azure is not working properly: hangs on basic operations

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Added test cases and TODO items, addressing code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
---
 plugins/repository-azure/build.gradle         |  21 +-
 .../repositories/azure/AzureBlobStore.java    | 181 ++++++++++++------
 .../java/fixture/azure/AzureHttpHandler.java  |   1 +
 3 files changed, 142 insertions(+), 61 deletions(-)

diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle
index 71356129845e5..e87d231a9e5ea 100644
--- a/plugins/repository-azure/build.gradle
+++ b/plugins/repository-azure/build.gradle
@@ -283,4 +283,23 @@ task azureThirdPartyTest(type: Test) {
     nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
   }
 }
-check.dependsOn(azureThirdPartyTest)
+
+task azureThirdPartyDefaultXmlTest(type: Test) { // Runs AzureStorageCleanupThirdPartyTests with the XMLInputFactory system property set explicitly
+  SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class);
+  SourceSet internalTestSourceSet = sourceSets.getByName(InternalClusterTestPlugin.SOURCE_SET_NAME)
+  setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs()) // test classes come from the internal cluster test source set
+  setClasspath(internalTestSourceSet.getRuntimeClasspath())
+  dependsOn tasks.internalClusterTest
+  include '**/AzureStorageCleanupThirdPartyTests.class' // only this test class is selected for the task
+  systemProperty 'javax.xml.stream.XMLInputFactory', "com.sun.xml.internal.stream.XMLInputFactoryImpl" // NOTE(review): presumably the JDK built-in StAX factory, per the task name -- confirm
+  systemProperty 'test.azure.account', azureAccount ? azureAccount : "" // falsy values are passed as an empty string
+  systemProperty 'test.azure.key', azureKey ? azureKey : ""
+  systemProperty 'test.azure.sas_token', azureSasToken ? azureSasToken : ""
+  systemProperty 'test.azure.container', azureContainer ? azureContainer : ""
+  systemProperty 'test.azure.base', (azureBasePath ? azureBasePath : "") + "_third_party_tests_" + BuildParams.testSeed // base path is suffixed with the build's test seed
+  if (useFixture) {
+    nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }" // lazy GString: azureAddress is called only when the value is read
+  }
+}
+
+check.dependsOn(azureThirdPartyTest, azureThirdPartyDefaultXmlTest)
diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java
index d4f3acf0a5c66..6345103c6ecc6 100644
--- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java
+++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java
@@ -35,6 +35,7 @@
 import com.azure.core.http.HttpMethod;
 import com.azure.core.http.HttpRequest;
 import com.azure.core.http.HttpResponse;
+import com.azure.core.http.rest.PagedResponse;
 import com.azure.core.http.rest.Response;
 import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
@@ -51,6 +52,7 @@
 import com.azure.storage.blob.options.BlobParallelUploadOptions;
 import com.azure.storage.common.implementation.Constants;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
@@ -82,6 +84,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -217,50 +220,71 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) throws U
         final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);
 
         SocketAccess.doPrivilegedVoidException(() -> {
-            for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) {
-                // Skipping prefixes as those are not deletable and should not be there
-                assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
-
-                outstanding.incrementAndGet();
-                executor.execute(new AbstractRunnable() {
-                    @Override
-                    protected void doRun() throws Exception {
-                        final long len = blobItem.getProperties().getContentLength();
-
-                        final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
-                        logger.trace(
-                            () -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
-                        );
-                        final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
-                        logger.trace(
-                            () -> new ParameterizedMessage(
-                                "container [{}]: blob [{}] deleted status [{}].",
-                                container,
-                                blobItem.getName(),
-                                response.getStatusCode()
-                            )
-                        );
-
-                        blobsDeleted.incrementAndGet();
-                        if (len >= 0) {
-                            bytesDeleted.addAndGet(len);
+            String continuationToken = null;
+
+            do {
+                // Fetch one page at a time, others are going to be fetched by continuation token
+                // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
+                // gets addressed.
+                final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout())
+                    .streamByPage(continuationToken)
+                    .findFirst();
+
+                if (!pageOpt.isPresent()) {
+                    // No more pages, should never happen
+                    break;
+                }
+
+                final PagedResponse<BlobItem> page = pageOpt.get();
+                for (final BlobItem blobItem : page.getValue()) {
+                    // Skipping prefixes as those are not deletable and should not be there
+                    assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
+
+                    outstanding.incrementAndGet();
+                    executor.execute(new AbstractRunnable() {
+                        @Override
+                        protected void doRun() throws Exception {
+                            final long len = blobItem.getProperties().getContentLength();
+
+                            final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
+                            logger.trace(
+                                () -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
+                            );
+                            final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
+                            logger.trace(
+                                () -> new ParameterizedMessage(
+                                    "container [{}]: blob [{}] deleted status [{}].",
+                                    container,
+                                    blobItem.getName(),
+                                    response.getStatusCode()
+                                )
+                            );
+
+                            blobsDeleted.incrementAndGet();
+                            if (len >= 0) {
+                                bytesDeleted.addAndGet(len);
+                            }
                         }
-                    }
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        exceptions.add(e);
-                    }
+                        @Override
+                        public void onFailure(Exception e) {
+                            exceptions.add(e);
+                        }
 
-                    @Override
-                    public void onAfter() {
-                        if (outstanding.decrementAndGet() == 0) {
-                            result.onResponse(null);
+                        @Override
+                        public void onAfter() {
+                            if (outstanding.decrementAndGet() == 0) {
+                                result.onResponse(null);
+                            }
                         }
-                    }
-                });
-            }
+                    });
+                }
+
+                // Fetch next continuation token
+                continuationToken = page.getContinuationToken();
+            } while (StringUtils.isNotBlank(continuationToken));
         });
+
         if (outstanding.decrementAndGet() == 0) {
             result.onResponse(null);
         }
@@ -301,20 +325,39 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix
             .setPrefix(keyPath + (prefix == null ? "" : prefix));
 
         SocketAccess.doPrivilegedVoidException(() -> {
-            for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
-                // Skipping over the prefixes, only look for the blobs
-                if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
-                    continue;
+            String continuationToken = null;
+
+            do {
+                // Fetch one page at a time, others are going to be fetched by continuation token
+                // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
+                // gets addressed
+                final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
+                    .streamByPage(continuationToken)
+                    .findFirst();
+
+                if (!pageOpt.isPresent()) {
+                    // No more pages, should never happen
+                    break;
                 }
 
-                final String name = getBlobName(blobItem.getName(), container, keyPath);
-                logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
+                final PagedResponse<BlobItem> page = pageOpt.get();
+                for (final BlobItem blobItem : page.getValue()) {
+                    // Skipping over the prefixes, only look for the blobs
+                    if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
+                        continue;
+                    }
 
-                final BlobItemProperties properties = blobItem.getProperties();
-                logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
-                blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
-            }
+                    final String name = getBlobName(blobItem.getName(), container, keyPath);
+                    logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
+
+                    final BlobItemProperties properties = blobItem.getProperties();
+                    logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
+                    blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
+                }
 
+                // Fetch next continuation token
+                continuationToken = page.getContinuationToken();
+            } while (StringUtils.isNotBlank(continuationToken));
         });
 
         return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
@@ -330,18 +373,36 @@ public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxExcept
             .setPrefix(keyPath);
 
         SocketAccess.doPrivilegedVoidException(() -> {
-            for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
-                // Skipping over the blobs, only look for prefixes
-                if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
-                    // Expecting name in the form /container/keyPath.* and we want to strip off the /container/
-                    // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
-                    // Lastly, we add the length of keyPath to the offset to strip this container's path.
-                    final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
-                    logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
-                    blobsBuilder.add(name);
+            String continuationToken = null;
+
+            do {
+                // Fetch one page at a time, others are going to be fetched by continuation token
+                // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
+                // gets addressed
+                final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
+                    .streamByPage(continuationToken)
+                    .findFirst();
+
+                if (!pageOpt.isPresent()) {
+                    // No more pages, should never happen
+                    break;
                 }
-            }
-            ;
+
+                final PagedResponse<BlobItem> page = pageOpt.get();
+                for (final BlobItem blobItem : page.getValue()) {
+                    // Skipping over the blobs, only look for prefixes
+                    if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
+                        // Expecting name in the form /container/keyPath.* and we want to strip off the /container/
+                        // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
+                        // Lastly, we add the length of keyPath to the offset to strip this container's path.
+                        final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
+                        logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
+                        blobsBuilder.add(name);
+                    }
+                }
+                // Fetch next continuation token
+                continuationToken = page.getContinuationToken();
+            } while (StringUtils.isNotBlank(continuationToken));
         });
 
         return Collections.unmodifiableMap(
diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java
index 68ebeb5265fd0..f12a4579a2d0c 100644
--- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java
+++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java
@@ -208,6 +208,7 @@ public void handle(final HttpExchange exchange) throws IOException {
 
                 }
                 list.append("</Blobs>");
+                list.append("<NextMarker />");
                 list.append("</EnumerationResults>");
 
                 byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);