From 58420ce346cfc986089e053290e80829d5ffa012 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 19 Nov 2019 09:55:36 -0500 Subject: [PATCH] HttpHandlers should return correct list of objects (#49283) This commit fixes the server side logic of "List Objects" operations of Azure and S3 fixtures. Until today, the fixtures were returning a " flat" view of stored objects and were not correctly handling the delimiter parameter. This causes some objects listing to be wrongly interpreted by the snapshot deletion logic in Elasticsearch which relies on the ability to list child containers of BlobContainer (#42653) to correctly delete stale indices. As a consequence, the blobs were not correctly deleted from the emulated storage service and stayed in heap until they got garbage collected, causing CI failures like #48978. This commit fixes the server side logic of Azure and S3 fixture when listing objects so that it now return correct common blob prefixes as expected by the snapshot deletion process. It also adds an after-test check to ensure that tests leave the repository empty (besides the root index files). Closes #48978 --- .../azure/AzureBlobStoreRepositoryTests.java | 10 ++++- ...eCloudStorageBlobStoreRepositoryTests.java | 13 ++++++- .../s3/S3BlobStoreRepositoryTests.java | 10 ++++- .../java/fixture/azure/AzureHttpHandler.java | 33 +++++++++++++++-- .../gcs/GoogleCloudStorageHttpHandler.java | 22 ++++++----- .../main/java/fixture/s3/S3HttpHandler.java | 37 ++++++++++++++++--- .../ESBlobStoreRepositoryIntegTestCase.java | 5 ++- ...ESMockAPIBasedRepositoryIntegTestCase.java | 28 ++++++++++++-- 8 files changed, 131 insertions(+), 27 deletions(-) diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 5bc0d2684f1e3..e87699ff93ca5 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -64,7 +64,7 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/container", new AzureHttpHandler("container")); + return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); } @Override @@ -115,6 +115,14 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() { } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint") + private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler { + + AzureBlobStoreHttpHandler(final String container) { + super(container); + } + } + /** * HTTP handler that injects random Azure service errors * diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 9ff7192bda667..275b5c940520f 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -24,6 +24,7 @@ import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import fixture.gcs.FakeOAuth2HttpHandler; import fixture.gcs.GoogleCloudStorageHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.SuppressForbidden; @@ -77,8 +78,8 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { final Map handlers = new HashMap<>(2); - handlers.put("/", new GoogleCloudStorageHttpHandler("bucket")); - handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler()); + handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket")); + handlers.put("/token", new FakeOAuth2HttpHandler()); return Collections.unmodifiableMap(handlers); } @@ -186,6 +187,14 @@ long getLargeBlobThresholdInBytes() { } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint") + private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler { + + GoogleCloudStorageBlobStoreHttpHandler(final String bucket) { + super(bucket); + } + } + /** * HTTP handler that injects random Google Cloud Storage service errors * diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 5fb1ec8e3eb00..16dd4cdc27e0b 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -67,7 +67,7 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/bucket", new S3HttpHandler("bucket")); + return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket")); } @Override @@ -134,6 +134,14 @@ void ensureMultiPartUploadSize(long blobSize) { } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint") + private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler { + + S3BlobStoreHttpHandler(final String bucket) { + super(bucket); + } + } + /** * HTTP handler that injects random S3 service errors * diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java index affd118122141..57c45ab40ec5c 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -36,9 +36,11 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -153,13 +155,32 @@ public void handle(final HttpExchange exchange) throws IOException { list.append(""); list.append(""); final String prefix = params.get("prefix"); + final Set blobPrefixes = new HashSet<>(); + final String delimiter = params.get("delimiter"); + if (delimiter != null) { + list.append("").append(delimiter).append(""); + } list.append(""); for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) { - list.append("").append(blob.getKey().replace("/" + container + "/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append("BlockBlob"); + if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) { + continue; + } + String blobPath = blob.getKey().replace("/" + container + "/", ""); + if (delimiter != null) { + int fromIndex = (prefix != null ? prefix.length() : 0); + int delimiterPosition = blobPath.indexOf(delimiter, fromIndex); + if (delimiterPosition > 0) { + blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter); + continue; + } } + list.append("").append(blobPath).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append("BlockBlob"); + } + if (blobPrefixes.isEmpty() == false) { + blobPrefixes.forEach(p -> list.append("").append(p).append("")); + } list.append(""); list.append(""); @@ -177,6 +198,10 @@ public void handle(final HttpExchange exchange) throws IOException { } } + public Map blobs() { + return blobs; + } + public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException { final Headers headers = exchange.getResponseHeaders(); headers.add("Content-Type", "application/xml"); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index dae02548e37d7..f3bd2ecfe3091 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.network.InetAddresses; @@ -64,7 +65,7 @@ @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageHttpHandler implements HttpHandler { - private final ConcurrentMap blobs; + private final ConcurrentMap blobs; private final String bucket; public GoogleCloudStorageHttpHandler(final String bucket) { @@ -86,7 +87,7 @@ public void handle(final HttpExchange exchange) throws IOException { final Set prefixes = new HashSet<>(); final List listOfBlobs = new ArrayList<>(); - for (final Map.Entry blob : blobs.entrySet()) { + for (final Map.Entry blob : blobs.entrySet()) { final String blobName = blob.getKey(); if (prefix.isEmpty() || blobName.startsWith(prefix)) { int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1; @@ -122,7 +123,7 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) { // Download Object https://cloud.google.com/storage/docs/request-body - BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); + BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); if (blob != null) { final String range = exchange.getRequestHeaders().getFirst("Range"); Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range); @@ -130,7 +131,7 @@ public void handle(final HttpExchange exchange) throws IOException { throw new AssertionError("Range bytes header does not match expected format: " + range); } - byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0]; + byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0]; exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); @@ -141,8 +142,8 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) { // Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete int deletions = 0; - for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry blob = iterator.next(); + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); if (blob.getKey().equals(exchange.getRequestURI().toString())) { iterator.remove(); deletions++; @@ -209,12 +210,11 @@ public void handle(final HttpExchange exchange) throws IOException { RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); final String blobName = params.get("test_blob_name"); - byte[] blob = blobs.get(blobName).array(); - if (blob == null) { + if (blobs.containsKey(blobName) == false) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; } - + byte[] blob = BytesReference.toBytes(blobs.get(blobName)); final String range = exchange.getRequestHeaders().getFirst("Content-Range"); final Integer limit = getContentRangeLimit(range); final int start = getContentRangeStart(range); @@ -250,6 +250,10 @@ public void handle(final HttpExchange exchange) throws IOException { } } + public Map blobs() { + return blobs; + } + private String httpServerUrl(final HttpExchange exchange) { final InetSocketAddress address = exchange.getLocalAddress(); return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 468035b828b6d..f4a467d314a27 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -41,10 +41,12 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; @@ -158,13 +160,34 @@ public void handle(final HttpExchange exchange) throws IOException { if (prefix != null) { list.append("").append(prefix).append(""); } + final Set commonPrefixes = new HashSet<>(); + final String delimiter = params.get("delimiter"); + if (delimiter != null) { + list.append("").append(delimiter).append(""); + } for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) { - list.append(""); - list.append("").append(blob.getKey().replace("/" + bucket + "/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append(""); + if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) { + continue; + } + String blobPath = blob.getKey().replace("/" + bucket + "/", ""); + if (delimiter != null) { + int fromIndex = (prefix != null ? prefix.length() : 0); + int delimiterPosition = blobPath.indexOf(delimiter, fromIndex); + if (delimiterPosition > 0) { + commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter); + continue; + } } + list.append(""); + list.append("").append(blobPath).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append(""); + } + if (commonPrefixes.isEmpty() == false) { + list.append(""); + commonPrefixes.forEach(commonPrefix -> list.append("").append(commonPrefix).append("")); + list.append(""); + } list.append(""); @@ -241,6 +264,10 @@ public void handle(final HttpExchange exchange) throws IOException { } } + public Map blobs() { + return blobs; + } + private static String multipartKey(final String uploadId, int partNumber) { return uploadId + "\n" + partNumber; } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 814550be5f899..68b4465f5d20b 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -42,7 +42,6 @@ import java.util.Locale; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -270,9 +269,11 @@ public void testIndicesDeletedFromRepository() throws Exception { assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index } } + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get()); } - protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException { + protected void addRandomDocuments(String name, int numDocs) throws InterruptedException { IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { indexRequestBuilders[i] = client().prepareIndex(name, name, Integer.toString(i)) diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index dc99cab8a28dc..82f052b57442c 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.MockHttpServer; @@ -41,13 +42,16 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; /** * Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services. @@ -55,6 +59,14 @@ @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase { + /** + * A {@link HttpHandler} that allows to list stored blobs + */ + @SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service") + protected interface BlobStoreHttpHandler extends HttpHandler { + Map blobs(); + } + private static final byte[] BUFFER = new byte[1024]; private static HttpServer httpServer; @@ -81,7 +93,14 @@ public static void stopHttpServer() { @After public void tearDownHttpServer() { if (handlers != null) { - handlers.keySet().forEach(context -> httpServer.removeContext(context)); + for(Map.Entry handler : handlers.entrySet()) { + httpServer.removeContext(handler.getKey()); + if (handler.getValue() instanceof BlobStoreHttpHandler) { + List blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream() + .filter(blob -> blob.contains("index") == false).collect(Collectors.toList()); + assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0)); + } + } } } @@ -110,14 +129,17 @@ public final void testSnapshotWithLargeSegmentFiles() throws Exception { assertThat(forceMerge.getSuccessfulShards(), equalTo(1)); assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); - assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot") + final String snapshot = "snapshot"; + assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot) .setWaitForCompletion(true).setIndices(index)); assertAcked(client().admin().indices().prepareDelete(index)); - assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true)); + assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true)); ensureGreen(index); assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get()); } protected static String httpServerUrl() {