Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Fix SnapshotBasedRecoveryIT#testSnapshotBasedRecovery (#77134) #78434

Merged
merged 1 commit into from
Sep 29, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@

package org.elasticsearch.upgrades;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -26,11 +31,13 @@

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.elasticsearch.upgrades.AbstractRollingTestCase.ClusterType.MIXED;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;

public class SnapshotBasedRecoveryIT extends AbstractRollingTestCase {
public void testSnapshotBasedRecovery() throws Exception {
Expand Down Expand Up @@ -66,17 +73,41 @@ public void testSnapshotBasedRecovery() throws Exception {
break;
case MIXED:
case UPGRADED:
// the following `if` for first round mixed was added as a selective test mute. Sometimes the primary shard ends
// on the upgraded node. This causes issues when removing and adding replicas, since then we cannot allocate to
// any of the old nodes. That is an issue only for the first mixed round, hence this check.
// Ideally we would find the reason the primary ends on the upgraded node and fix that (or figure out that it
// is all good).
// @AwaitsFix(bugUrl = https://github.com/elastic/elasticsearch/issues/76595)
if (CLUSTER_TYPE != MIXED || FIRST_MIXED_ROUND == false) {
// Drop replicas
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0));
if (FIRST_MIXED_ROUND) {
String upgradedNodeId = getUpgradedNodeId();

if (upgradedNodeId != null) {
updateIndexSettings(
indexName,
Settings.builder()
.put("index.routing.allocation.exclude._id", upgradedNodeId)
);
}

String primaryNodeId = getPrimaryNodeIdOfShard(indexName, 0);
Version primaryNodeVersion = getNodeVersion(primaryNodeId);

// Sometimes the primary shard ends on the upgraded node (i.e. after a rebalance)
// This causes issues when removing and adding replicas, since then we cannot allocate to any of the old nodes.
// That is an issue only for the first mixed round.
// In that case we exclude the upgraded node from the shard allocation and cancel the shard to force moving
// the primary to a node in the old version, this allows adding replicas in the first mixed round.
if (primaryNodeVersion.after(UPGRADE_FROM_VERSION)) {
cancelShard(indexName, 0, primaryNodeId);

String currentPrimaryNodeId = getPrimaryNodeIdOfShard(indexName, 0);
assertThat(getNodeVersion(currentPrimaryNodeId), is(equalTo(UPGRADE_FROM_VERSION)));
}
} else {
updateIndexSettings(
indexName,
Settings.builder()
.putNull("index.routing.allocation.exclude._id")
);
}
ensureGreen(indexName);

// Drop replicas
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0));

updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1));
ensureGreen(indexName);
Expand All @@ -88,6 +119,81 @@ public void testSnapshotBasedRecovery() throws Exception {
}
}

@Nullable
private String getUpgradedNodeId() throws IOException {
Request request = new Request(HttpGet.METHOD_NAME, "_nodes/_all");
Response response = client().performRequest(request);
Map<String, Object> responseMap = responseAsMap(response);
Map<String, Map<String, Object>> nodes = extractValue(responseMap, "nodes");
for (Map.Entry<String, Map<String, Object>> nodeInfoEntry : nodes.entrySet()) {
Version nodeVersion = Version.fromString(extractValue(nodeInfoEntry.getValue(), "version"));
if (nodeVersion.after(UPGRADE_FROM_VERSION)) {
return nodeInfoEntry.getKey();
}
}
return null;
}

private Version getNodeVersion(String primaryNodeId) throws IOException {
Request request = new Request(HttpGet.METHOD_NAME, "_nodes/" + primaryNodeId);
Response response = client().performRequest(request);
String nodeVersion = extractValue(responseAsMap(response), "nodes." + primaryNodeId + ".version");
return Version.fromString(nodeVersion);
}

private String getPrimaryNodeIdOfShard(String indexName, int shard) throws Exception {
String primaryNodeId;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.field("index", indexName);
builder.field("shard", shard);
builder.field("primary", true);
}
builder.endObject();

Request request = new Request(HttpGet.METHOD_NAME, "_cluster/allocation/explain");
request.setJsonEntity(Strings.toString(builder));

Response response = client().performRequest(request);
Map<String, Object> responseMap = responseAsMap(response);
primaryNodeId = extractValue(responseMap, "current_node.id");
}
assertThat(primaryNodeId, is(notNullValue()));

return primaryNodeId;
}

private void cancelShard(String indexName, int shard, String nodeName) throws IOException {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startArray("commands");
{
builder.startObject();
{
builder.startObject("cancel");
{
builder.field("index", indexName);
builder.field("shard", shard);
builder.field("node", nodeName);
builder.field("allow_primary", true);
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();

Request request = new Request(HttpPost.METHOD_NAME, "/_cluster/reroute");
request.setJsonEntity(Strings.toString(builder));
Response response = client().performRequest(request);
assertOK(response);
}
}

private void assertMatchAllReturnsAllDocuments(String indexName, int numDocs) throws IOException {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
Expand Down