Skip to content

Commit

Permalink
HttpHandlers should return correct list of objects (#49283)
Browse files Browse the repository at this point in the history
This commit fixes the server side logic of "List Objects" operations
of Azure and S3 fixtures. Until today, the fixtures were returning a "
flat" view of stored objects and were not correctly handling the
delimiter parameter. This causes some objects listing to be wrongly
interpreted by the snapshot deletion logic in Elasticsearch which
relies on the ability to list child containers of BlobContainer (#42653)
to correctly delete stale indices.

As a consequence, the blobs were not correctly deleted from the
 emulated storage service and stayed in heap until they got garbage
collected, causing CI failures like #48978.

This commit fixes the server side logic of Azure and S3 fixture when
listing objects so that it now return correct common blob prefixes as
expected by the snapshot deletion process. It also adds an after-test
check to ensure that tests leave the repository empty (besides the
root index files).

Closes #48978
  • Loading branch information
tlrx committed Nov 20, 2019
1 parent e167ae8 commit 58420ce
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/container", new AzureHttpHandler("container"));
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
}

@Override
Expand Down Expand Up @@ -115,6 +115,14 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint")
private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler {

AzureBlobStoreHttpHandler(final String container) {
super(container);
}
}

/**
* HTTP handler that injects random Azure service errors
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.gcs.FakeOAuth2HttpHandler;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.SuppressForbidden;
Expand Down Expand Up @@ -77,8 +78,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
final Map<String, HttpHandler> handlers = new HashMap<>(2);
handlers.put("/", new GoogleCloudStorageHttpHandler("bucket"));
handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler());
handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket"));
handlers.put("/token", new FakeOAuth2HttpHandler());
return Collections.unmodifiableMap(handlers);
}

Expand Down Expand Up @@ -186,6 +187,14 @@ long getLargeBlobThresholdInBytes() {
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint")
private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler {

GoogleCloudStorageBlobStoreHttpHandler(final String bucket) {
super(bucket);
}
}

/**
* HTTP handler that injects random Google Cloud Storage service errors
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/bucket", new S3HttpHandler("bucket"));
return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
}

@Override
Expand Down Expand Up @@ -134,6 +134,14 @@ void ensureMultiPartUploadSize(long blobSize) {
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint")
private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler {

S3BlobStoreHttpHandler(final String bucket) {
super(bucket);
}
}

/**
* HTTP handler that injects random S3 service errors
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -153,13 +155,32 @@ public void handle(final HttpExchange exchange) throws IOException {
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
list.append("<EnumerationResults>");
final String prefix = params.get("prefix");
final Set<String> blobPrefixes = new HashSet<>();
final String delimiter = params.get("delimiter");
if (delimiter != null) {
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
}
list.append("<Blobs>");
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) {
list.append("<Blob><Name>").append(blob.getKey().replace("/" + container + "/", "")).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) {
continue;
}
String blobPath = blob.getKey().replace("/" + container + "/", "");
if (delimiter != null) {
int fromIndex = (prefix != null ? prefix.length() : 0);
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
if (delimiterPosition > 0) {
blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
continue;
}
}
list.append("<Blob><Name>").append(blobPath).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
}
if (blobPrefixes.isEmpty() == false) {
blobPrefixes.forEach(p -> list.append("<BlobPrefix><Name>").append(p).append("</Name></BlobPrefix>"));

}
list.append("</Blobs>");
list.append("</EnumerationResults>");
Expand All @@ -177,6 +198,10 @@ public void handle(final HttpExchange exchange) throws IOException {
}
}

public Map<String, BytesReference> blobs() {
return blobs;
}

public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type", "application/xml");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.InetAddresses;
Expand Down Expand Up @@ -64,7 +65,7 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageHttpHandler implements HttpHandler {

private final ConcurrentMap<String, BytesArray> blobs;
private final ConcurrentMap<String, BytesReference> blobs;
private final String bucket;

public GoogleCloudStorageHttpHandler(final String bucket) {
Expand All @@ -86,7 +87,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final Set<String> prefixes = new HashSet<>();
final List<String> listOfBlobs = new ArrayList<>();

for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
for (final Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
final String blobName = blob.getKey();
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
Expand Down Expand Up @@ -122,15 +123,15 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
// Download Object https://cloud.google.com/storage/docs/request-body
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
if (blob != null) {
final String range = exchange.getRequestHeaders().getFirst("Range");
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
if (matcher.find() == false) {
throw new AssertionError("Range bytes header does not match expected format: " + range);
}

byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0];
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
Expand All @@ -141,8 +142,8 @@ public void handle(final HttpExchange exchange) throws IOException {
} else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
// Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
int deletions = 0;
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesArray> blob = iterator.next();
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesReference> blob = iterator.next();
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
iterator.remove();
deletions++;
Expand Down Expand Up @@ -209,12 +210,11 @@ public void handle(final HttpExchange exchange) throws IOException {
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);

final String blobName = params.get("test_blob_name");
byte[] blob = blobs.get(blobName).array();
if (blob == null) {
if (blobs.containsKey(blobName) == false) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
return;
}

byte[] blob = BytesReference.toBytes(blobs.get(blobName));
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
final Integer limit = getContentRangeLimit(range);
final int start = getContentRangeStart(range);
Expand Down Expand Up @@ -250,6 +250,10 @@ public void handle(final HttpExchange exchange) throws IOException {
}
}

public Map<String, BytesReference> blobs() {
return blobs;
}

private String httpServerUrl(final HttpExchange exchange) {
final InetSocketAddress address = exchange.getLocalAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -158,13 +160,34 @@ public void handle(final HttpExchange exchange) throws IOException {
if (prefix != null) {
list.append("<Prefix>").append(prefix).append("</Prefix>");
}
final Set<String> commonPrefixes = new HashSet<>();
final String delimiter = params.get("delimiter");
if (delimiter != null) {
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
}
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) {
list.append("<Contents>");
list.append("<Key>").append(blob.getKey().replace("/" + bucket + "/", "")).append("</Key>");
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
list.append("</Contents>");
if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) {
continue;
}
String blobPath = blob.getKey().replace("/" + bucket + "/", "");
if (delimiter != null) {
int fromIndex = (prefix != null ? prefix.length() : 0);
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
if (delimiterPosition > 0) {
commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
continue;
}
}
list.append("<Contents>");
list.append("<Key>").append(blobPath).append("</Key>");
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
list.append("</Contents>");
}
if (commonPrefixes.isEmpty() == false) {
list.append("<CommonPrefixes>");
commonPrefixes.forEach(commonPrefix -> list.append("<Prefix>").append(commonPrefix).append("</Prefix>"));
list.append("</CommonPrefixes>");

}
list.append("</ListBucketResult>");

Expand Down Expand Up @@ -241,6 +264,10 @@ public void handle(final HttpExchange exchange) throws IOException {
}
}

public Map<String, BytesReference> blobs() {
return blobs;
}

private static String multipartKey(final String uploadId, int partNumber) {
return uploadId + "\n" + partNumber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -270,9 +269,11 @@ public void testIndicesDeletedFromRepository() throws Exception {
assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
}
}

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get());
}

protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
indexRequestBuilders[i] = client().prepareIndex(name, name, Integer.toString(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.MockHttpServer;
Expand All @@ -41,20 +42,31 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

/**
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase {

/**
* A {@link HttpHandler} that allows to list stored blobs
*/
@SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service")
protected interface BlobStoreHttpHandler extends HttpHandler {
Map<String, BytesReference> blobs();
}

private static final byte[] BUFFER = new byte[1024];

private static HttpServer httpServer;
Expand All @@ -81,7 +93,14 @@ public static void stopHttpServer() {
@After
public void tearDownHttpServer() {
if (handlers != null) {
handlers.keySet().forEach(context -> httpServer.removeContext(context));
for(Map.Entry<String, HttpHandler> handler : handlers.entrySet()) {
httpServer.removeContext(handler.getKey());
if (handler.getValue() instanceof BlobStoreHttpHandler) {
List<String> blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream()
.filter(blob -> blob.contains("index") == false).collect(Collectors.toList());
assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0));
}
}
}
}

Expand Down Expand Up @@ -110,14 +129,17 @@ public final void testSnapshotWithLargeSegmentFiles() throws Exception {
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
final String snapshot = "snapshot";
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
}

protected static String httpServerUrl() {
Expand Down

0 comments on commit 58420ce

Please sign in to comment.