Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HttpHandlers should return correct list of objects #49283

Merged
merged 2 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/container", new AzureHttpHandler("container"));
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
}

@Override
Expand Down Expand Up @@ -115,6 +115,14 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint")
private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler {

AzureBlobStoreHttpHandler(final String container) {
super(container);
}
}

/**
* HTTP handler that injects random Azure service errors
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Map.of(
"/", new GoogleCloudStorageHttpHandler("bucket"),
"/", new GoogleCloudStorageBlobStoreHttpHandler("bucket"),
"/token", new FakeOAuth2HttpHandler()
);
}
Expand Down Expand Up @@ -186,6 +186,14 @@ long getLargeBlobThresholdInBytes() {
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint")
private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler {

GoogleCloudStorageBlobStoreHttpHandler(final String bucket) {
super(bucket);
}
}

/**
* HTTP handler that injects random Google Cloud Storage service errors
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/bucket", new S3HttpHandler("bucket"));
return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
}

@Override
Expand Down Expand Up @@ -134,6 +134,14 @@ void ensureMultiPartUploadSize(long blobSize) {
}
}

@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint")
private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler {

S3BlobStoreHttpHandler(final String bucket) {
super(bucket);
}
}

/**
* HTTP handler that injects random S3 service errors
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -153,13 +155,32 @@ public void handle(final HttpExchange exchange) throws IOException {
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
list.append("<EnumerationResults>");
final String prefix = params.get("prefix");
final Set<String> blobPrefixes = new HashSet<>();
final String delimiter = params.get("delimiter");
if (delimiter != null) {
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
}
list.append("<Blobs>");
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) {
list.append("<Blob><Name>").append(blob.getKey().replace("/" + container + "/", "")).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) {
continue;
}
String blobPath = blob.getKey().replace("/" + container + "/", "");
if (delimiter != null) {
int fromIndex = (prefix != null ? prefix.length() : 0);
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
if (delimiterPosition > 0) {
blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
continue;
}
}
list.append("<Blob><Name>").append(blobPath).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
}
if (blobPrefixes.isEmpty() == false) {
blobPrefixes.forEach(p -> list.append("<BlobPrefix><Name>").append(p).append("</Name></BlobPrefix>"));

}
list.append("</Blobs>");
list.append("</EnumerationResults>");
Expand All @@ -177,6 +198,10 @@ public void handle(final HttpExchange exchange) throws IOException {
}
}

public Map<String, BytesReference> blobs() {
return blobs;
}

public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type", "application/xml");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.InetAddresses;
Expand Down Expand Up @@ -64,7 +65,7 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageHttpHandler implements HttpHandler {

private final ConcurrentMap<String, BytesArray> blobs;
private final ConcurrentMap<String, BytesReference> blobs;
private final String bucket;

public GoogleCloudStorageHttpHandler(final String bucket) {
Expand All @@ -86,7 +87,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final Set<String> prefixes = new HashSet<>();
final List<String> listOfBlobs = new ArrayList<>();

for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
for (final Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
final String blobName = blob.getKey();
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
Expand Down Expand Up @@ -122,15 +123,15 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
// Download Object https://cloud.google.com/storage/docs/request-body
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
if (blob != null) {
final String range = exchange.getRequestHeaders().getFirst("Range");
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
if (matcher.find() == false) {
throw new AssertionError("Range bytes header does not match expected format: " + range);
}

byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0];
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
Expand All @@ -141,8 +142,8 @@ public void handle(final HttpExchange exchange) throws IOException {
} else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
// Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
int deletions = 0;
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesArray> blob = iterator.next();
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesReference> blob = iterator.next();
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
iterator.remove();
deletions++;
Expand Down Expand Up @@ -208,12 +209,11 @@ public void handle(final HttpExchange exchange) throws IOException {
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);

final String blobName = params.get("test_blob_name");
byte[] blob = blobs.get(blobName).array();
if (blob == null) {
if (blobs.containsKey(blobName) == false) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
return;
}

byte[] blob = BytesReference.toBytes(blobs.get(blobName));
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
final Integer limit = getContentRangeLimit(range);
final int start = getContentRangeStart(range);
Expand Down Expand Up @@ -249,6 +249,10 @@ public void handle(final HttpExchange exchange) throws IOException {
}
}

public Map<String, BytesReference> blobs() {
return blobs;
}

private String httpServerUrl(final HttpExchange exchange) {
final InetSocketAddress address = exchange.getLocalAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -158,13 +160,34 @@ public void handle(final HttpExchange exchange) throws IOException {
if (prefix != null) {
list.append("<Prefix>").append(prefix).append("</Prefix>");
}
final Set<String> commonPrefixes = new HashSet<>();
final String delimiter = params.get("delimiter");
if (delimiter != null) {
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
}
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) {
list.append("<Contents>");
list.append("<Key>").append(blob.getKey().replace("/" + bucket + "/", "")).append("</Key>");
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
list.append("</Contents>");
if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) {
continue;
}
String blobPath = blob.getKey().replace("/" + bucket + "/", "");
if (delimiter != null) {
int fromIndex = (prefix != null ? prefix.length() : 0);
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
if (delimiterPosition > 0) {
commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
continue;
}
}
list.append("<Contents>");
list.append("<Key>").append(blobPath).append("</Key>");
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
list.append("</Contents>");
}
if (commonPrefixes.isEmpty() == false) {
list.append("<CommonPrefixes>");
commonPrefixes.forEach(commonPrefix -> list.append("<Prefix>").append(commonPrefix).append("</Prefix>"));
list.append("</CommonPrefixes>");

}
list.append("</ListBucketResult>");

Expand Down Expand Up @@ -241,6 +264,10 @@ public void handle(final HttpExchange exchange) throws IOException {
}
}

public Map<String, BytesReference> blobs() {
return blobs;
}

private static String multipartKey(final String uploadId, int partNumber) {
return uploadId + "\n" + partNumber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -272,9 +271,11 @@ public void testIndicesDeletedFromRepository() throws Exception {
assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
}
}

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get());
}

protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
indexRequestBuilders[i] = client().prepareIndex(name).setId(Integer.toString(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.MockHttpServer;
Expand All @@ -41,20 +42,31 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

/**
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase {

/**
* A {@link HttpHandler} that allows to list stored blobs
*/
@SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service")
protected interface BlobStoreHttpHandler extends HttpHandler {
Map<String, BytesReference> blobs();
}

private static final byte[] BUFFER = new byte[1024];

private static HttpServer httpServer;
Expand All @@ -81,7 +93,14 @@ public static void stopHttpServer() {
@After
public void tearDownHttpServer() {
if (handlers != null) {
handlers.keySet().forEach(context -> httpServer.removeContext(context));
for(Map.Entry<String, HttpHandler> handler : handlers.entrySet()) {
httpServer.removeContext(handler.getKey());
if (handler.getValue() instanceof BlobStoreHttpHandler) {
List<String> blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream()
.filter(blob -> blob.contains("index") == false).collect(Collectors.toList());
assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0));
}
}
}
}

Expand Down Expand Up @@ -110,14 +129,17 @@ public final void testSnapshotWithLargeSegmentFiles() throws Exception {
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
final String snapshot = "snapshot";
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
}

protected static String httpServerUrl() {
Expand Down