Skip to content

Commit

Permalink
Add test for persistent cache clean up after relocation (#72682)
Browse files Browse the repository at this point in the history
This commit adds a test to verify that the persistent cache is 
correctly cleaned up after a shard is relocated. I've been 
surprised to see we don't have integ tests for this.
  • Loading branch information
tlrx authored May 6, 2021
1 parent 051bbb2 commit c9ca64c
Showing 1 changed file with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,24 @@
package org.elasticsearch.xpack.searchablesnapshots.cache.full;

import org.apache.lucene.document.Document;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -30,6 +36,7 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -154,4 +161,111 @@ public Settings onNodeStopped(String nodeName) {
assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L));
});
}

public void testPersistentCacheCleanUpAfterRelocation() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().ensureAtLeastNumDataNodes(3);
ensureStableCluster(cluster().size());

final String prefix = getTestName().toLowerCase(Locale.ROOT) + '-';

final String fsRepoName = prefix + "repository";
createRepository(fsRepoName, FsRepository.TYPE);

final String indexName = prefix + "index";
createAndPopulateIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1))
);

final String snapshotName = prefix + "snapshot";
createFullSnapshot(fsRepoName, snapshotName);

assertAcked(client().admin().indices().prepareDelete(prefix + '*'));

final String mountedIndexName = mountSnapshot(fsRepoName, snapshotName, indexName, Settings.EMPTY);

assertExecutorIsIdle(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
assertExecutorIsIdle(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME);

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(mountedIndexName).get();
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(mountedIndexName));
assertTrue(
recoveryResponse.shardRecoveryStates()
.get(mountedIndexName)
.stream()
.allMatch(recoveryState -> recoveryState.getStage() == RecoveryState.Stage.DONE)
);

final ClusterStateResponse state = client().admin()
.cluster()
.prepareState()
.clear()
.setRoutingTable(true)
.setMetadata(true)
.setIndices(mountedIndexName)
.get();
final Index mountedIndex = state.getState().metadata().index(mountedIndexName).getIndex();

final Set<DiscoveryNode> dataNodes = new HashSet<>();
for (DiscoveryNode node : getDiscoveryNodes()) {
if (node.getRoles().stream().anyMatch(DiscoveryNodeRole::canContainData)) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node.getName());
if (indicesService.hasIndex(mountedIndex)) {
assertBusy(() -> {
CacheService cacheService = internalCluster().getInstance(CacheService.class, node.getName());
cacheService.synchronizeCache();

assertThat(cacheService.getPersistentCache().getNumDocs(), greaterThan(0L));
dataNodes.add(node);
});
}
}
}

final DiscoveryNode excludedDataNode = randomFrom(dataNodes);
logger.info("--> relocating mounted index {} away from {}", mountedIndex, excludedDataNode);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(mountedIndexName)
.setSettings(Settings.builder().put(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", excludedDataNode.getId()))
.get()
);

ensureGreen(mountedIndexName);

recoveryResponse = client().admin().indices().prepareRecoveries(mountedIndexName).get();
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(mountedIndexName));
assertTrue(
recoveryResponse.shardRecoveryStates()
.get(mountedIndexName)
.stream()
.allMatch(recoveryState -> recoveryState.getStage() == RecoveryState.Stage.DONE)
);

assertBusy(() -> {
for (DiscoveryNode dataNode : dataNodes) {
CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode.getName());
cacheService.synchronizeCache();

assertThat(
cacheService.getPersistentCache().getNumDocs(),
dataNode.equals(excludedDataNode) ? equalTo(0L) : greaterThan(0L)
);
}
});

logger.info("--> deleting mounted index {}", mountedIndex);
assertAcked(client().admin().indices().prepareDelete(mountedIndexName));

assertBusy(() -> {
for (CacheService cacheService : internalCluster().getDataNodeInstances(CacheService.class)) {
cacheService.synchronizeCache();
assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L));
}
});
}
}

0 comments on commit c9ca64c

Please sign in to comment.