Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats more robust #64976

Merged
Merged 2 commits on Nov 12, 2020 (the source and target branch names are missing from this capture).
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -63,12 +65,14 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -465,12 +469,12 @@ public void testFollowerMappingIsUpdated() throws IOException {

public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats() throws Exception {
final String leaderIndex = "leader";
final int numberOfShards = randomIntBetween(1, 2);
final int numberOfShards = randomIntBetween(1, 5);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
.setSource(getIndexSettings(numberOfShards, 0,
Map.of(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.ZERO.getStringRep())), XContentType.JSON));

final int numDocs = scaledRandomIntBetween(0, 1_000);
final int numDocs = scaledRandomIntBetween(0, 500);
if (numDocs > 0) {
final BulkRequestBuilder bulkRequest = leaderClient().prepareBulk(leaderIndex);
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -513,69 +517,62 @@ public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats()
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
}

final CountDownLatch blockCcrRestore = new CountDownLatch(1);

final List<MockTransportService> transportServices = new ArrayList<>();
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
final MockTransportService mockTransportService = (MockTransportService) transportService;
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
try {
blockCcrRestore.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
final String followerIndex = "follower";
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);

final Map<Integer, Long> fetchedSnapshotShardSizes = new ConcurrentHashMap<>();

final PlainActionFuture<Void> waitForRestoreInProgress = PlainActionFuture.newFuture();
// [Review thread: original-brownbear marked this conversation as resolved.]
final ClusterStateListener listener = event -> {
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
if (restoreInProgress != null
&& restoreInProgress.isEmpty() == false
&& event.state().routingTable().hasIndex(followerIndex)) {
final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex);
for (ShardRouting shardRouting : indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shardRouting.unassignedInfo().getLastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) {
try {
// [Review thread: original-brownbear marked this conversation as resolved.]
assertBusy(() -> {
final Long snapshotShardSize = snapshotsInfoService.snapshotShardSizes().getShardSize(shardRouting);
assertThat(snapshotShardSize, notNullValue());
fetchedSnapshotShardSizes.put(shardRouting.getId(), snapshotShardSize);
}, 30L, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError("Failed to retrieve snapshot shard size for shard " + shardRouting, e);
}
}
}
connection.sendRequest(requestId, action, request, options);
});
transportServices.add(mockTransportService);
}

try {
final String followerIndex = "follower";
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

final PlainActionFuture<IndexRoutingTable> waitForRestoreInProgress = PlainActionFuture.newFuture();
final ClusterStateListener listener = event -> {
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
if (restoreInProgress != null
&& restoreInProgress.isEmpty() == false
&& event.state().routingTable().hasIndex(followerIndex)) {
waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex));
}
};
clusterService.addListener(listener);

final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
.renameReplacement(followerIndex)
.masterNodeTimeout(TimeValue.MAX_VALUE)
.indexSettings(Settings.builder()
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());

final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
clusterService.removeListener(listener);

final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
assertBusy(() -> {
SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
for (int shardId = 0; shardId < numberOfShards; shardId++) {
Long snapshotShardSize = snapshotShardSizeInfo.getShardSize(indexRoutingTable.shard(shardId).primaryShard());
assertThat(snapshotShardSize,
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
logger.info("--> [{}/{}] snapshot shard sizes fetched", fetchedSnapshotShardSizes.size(), numberOfShards);
if (fetchedSnapshotShardSizes.size() == numberOfShards) {
waitForRestoreInProgress.onResponse(null);
}
}, 60L, TimeUnit.SECONDS);
}
};
clusterService.addListener(listener);

blockCcrRestore.countDown();
ensureFollowerGreen(followerIndex);
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
.renameReplacement(followerIndex)
.masterNodeTimeout(TimeValue.MAX_VALUE)
.indexSettings(Settings.builder()
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());

waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
clusterService.removeListener(listener);
ensureFollowerGreen(followerIndex);

assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
} finally {
transportServices.forEach(MockTransportService::clearAllRules);
for (int shardId = 0; shardId < numberOfShards; shardId++) {
assertThat("Snapshot shard size fetched for follower shard [" + shardId + "] does not match leader store size",
fetchedSnapshotShardSizes.get(shardId),
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
}

assertHitCount(followerClient().prepareSearch(followerIndex).setSize(0).get(), numDocs);
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
}

public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception {
Expand Down