Skip to content

Commit

Permalink
Add peer recoveries using snapshot files when possible
Browse files Browse the repository at this point in the history
This commit adds peer recoveries from snapshots. It allows establishing a replica by downloading file data from a snapshot rather than transferring the data from the primary.

This feature is enabled in the repository definition. Repositories with the setting `use_for_peer_recovery=true` will be consulted to find a good snapshot when recovering a shard.

Relates elastic#73496
Backport of elastic#76237
  • Loading branch information
fcofdez committed Aug 13, 2021
1 parent 6a6706f commit d6aad93
Show file tree
Hide file tree
Showing 38 changed files with 2,570 additions and 132 deletions.
14 changes: 13 additions & 1 deletion docs/reference/modules/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,21 @@ When `indices.recovery.use_snapshots` is `false` {es} will construct this new
copy by transferring the index data from the current primary. When this setting
is `true` {es} will attempt to copy the index data from a recent snapshot
first, and will only copy data from the primary if it cannot identify a
suitable snapshot.
suitable snapshot. Defaults to `true`.
+
Setting this option to `true` reduces your operating costs if your cluster runs
in an environment where the node-to-node data transfer costs are higher than
the costs of recovering data from a snapshot. It also reduces the amount of
work that the primary must do during a recovery.
+
Additionally, repositories having the setting `use_for_peer_recovery=true`
will be consulted to find a good snapshot when recovering a shard. If none
of the registered repositories have this setting defined, index files will
be recovered from the source node.

`indices.recovery.max_concurrent_snapshot_file_downloads`::
(<<cluster-update-settings,Dynamic>>, Expert) Number of snapshot file download requests
sent in parallel to the target node for each recovery. Defaults to `5`.
+
Do not increase this setting without carefully verifying that your cluster has
the resources available to handle the extra load that will result.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.upgrades;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;

/**
 * Rolling-upgrade test that re-creates a replica of an index whose data is also stored in a
 * snapshot held by a repository registered with the peer-recovery setting enabled.
 * <p>
 * On the old cluster the test creates the index, snapshots it and adds a replica. On the mixed
 * and upgraded clusters it removes and re-adds the replica, waits for the index to become green
 * and checks that every indexed document is still searchable. The test does not assert which
 * recovery source was used; it only verifies the outcome of the recovery.
 */
public class SnapshotBasedRecoveryIT extends AbstractRollingTestCase {

    /**
     * Runs the step of the scenario that corresponds to the current {@code CLUSTER_TYPE}.
     *
     * @throws IllegalStateException if {@code CLUSTER_TYPE} is not one of the handled values
     */
    public void testSnapshotBasedRecovery() throws Exception {
        final String indexName = "snapshot_based_recovery";
        final String repositoryName = "snapshot_based_recovery_repo";
        final int numDocs = 200;
        switch (CLUSTER_TYPE) {
            case OLD:
                Settings.Builder settings = Settings.builder()
                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
                createIndex(indexName, settings.build());
                ensureGreen(indexName);
                indexDocs(indexName, numDocs);
                flush(indexName, true);

                registerRepository(
                    repositoryName,
                    "fs",
                    true,
                    Settings.builder()
                        .put("location", "./snapshot_based_recovery")
                        .put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), true)
                        .build()
                );

                createSnapshot(repositoryName, "snap", true);

                // Add the first replica once the snapshot exists
                updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1));
                ensureGreen(indexName);
                break;
            case MIXED:
            case UPGRADED:
                // Drop replicas
                updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0));
                ensureGreen(indexName);

                // Re-add the replica so that a fresh peer recovery runs on the current cluster
                updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1));
                ensureGreen(indexName);
                assertMatchAllReturnsAllDocuments(indexName, numDocs);
                assertMatchQueryReturnsAllDocuments(indexName, numDocs);
                break;
            default:
                throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
        }
    }

    /**
     * Runs a {@code match_all} search and checks the total hit count as well as the id and
     * source of every returned document against what {@link #indexDocs(String, int)} wrote.
     *
     * @param indexName index to search
     * @param numDocs   number of documents expected in the index
     */
    private void assertMatchAllReturnsAllDocuments(String indexName, int numDocs) throws IOException {
        // A search returns only the first 10 hits unless a size is requested. Ask for every
        // document so the per-hit checks below cover the whole index, not just the first page.
        final SearchSourceBuilder source = new SearchSourceBuilder().trackTotalHits(true)
            .query(QueryBuilders.matchAllQuery())
            .size(numDocs);
        Map<String, Object> searchResults = search(indexName, source);
        assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
        List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
        assertThat(hits.size(), equalTo(numDocs));
        for (Map<String, Object> hit : hits) {
            String docId = extractValue(hit, "_id");
            assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
            assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
            assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
        }
    }

    /**
     * Runs a {@code match} query for the term {@code some} on the {@code text} field and checks
     * that the total hit count equals {@code numDocs}.
     */
    private void assertMatchQueryReturnsAllDocuments(String indexName, int numDocs) throws IOException {
        Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
        assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
    }

    /**
     * Searches {@code index} with the given query, tracking total hits and leaving every other
     * search option unset.
     *
     * @return the search response parsed into a map
     */
    private static Map<String, Object> search(String index, QueryBuilder query) throws IOException {
        return search(index, new SearchSourceBuilder().trackTotalHits(true).query(query));
    }

    /**
     * Sends {@code source} as the body of a {@code _search} request against {@code index} and
     * asserts that the request succeeded and that no shard reported a failure.
     *
     * @return the search response parsed into a map
     */
    private static Map<String, Object> search(String index, SearchSourceBuilder source) throws IOException {
        final Request request = new Request(HttpPost.METHOD_NAME, '/' + index + "/_search");
        request.setJsonEntity(source.toString());

        final Response response = client().performRequest(request);
        assertOK(response);

        final Map<String, Object> responseAsMap = responseAsMap(response);
        assertThat(
            extractValue(responseAsMap, "_shards.failed"),
            equalTo(0)
        );
        return responseAsMap;
    }

    /**
     * Indexes {@code numDocs} documents with ids {@code 0..numDocs-1} through a single bulk
     * request that refreshes the index. Document {@code i} has {@code field = i} and
     * {@code text = "Some text i"}.
     */
    private void indexDocs(String indexName, int numDocs) throws IOException {
        final StringBuilder bulkBody = new StringBuilder();
        for (int i = 0; i < numDocs; i++) {
            bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
            bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Some text ").append(i).append("\"}\n");
        }

        final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk");
        documents.addParameter("refresh", "true");
        documents.setJsonEntity(bulkBody.toString());
        assertOK(client().performRequest(documents));
    }

    /**
     * Extracts the value at the dotted {@code path} from a parsed response and casts it to the
     * type expected by the caller. The cast is unchecked, so a wrong expectation surfaces as a
     * {@link ClassCastException} at the use site.
     */
    @SuppressWarnings("unchecked")
    private static <T> T extractValue(Map<String, Object> map, String path) {
        return (T) XContentMapValues.extractValue(path, map);
    }
}
2 changes: 2 additions & 0 deletions qa/snapshot-based-recoveries/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Parent project of the snapshot-based recovery QA tests.
// NOTE(review): 'elasticsearch.internal-test-artifact' presumably exposes this project's test
// classes so that subprojects can depend on them via testArtifact(...) - confirm against the
// build plugins.
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.internal-test-artifact'
36 changes: 36 additions & 0 deletions qa/snapshot-based-recoveries/fs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

// Runs the snapshot-based recovery REST tests against a filesystem ("fs") repository.
apply plugin: 'elasticsearch.java-rest-test'
apply plugin: 'elasticsearch.rest-resources'

// The test classes come from the test artifact of the parent QA project.
dependencies {
javaRestTestImplementation(testArtifact(project(':qa:snapshot-based-recoveries')))
}

// Directory used as the repository location. The same path is handed to the test JVM
// (system property 'tests.path.repo') and to the cluster nodes ('path.repo') below.
final File repoDir = file("$buildDir/testclusters/snapshot-recoveries-repo")

// REST API groups requested from the rest-resources plugin for these tests.
restResources {
restApi {
include 'indices', 'search', 'bulk', 'snapshot'
}
}

tasks.withType(Test).configureEach {
// Remove the repository directory before each test task runs.
doFirst {
delete(repoDir)
}
// Read by the test through System.getProperty("tests.path.repo").
systemProperty 'tests.path.repo', repoDir
}

// Every test cluster: default distribution, three nodes, the repository directory registered
// under 'path.repo', and security switched off.
testClusters.all {
testDistribution = 'DEFAULT'
numberOfNodes = 3
setting 'path.repo', repoDir.absolutePath
setting 'xpack.security.enabled', 'false'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.recovery;

import org.elasticsearch.common.settings.Settings;

public class FsSnapshotBasedRecoveryIT extends AbstractSnapshotBasedRecoveryRestTestCase {

@Override
protected String repositoryType() {
return "fs";
}

@Override
protected Settings repositorySettings() {
return Settings.builder()
.put("location", System.getProperty("tests.path.repo"))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.recovery;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;

/**
 * Base class for REST tests that add a replica to an index whose data is also stored in a
 * snapshot held by a repository registered with the peer-recovery setting enabled.
 * <p>
 * Subclasses supply the repository type and its settings. The test does not assert which
 * recovery source was used; it verifies that the index becomes green after the replica is added
 * and that every indexed document is searchable.
 */
public abstract class AbstractSnapshotBasedRecoveryRestTestCase extends ESRestTestCase {
    private static final String REPOSITORY_NAME = "repository";
    private static final String SNAPSHOT_NAME = "snapshot-for-recovery";

    /** Returns the repository type used to register the test repository, for example {@code "fs"}. */
    protected abstract String repositoryType();

    /** Returns the repository-specific settings; the test adds the peer-recovery setting on top of them. */
    protected abstract Settings repositorySettings();

    /**
     * Creates a single-shard index without replicas, indexes a random number of documents,
     * snapshots the index, adds a replica and verifies the search results.
     */
    public void testRecoveryUsingSnapshots() throws Exception {
        final String repositoryType = repositoryType();
        Settings repositorySettings = Settings.builder().put(repositorySettings())
            .put(BlobStoreRepository.USE_FOR_PEER_RECOVERY_SETTING.getKey(), true)
            .build();

        registerRepository(REPOSITORY_NAME, repositoryType, true, repositorySettings);

        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .build()
        );
        ensureGreen(indexName);

        final int numDocs = randomIntBetween(1, 500);
        indexDocs(indexName, numDocs);

        forceMerge(indexName, randomBoolean(), randomBoolean());

        // NOTE(review): the last argument presumably makes the delete tolerate a missing
        // snapshot (true here, false for the final clean-up) - confirm against ESRestTestCase.
        deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);
        createSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);

        // Add a new replica
        updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
        ensureGreen(indexName);

        // Each round randomly picks one of two checks, so repeat to make it likely both run.
        for (int i = 0; i < 4; i++) {
            assertSearchResultsAreCorrect(indexName, numDocs);
        }
        deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, false);
    }

    /**
     * Randomly runs either a {@code match_all} search that validates the id and source of every
     * document, or a {@code match} query on the {@code text} field that validates the total hit
     * count only.
     *
     * @param indexName index to search
     * @param numDocs   number of documents expected in the index
     */
    private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException {
        if (randomBoolean()) {
            // A search returns only the first 10 hits unless a size is requested. Ask for every
            // document so the per-hit checks below cover the whole index, not just the first page.
            final SearchSourceBuilder source = new SearchSourceBuilder().trackTotalHits(true)
                .query(QueryBuilders.matchAllQuery())
                .size(numDocs);
            Map<String, Object> searchResults = search(indexName, source);
            assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
            List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
            assertThat(hits.size(), equalTo(numDocs));
            for (Map<String, Object> hit : hits) {
                String docId = extractValue(hit, "_id");
                assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
                assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
                assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
            }
        } else {
            Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
            assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
        }
    }

    /**
     * Sends a {@code _forcemerge} request for {@code index} with the given
     * {@code only_expunge_deletes} and {@code flush} parameters and asserts that it succeeded.
     */
    private static void forceMerge(String index, boolean onlyExpungeDeletes, boolean flush) throws IOException {
        final Request request = new Request(HttpPost.METHOD_NAME, '/' + index + "/_forcemerge");
        request.addParameter("only_expunge_deletes", Boolean.toString(onlyExpungeDeletes));
        request.addParameter("flush", Boolean.toString(flush));
        assertOK(client().performRequest(request));
    }

    /**
     * Indexes {@code numDocs} documents with ids {@code 0..numDocs-1} through a single bulk
     * request that refreshes the index. Document {@code i} has {@code field = i} and
     * {@code text = "Some text i"}.
     */
    private void indexDocs(String indexName, int numDocs) throws IOException {
        final StringBuilder bulkBody = new StringBuilder();
        for (int i = 0; i < numDocs; i++) {
            bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
            bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Some text ").append(i).append("\"}\n");
        }

        final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk");
        documents.addParameter("refresh", Boolean.TRUE.toString());
        documents.setJsonEntity(bulkBody.toString());
        assertOK(client().performRequest(documents));
    }

    /**
     * Searches {@code index} with the given query, tracking total hits and leaving every other
     * search option unset.
     *
     * @return the search response parsed into a map
     */
    private static Map<String, Object> search(String index, QueryBuilder query) throws IOException {
        return search(index, new SearchSourceBuilder().trackTotalHits(true).query(query));
    }

    /**
     * Sends {@code source} as the body of a {@code _search} request against {@code index} and
     * asserts that the request succeeded and that no shard reported a failure.
     *
     * @return the search response parsed into a map
     */
    private static Map<String, Object> search(String index, SearchSourceBuilder source) throws IOException {
        final Request request = new Request(HttpPost.METHOD_NAME, '/' + index + "/_search");
        request.setJsonEntity(source.toString());

        final Response response = client().performRequest(request);
        assertOK(response);

        final Map<String, Object> responseAsMap = responseAsMap(response);
        assertThat(
            extractValue(responseAsMap, "_shards.failed"),
            equalTo(0)
        );
        return responseAsMap;
    }

    /**
     * Extracts the value at the dotted {@code path} from a parsed response and casts it to the
     * type expected by the caller. The cast is unchecked, so a wrong expectation surfaces as a
     * {@link ClassCastException} at the use site.
     */
    @SuppressWarnings("unchecked")
    private static <T> T extractValue(Map<String, Object> map, String path) {
        return (T) XContentMapValues.extractValue(path, map);
    }
}
Loading

0 comments on commit d6aad93

Please sign in to comment.