Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix IndexFoldersDeletionListenerIT.testListenersInvokedWhenIndexIsRelocated (#66329) #66511

Merged
merged 2 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.Environment;
Expand All @@ -45,6 +46,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
Expand All @@ -56,7 +58,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class IndexFoldersDeletionListenerIT extends ESIntegTestCase {

@Override
Expand All @@ -67,26 +69,37 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testListenersInvokedWhenIndexIsDeleted() throws Exception {
final String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(2);
ensureStableCluster(2 + 1, masterNode);

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2 * between(1, 2))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1))
.build());

final NumShards numShards = getNumShards(indexName);
ensureClusterSizeConsistency(); // wait for a stable cluster
ensureGreen(indexName); // wait for no relocation

final ClusterState clusterState = clusterService().state();
assertFalse(client().admin().cluster().prepareHealth()
.setIndices(indexName)
.setWaitForGreenStatus()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get()
.isTimedOut());

final ClusterState clusterState = internalCluster().clusterService(masterNode).state();
final Index index = clusterState.metadata().index(indexName).getIndex();
final Map<String, List<ShardRouting>> shardsByNodes = shardRoutingsByNodes(clusterState, index);
assertThat(shardsByNodes.values().stream().mapToInt(List::size).sum(), equalTo(numShards.totalNumShards));

for (Map.Entry<String, List<ShardRouting>> shardsByNode : shardsByNodes.entrySet()) {
final String nodeName = shardsByNode.getKey();
final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName);
assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty());
assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty());
assertNoDeletions(shardsByNode.getKey());
}

assertAcked(client().admin().indices().prepareDelete(indexName));
assertPendingDeletesProcessed();

assertBusy(() -> {
for (Map.Entry<String, List<ShardRouting>> shardsByNode : shardsByNodes.entrySet()) {
Expand All @@ -106,30 +119,37 @@ public void testListenersInvokedWhenIndexIsDeleted() throws Exception {
deletedShards.contains(shardId));
}
}
});
}, 30L, TimeUnit.SECONDS);
}

public void testListenersInvokedWhenIndexIsRelocated() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(4);
final String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(4);
ensureStableCluster(4 + 1, masterNode);

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(4, 10))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 4 * between(1, 2))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1))
.build());

final NumShards numShards = getNumShards(indexName);
ensureGreen(indexName);

final ClusterState clusterState = clusterService().state();
assertFalse(client().admin().cluster().prepareHealth()
.setIndices(indexName)
.setWaitForGreenStatus()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get()
.isTimedOut());

final ClusterState clusterState = internalCluster().clusterService(masterNode).state();
final Index index = clusterState.metadata().index(indexName).getIndex();
final Map<String, List<ShardRouting>> shardsByNodes = shardRoutingsByNodes(clusterState, index);
assertThat(shardsByNodes.values().stream().mapToInt(List::size).sum(), equalTo(numShards.totalNumShards));

for (Map.Entry<String, List<ShardRouting>> shardsByNode : shardsByNodes.entrySet()) {
final String nodeName = shardsByNode.getKey();
final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName);
assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty());
assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty());
assertNoDeletions(shardsByNode.getKey());
}

final List<String> excludedNodes = randomSubsetOf(2, shardsByNodes.keySet());
Expand Down Expand Up @@ -159,48 +179,58 @@ public void testListenersInvokedWhenIndexIsRelocated() throws Exception {
deletedShards.contains(shardId));
}
} else {
assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty());
assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty());
assertNoDeletions(nodeName);
}
}
});
}, 30L, TimeUnit.SECONDS);
}

public void testListenersInvokedWhenIndexIsDangling() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(4);
final String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(4);
ensureStableCluster(4 + 1, masterNode);

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(4, 10))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 4 * between(1, 2))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1))
.build());

final NumShards numShards = getNumShards(indexName);
ensureGreen(indexName);

final ClusterState clusterState = clusterService().state();
assertFalse(client().admin().cluster().prepareHealth()
.setIndices(indexName)
.setWaitForGreenStatus()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get()
.isTimedOut());

final ClusterState clusterState = internalCluster().clusterService(masterNode).state();
final Index index = clusterState.metadata().index(indexName).getIndex();
final Map<String, List<ShardRouting>> shardsByNodes = shardRoutingsByNodes(clusterState, index);
assertThat(shardsByNodes.values().stream().mapToInt(List::size).sum(), equalTo(numShards.totalNumShards));

for (Map.Entry<String, List<ShardRouting>> shardsByNode : shardsByNodes.entrySet()) {
final String nodeName = shardsByNode.getKey();
final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName);
assertTrue("Expecting no indices deleted on node " + nodeName, plugin.deletedIndices.isEmpty());
assertTrue("Expecting no shards deleted on node " + nodeName, plugin.deletedShards.isEmpty());
assertNoDeletions(shardsByNode.getKey());
}

final String stoppedNode = randomFrom(shardsByNodes.keySet());
final Settings stoppedNodeDataPathSettings = internalCluster().dataPathSettings(stoppedNode);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(stoppedNode));
ensureStableCluster(3 + 1, masterNode);

assertAcked(client().admin().indices().prepareDelete(indexName));

final String restartedNode = internalCluster().startNode(stoppedNodeDataPathSettings);
ensureStableCluster(4 + 1, masterNode);
assertPendingDeletesProcessed();

assertBusy(() -> {
final IndexFoldersDeletionListenerPlugin plugin = plugin(restartedNode);
assertTrue("Listener should have been notified of deletion of index " + index + " on node " + restartedNode,
plugin.deletedIndices.contains(index));
});
}, 30L, TimeUnit.SECONDS);
}

public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
Expand Down Expand Up @@ -229,6 +259,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {

logger.debug("--> stopping data node [{}], the data left on disk will be injected as left-overs in a newer data node", dataNode);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode));
ensureStableCluster(1, masterNode);

logger.debug("--> deleting leftover indices");
assertAcked(client().admin().indices().prepareDelete("index-*"));
Expand Down Expand Up @@ -266,6 +297,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
dataPaths.stream().map(p -> p.toAbsolutePath().toString()).collect(Collectors.toList()))
.putNull(Environment.PATH_SHARED_DATA_SETTING.getKey())
.build());
ensureStableCluster(1 + 1, masterNode);

final IndexFoldersDeletionListenerPlugin plugin = plugin(dataNode);
assertTrue("Expecting no shards deleted on node " + dataNode, plugin.deletedShards.isEmpty());
Expand Down Expand Up @@ -330,4 +362,12 @@ private static void assertPendingDeletesProcessed() throws Exception {
services.forEach(indicesService -> assertFalse(indicesService.hasUncompletedPendingDeletes()));
});
}

private static void assertNoDeletions(String nodeName) {
final IndexFoldersDeletionListenerPlugin plugin = plugin(nodeName);
assertTrue("Expecting no indices deleted on node [" + nodeName + "] but got: " + plugin.deletedIndices,
plugin.deletedIndices.isEmpty());
assertTrue("Expecting no shards deleted on node [" + nodeName + "] but got: " + plugin.deletedShards,
plugin.deletedShards.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
package org.elasticsearch.xpack.searchablesnapshots.cache;

import org.apache.lucene.document.Document;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardPath;
Expand All @@ -39,7 +39,6 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66278")
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
Expand All @@ -48,6 +47,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
// ensure the cache is definitely used
.put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(1L, ByteSizeUnit.GB))
// to make cache synchronization predictable
.put(CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueHours(1L))
.build();
}

Expand Down Expand Up @@ -141,19 +142,21 @@ public Settings onNodeStopped(String nodeName) {
}
});

persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
cacheService = internalCluster().getInstance(CacheService.class, dataNode);
persistentCache = cacheService.getPersistentCache();
ensureGreen(restoredIndexName);

cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));
assertThat("Cache files should be repopulated in cache", persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
cacheService = internalCluster().getInstance(CacheService.class, dataNode);
assertBusy(() -> {
cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)));
assertTrue(internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache().hasDeletions());
});
cacheService.synchronizeCache();

persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo(0L));
}
}