diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 5bc0d2684f1e..e87699ff93ca 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -64,7 +64,7 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/container", new AzureHttpHandler("container")); + return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); } @Override @@ -115,6 +115,14 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() { } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint") + private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler { + + AzureBlobStoreHttpHandler(final String container) { + super(container); + } + } + /** * HTTP handler that injects random Azure service errors * diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 9ff7192bda66..275b5c940520 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -24,6 +24,7 @@ import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import fixture.gcs.FakeOAuth2HttpHandler; import fixture.gcs.GoogleCloudStorageHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.SuppressForbidden; @@ -77,8 +78,8 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { final Map handlers = new HashMap<>(2); - handlers.put("/", new GoogleCloudStorageHttpHandler("bucket")); - handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler()); + handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket")); + handlers.put("/token", new FakeOAuth2HttpHandler()); return Collections.unmodifiableMap(handlers); } @@ -186,6 +187,14 @@ long getLargeBlobThresholdInBytes() { } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint") + private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler { + + GoogleCloudStorageBlobStoreHttpHandler(final String bucket) { + super(bucket); + } + } + /** * HTTP handler that injects random Google Cloud Storage service errors * diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 5fb1ec8e3eb0..16dd4cdc27e0 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -67,7 +67,7 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/bucket", new S3HttpHandler("bucket")); + return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket")); } @Override @@ -134,6 +134,14 @@ void ensureMultiPartUploadSize(long blobSize) { } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint") + private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler { + + S3BlobStoreHttpHandler(final String bucket) { + super(bucket); + } + } + /** * HTTP handler that injects random S3 service errors * diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java index affd11812214..57c45ab40ec5 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -36,9 +36,11 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -153,13 +155,32 @@ public void handle(final HttpExchange exchange) throws IOException { list.append(""); list.append(""); final String prefix = params.get("prefix"); + final Set blobPrefixes = new HashSet<>(); + final String delimiter = params.get("delimiter"); + if (delimiter != null) { + list.append("").append(delimiter).append(""); + } list.append(""); for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) { - list.append("").append(blob.getKey().replace("/" + container + "/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append("BlockBlob"); + if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) { + continue; + } + String blobPath = blob.getKey().replace("/" + container + "/", ""); + if (delimiter != null) { + int fromIndex = (prefix != null ? prefix.length() : 0); + int delimiterPosition = blobPath.indexOf(delimiter, fromIndex); + if (delimiterPosition > 0) { + blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter); + continue; + } } + list.append("").append(blobPath).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append("BlockBlob"); + } + if (blobPrefixes.isEmpty() == false) { + blobPrefixes.forEach(p -> list.append("").append(p).append("")); + } list.append(""); list.append(""); @@ -177,6 +198,10 @@ public void handle(final HttpExchange exchange) throws IOException { } } + public Map blobs() { + return blobs; + } + public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException { final Headers headers = exchange.getResponseHeaders(); headers.add("Content-Type", "application/xml"); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index dae02548e37d..f3bd2ecfe309 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.network.InetAddresses; @@ -64,7 +65,7 @@ @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageHttpHandler implements HttpHandler { - private final ConcurrentMap blobs; + private final ConcurrentMap blobs; private final String bucket; public GoogleCloudStorageHttpHandler(final String bucket) { @@ -86,7 +87,7 @@ public void handle(final HttpExchange exchange) throws IOException { final Set prefixes = new HashSet<>(); final List listOfBlobs = new ArrayList<>(); - for (final Map.Entry blob : blobs.entrySet()) { + for (final Map.Entry blob : blobs.entrySet()) { final String blobName = blob.getKey(); if (prefix.isEmpty() || blobName.startsWith(prefix)) { int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1; @@ -122,7 +123,7 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) { // Download Object https://cloud.google.com/storage/docs/request-body - BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); + BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); if (blob != null) { final String range = exchange.getRequestHeaders().getFirst("Range"); Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range); @@ -130,7 +131,7 @@ public void handle(final HttpExchange exchange) throws IOException { throw new AssertionError("Range bytes header does not match expected format: " + range); } - byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0]; + byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0]; exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); @@ -141,8 +142,8 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) { // Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete int deletions = 0; - for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry blob = iterator.next(); + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); if (blob.getKey().equals(exchange.getRequestURI().toString())) { iterator.remove(); deletions++; @@ -209,12 +210,11 @@ public void handle(final HttpExchange exchange) throws IOException { RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); final String blobName = params.get("test_blob_name"); - byte[] blob = blobs.get(blobName).array(); - if (blob == null) { + if (blobs.containsKey(blobName) == false) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; } - + byte[] blob = BytesReference.toBytes(blobs.get(blobName)); final String range = exchange.getRequestHeaders().getFirst("Content-Range"); final Integer limit = getContentRangeLimit(range); final int start = getContentRangeStart(range); @@ -250,6 +250,10 @@ public void handle(final HttpExchange exchange) throws IOException { } } + public Map blobs() { + return blobs; + } + private String httpServerUrl(final HttpExchange exchange) { final InetSocketAddress address = exchange.getLocalAddress(); return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 468035b828b6..f4a467d314a2 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -41,10 +41,12 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; @@ -158,13 +160,34 @@ public void handle(final HttpExchange exchange) throws IOException { if (prefix != null) { list.append("").append(prefix).append(""); } + final Set commonPrefixes = new HashSet<>(); + final String delimiter = params.get("delimiter"); + if (delimiter != null) { + list.append("").append(delimiter).append(""); + } for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) { - list.append(""); - list.append("").append(blob.getKey().replace("/" + bucket + "/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append(""); + if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) { + continue; + } + String blobPath = blob.getKey().replace("/" + bucket + "/", ""); + if (delimiter != null) { + int fromIndex = (prefix != null ? prefix.length() : 0); + int delimiterPosition = blobPath.indexOf(delimiter, fromIndex); + if (delimiterPosition > 0) { + commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter); + continue; + } } + list.append(""); + list.append("").append(blobPath).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append(""); + } + if (commonPrefixes.isEmpty() == false) { + list.append(""); + commonPrefixes.forEach(commonPrefix -> list.append("").append(commonPrefix).append("")); + list.append(""); + } list.append(""); @@ -241,6 +264,10 @@ public void handle(final HttpExchange exchange) throws IOException { } } + public Map blobs() { + return blobs; + } + private static String multipartKey(final String uploadId, int partNumber) { return uploadId + "\n" + partNumber; } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 814550be5f89..68b4465f5d20 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -42,7 +42,6 @@ import java.util.Locale; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -270,9 +269,11 @@ public void testIndicesDeletedFromRepository() throws Exception { assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index } } + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get()); } - protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException { + protected void addRandomDocuments(String name, int numDocs) throws InterruptedException { IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { indexRequestBuilders[i] = client().prepareIndex(name, name, Integer.toString(i)) diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index dc99cab8a28d..82f052b57442 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.MockHttpServer; @@ -41,13 +42,16 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; /** * Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services. @@ -55,6 +59,14 @@ @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase { + /** + * A {@link HttpHandler} that allows to list stored blobs + */ + @SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service") + protected interface BlobStoreHttpHandler extends HttpHandler { + Map blobs(); + } + private static final byte[] BUFFER = new byte[1024]; private static HttpServer httpServer; @@ -81,7 +93,14 @@ public static void stopHttpServer() { @After public void tearDownHttpServer() { if (handlers != null) { - handlers.keySet().forEach(context -> httpServer.removeContext(context)); + for(Map.Entry handler : handlers.entrySet()) { + httpServer.removeContext(handler.getKey()); + if (handler.getValue() instanceof BlobStoreHttpHandler) { + List blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream() + .filter(blob -> blob.contains("index") == false).collect(Collectors.toList()); + assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0)); + } + } } } @@ -110,14 +129,17 @@ public final void testSnapshotWithLargeSegmentFiles() throws Exception { assertThat(forceMerge.getSuccessfulShards(), equalTo(1)); assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); - assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot") + final String snapshot = "snapshot"; + assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot) .setWaitForCompletion(true).setIndices(index)); assertAcked(client().admin().indices().prepareDelete(index)); - assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true)); + assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true)); ensureGreen(index); assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get()); } protected static String httpServerUrl() {