Adapt the Recovery API for closed indices (#38421)
This commit adapts the Recovery API to make it work with 
shards of replicated closed indices.

Relates #33888
tlrx authored Feb 25, 2019
1 parent 4fd1bb2 commit 4db7fd9
Showing 6 changed files with 156 additions and 19 deletions.
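For illustration, a minimal sketch (not part of this change; the index name is hypothetical) of how a client can now request recovery information for a closed index, using the same admin-client calls that the integration test below exercises:

import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.indices.recovery.RecoveryState;

import java.util.List;
import java.util.Map;

public class RecoveryApiSketch {
    // Prints the recovery source, stage and target node of every shard copy.
    // With this change the request also expands to closed indices.
    public static void printRecoveries(Client client) {
        RecoveryResponse response = client.admin().indices()
            .prepareRecoveries("my-closed-index") // hypothetical index name
            .get();
        for (Map.Entry<String, List<RecoveryState>> entry : response.shardRecoveryStates().entrySet()) {
            for (RecoveryState state : entry.getValue()) {
                System.out.println(entry.getKey() + "[" + state.getShardId().id() + "] "
                    + state.getRecoverySource().getType() + " " + state.getStage()
                    + " on " + state.getTargetNode().getName());
            }
        }
    }
}

For a shard of a replicated closed index the expected result is an existing-store recovery (or a peer recovery for replicas) in the DONE stage, which is what the REST tests below assert.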
@@ -75,3 +75,60 @@
\n
)+
$/
---
"Test cat recovery output for closed index":
- skip:
version: " - 7.99.99"
reason: closed indices are replicated starting version 8.0.0

- do:
indices.create:
index: index2
body:
settings:
index:
number_of_replicas: 0

- do:
indices.close:
index: index2
- is_true: acknowledged

- do:
cluster.health:
index: index2
wait_for_status: green

- do:
cat.recovery:
index: index2
h: i,s,t,ty,st,shost,thost,rep,snap,f,fr,fp,tf,b,br,bp,tb,to,tor,top

- match:
$body: |
/^
(
index2 \s+
\d \s+ # shard
(?:\d+ms|\d+(?:\.\d+)?s) \s+ # time in ms or seconds
existing_store \s+ # source type (always existing_store for closed indices)
done \s+ # stage
[-\w./]+ \s+ # source_host
[-\w./]+ \s+ # target_host
[-\w./]+ \s+ # repository
[-\w./]+ \s+ # snapshot
\d+ \s+ # files
\d+ \s+ # files_recovered
\d+\.\d+% \s+ # files_percent
\d+ \s+ # files_total
\d+ \s+ # bytes
\d+ \s+ # bytes_recovered
\d+\.\d+% \s+ # bytes_percent
\d+ \s+ # bytes_total
0 \s+ # translog_ops (always 0 for closed indices)
0 \s+ # translog_ops_recovered (always 0 for closed indices)
100\.0% # translog_ops_percent (always 100.0% for closed indices)
\n
)+
$/
@@ -40,6 +40,56 @@
- gte: { test_1.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.total_time_in_millis: 0 }
---
"Indices recovery test for closed index":
- skip:
version: " - 7.99.99"
reason: closed indices are replicated starting version 8.0.0

- do:
indices.create:
index: test_2
body:
settings:
index:
number_of_replicas: 0

- do:
indices.close:
index: test_2
- is_true: acknowledged

- do:
cluster.health:
index: test_2
wait_for_status: green

- do:
indices.recovery:
index: [test_2]
human: true

- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
---
"Indices recovery test index name not matching":

- do:
@@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.recovery;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -47,7 +48,7 @@ public RecoveryRequest() {
* @param indices Comma-separated list of indices about which to gather recovery information
*/
public RecoveryRequest(String... indices) {
super(indices);
super(indices, IndicesOptions.STRICT_EXPAND_OPEN_CLOSED);
}
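Not part of the diff, but for context on the constructor change above, a small sketch (the index pattern is hypothetical) of what STRICT_EXPAND_OPEN_CLOSED means for how this request resolves index names:

import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.support.IndicesOptions;

public class RecoveryIndicesOptionsSketch {
    public static void main(String[] args) {
        RecoveryRequest request = new RecoveryRequest("logs-*"); // hypothetical index pattern
        IndicesOptions options = request.indicesOptions();
        // "strict": a named index that does not exist is reported as an error
        System.out.println("ignoreUnavailable = " + options.ignoreUnavailable()); // false
        // wildcards now expand to closed indices as well as open ones
        System.out.println("expand open   = " + options.expandWildcardsOpen());   // true
        System.out.println("expand closed = " + options.expandWildcardsClosed()); // true
    }
}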

/**
@@ -112,11 +112,11 @@ protected ShardsIterator shards(ClusterState state, RecoveryRequest request, Str

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}
}
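For background on moving the block check from READ to METADATA_READ: closing an index adds an index-level block at the READ and WRITE levels but not at METADATA_READ, so checking METADATA_READ lets the recovery action serve closed indices while still honoring metadata-level blocks. A minimal sketch, assuming the standard closed-index block constant MetaDataIndexStateService.INDEX_CLOSED_BLOCK:

import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;

public class ClosedIndexBlockSketch {
    public static void main(String[] args) {
        ClusterBlock closedBlock = MetaDataIndexStateService.INDEX_CLOSED_BLOCK; // assumed constant
        // The index-closed block covers READ and WRITE but not METADATA_READ,
        // which is why a METADATA_READ check no longer rejects closed indices.
        System.out.println("blocks READ          = " + closedBlock.contains(ClusterBlockLevel.READ));          // true
        System.out.println("blocks METADATA_READ = " + closedBlock.contains(ClusterBlockLevel.METADATA_READ)); // false
    }
}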
@@ -40,6 +40,11 @@ protected BroadcastRequest(String[] indices) {
this.indices = indices;
}

protected BroadcastRequest(String[] indices, IndicesOptions indicesOptions) {
this.indices = indices;
this.indicesOptions = indicesOptions;
}

@Override
public String[] indices() {
return indices;
@@ -54,6 +54,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -209,24 +210,34 @@ public void testGatewayRecoveryTestActiveOnly() throws Exception {
}

public void testReplicaRecovery() throws Exception {
logger.info("--> start node A");
String nodeA = internalCluster().startNode();
final String nodeA = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.build());
ensureGreen(INDEX_NAME);

final int numOfDocs = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), numOfDocs)) {
waitForDocs(numOfDocs, indexer);
}

logger.info("--> create index on node: {}", nodeA);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
refresh(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs);

logger.info("--> start node B");
String nodeB = internalCluster().startNode();
ensureGreen();
final boolean closedIndex = randomBoolean();
if (closedIndex) {
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
ensureGreen(INDEX_NAME);
}

// force a shard recovery from nodeA to nodeB
logger.info("--> bump replica count");
client().admin().indices().prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put("number_of_replicas", 1)).execute().actionGet();
ensureGreen();
final String nodeB = internalCluster().startNode();
assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen(INDEX_NAME);

logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
final RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();

// we should now have two total shards, one primary and one replica
List<RecoveryState> recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
@@ -238,14 +249,27 @@ public void testReplicaRecovery() throws Exception {
assertThat(nodeBResponses.size(), equalTo(1));

// validate node A recovery
RecoveryState nodeARecoveryState = nodeAResponses.get(0);
assertRecoveryState(nodeARecoveryState, 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE, true, Stage.DONE, null, nodeA);
final RecoveryState nodeARecoveryState = nodeAResponses.get(0);
final RecoverySource expectedRecoverySource;
if (closedIndex == false) {
expectedRecoverySource = RecoverySource.EmptyStoreRecoverySource.INSTANCE;
} else {
expectedRecoverySource = RecoverySource.ExistingStoreRecoverySource.INSTANCE;
}
assertRecoveryState(nodeARecoveryState, 0, expectedRecoverySource, true, Stage.DONE, null, nodeA);
validateIndexRecoveryState(nodeARecoveryState.getIndex());

// validate node B recovery
RecoveryState nodeBRecoveryState = nodeBResponses.get(0);
final RecoveryState nodeBRecoveryState = nodeBResponses.get(0);
assertRecoveryState(nodeBRecoveryState, 0, PeerRecoverySource.INSTANCE, false, Stage.DONE, nodeA, nodeB);
validateIndexRecoveryState(nodeBRecoveryState.getIndex());

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));

if (closedIndex) {
assertAcked(client().admin().indices().prepareOpen(INDEX_NAME));
}
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs);
}

@TestLogging(