From 52460d5840d036668d6d5bf6ffed3224f3e1f5d7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Dec 2020 10:32:43 +0100 Subject: [PATCH 1/2] Avoid Needless Cache Status Fetches in SearchableSnapshotAllocator We shouldn't fetch cache status if no allocation is possible to begin with. Also, this surfaced an issue with using the `Client` to `reroute` since that won't retry stale shards (failed the invalid license IT for example) so I moved to using the `RerouteService` like we do in the `GatewayAllocator`. (Plus, dried up one method that was 100% the same as in the replica allocator) --- .../reroute/ClusterRerouteResponse.java | 2 +- .../gateway/ReplicaShardAllocator.java | 4 +- .../SearchableSnapshotAllocator.java | 77 ++++++------------- .../SearchableSnapshots.java | 6 +- .../SearchableSnapshotAllocatorTests.java | 13 ++-- 5 files changed, 34 insertions(+), 68 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java index 112c70aaeb35..95ec34acc6b6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java @@ -44,7 +44,7 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC explanations = RoutingExplanations.readFrom(in); } - public ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) { + ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) { super(acknowledged); this.state = state; this.explanations = explanations; diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2a232366ec43..83293f08924e 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -241,8 +241,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas * YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element * in the returned tuple. */ - private static Tuple> canBeAllocatedToAtLeastOneNode(ShardRouting shard, - RoutingAllocation allocation) { + public static Tuple> canBeAllocatedToAtLeastOneNode(ShardRouting shard, + RoutingAllocation allocation) { Decision madeDecision = Decision.NO; final boolean explain = allocation.debugDecision(); Map nodeDecisions = explain ? new HashMap<>() : null; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java index cab8e6081a40..78c05e74adb3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java @@ -5,17 +5,17 @@ */ package org.elasticsearch.xpack.searchablesnapshots; -import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -27,11 +27,13 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.gateway.AsyncShardFetch; +import org.elasticsearch.gateway.ReplicaShardAllocator; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; @@ -60,9 +62,9 @@ public class SearchableSnapshotAllocator implements ExistingShardsAllocator { private static final Logger logger = LogManager.getLogger(SearchableSnapshotAllocator.class); - private static final ActionListener REROUTE_LISTENER = new ActionListener<>() { + private static final ActionListener REROUTE_LISTENER = new ActionListener<>() { @Override - public void onResponse(ClusterRerouteResponse clusterRerouteResponse) { + public void onResponse(ClusterState clusterRerouteResponse) { logger.trace("reroute succeeded after loading snapshot cache information"); } @@ -78,8 +80,11 @@ public void onFailure(Exception e) { private final Client client; - public SearchableSnapshotAllocator(Client client) { + private final RerouteService rerouteService; + + public SearchableSnapshotAllocator(Client client, RerouteService rerouteService) { this.client = client; + this.rerouteService = rerouteService; } @Override @@ -151,20 +156,15 @@ private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, null); } - final AsyncShardFetch.FetchResult fetchedCacheData = fetchData(shardRouting, allocation); - if (fetchedCacheData.hasData() == false) { - return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, null); - } - final boolean explain = allocation.debugDecision(); - final MatchingNodes matchingNodes = findMatchingNodes(shardRouting, allocation, fetchedCacheData, explain); - assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; - // pre-check if it can be allocated to any node that currently exists, so we won't list the cache sizes for it for nothing // TODO: in the following logic, we do not account for existing cache size when handling disk space checks, should and can we // reliably do this in a world of concurrent cache evictions or are we ok with the cache size just being a best effort hint // here? - Tuple> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation); + Tuple> result = ReplicaShardAllocator.canBeAllocatedToAtLeastOneNode( + shardRouting, + allocation + ); Decision allocateDecision = result.v1(); if (allocateDecision.type() != Decision.Type.YES && (explain == false || asyncFetchStore.get(shardRouting.shardId()) == null)) { // only return early if we are not in explain mode, or we are in explain mode but we have not @@ -176,6 +176,14 @@ private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation ); } + final AsyncShardFetch.FetchResult fetchedCacheData = fetchData(shardRouting, allocation); + if (fetchedCacheData.hasData() == false) { + return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, null); + } + + final MatchingNodes matchingNodes = findMatchingNodes(shardRouting, allocation, fetchedCacheData, explain); + assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; + List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); if (allocateDecision.type() != Decision.Type.YES) { return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()), nodeDecisions); @@ -283,7 +291,7 @@ public void onFailure(Exception e) { } }, () -> { if (asyncFetch.data() != null) { - client.admin().cluster().prepareReroute().execute(REROUTE_LISTENER); + rerouteService.reroute("async_shard_cache_fetch", Priority.HIGH, REROUTE_LISTENER); } }) ); @@ -313,45 +321,6 @@ private static List augmentExplanationsWithStoreInfo( return augmented; } - /** - * Determines if the shard can be allocated on at least one node based on the allocation deciders. - * - * Returns the best allocation decision for allocating the shard on any node (i.e. YES if at least one - * node decided YES, THROTTLE if at least one node decided THROTTLE, and NO if none of the nodes decided - * YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element - * in the returned tuple. - * TODO: dry this method up against ReplicaShardAllocator - */ - private static Tuple> canBeAllocatedToAtLeastOneNode( - ShardRouting shard, - RoutingAllocation allocation - ) { - Decision madeDecision = Decision.NO; - final boolean explain = allocation.debugDecision(); - Map nodeDecisions = explain ? new HashMap<>() : null; - for (ObjectCursor cursor : allocation.nodes().getDataNodes().values()) { - RoutingNode node = allocation.routingNodes().node(cursor.value.getId()); - if (node == null) { - continue; - } - // if we can't allocate it on a node, ignore it - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) { - if (explain) { - madeDecision = decision; - } else { - return Tuple.tuple(decision, null); - } - } else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) { - madeDecision = decision; - } - if (explain) { - nodeDecisions.put(node.nodeId(), new NodeAllocationResult(node.node(), null, decision)); - } - } - return Tuple.tuple(madeDecision, nodeDecisions); - } - private MatchingNodes findMatchingNodes( ShardRouting shard, RoutingAllocation allocation, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 9bca69b3f37b..b06816674790 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -172,8 +172,8 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng private final SetOnce blobStoreCacheService = new SetOnce<>(); private final SetOnce cacheService = new SetOnce<>(); private final SetOnce threadPool = new SetOnce<>(); - private final SetOnce client = new SetOnce<>(); private final SetOnce failShardsListener = new SetOnce<>(); + private final SetOnce allocator = new SetOnce<>(); private final Settings settings; public SearchableSnapshots(final Settings settings) { @@ -235,7 +235,7 @@ public Collection createComponents( } else { PersistentCache.cleanUp(settings, nodeEnvironment); } - this.client.set(client); + this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService())); components.add(new CacheServiceSupplier(cacheService.get())); return Collections.unmodifiableList(components); } @@ -344,7 +344,7 @@ public List getRestHandlers( @Override public Map getExistingShardsAllocators() { - return Map.of(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator(client.get())); + return Map.of(SearchableSnapshotAllocator.ALLOCATOR_NAME, allocator.get()); } // overridable by tests diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java index 8d1fa3f3151f..b18694568f3f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java @@ -11,8 +11,6 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; -import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -28,7 +26,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -119,10 +116,7 @@ public void doE Request request, ActionListener listener ) { - if (action == ClusterRerouteAction.INSTANCE) { - reroutesTriggered.incrementAndGet(); - listener.onResponse((Response) new ClusterRerouteResponse(true, state, new RoutingExplanations())); - } else if (action == TransportSearchableSnapshotCacheStoresAction.TYPE) { + if (action == TransportSearchableSnapshotCacheStoresAction.TYPE) { listener.onResponse( (Response) new TransportSearchableSnapshotCacheStoresAction.NodesCacheFilesMetadata( clusterName, @@ -144,7 +138,10 @@ public void doE } }; - final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator(client); + final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator(client, (reason, priority, listener) -> { + reroutesTriggered.incrementAndGet(); + listener.onResponse(null); + }); allocateAllUnassigned(allocation, allocator); assertEquals(1, reroutesTriggered.get()); From 0cb00177c8066ab431420e4f5c875817b1e2fa28 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 16 Dec 2020 13:59:09 +0100 Subject: [PATCH 2/2] add test --- .../SearchableSnapshotAllocatorTests.java | 140 +++++++++++++----- 1 file changed, 99 insertions(+), 41 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java index b18694568f3f..0035fa073931 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -47,6 +48,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.empty; public class SearchableSnapshotAllocatorTests extends ESAllocationTestCase { @@ -56,51 +58,14 @@ public void testAllocateToNodeWithLargestCache() { final DiscoveryNode localNode = randomFrom(nodes); final Settings localNodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); - final ClusterName clusterName = org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY); - final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(localNodeSettings, random()); - final Metadata metadata = Metadata.builder() - .put( - IndexMetadata.builder(shardId.getIndexName()) - .settings( - settings(Version.CURRENT).put( - ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), - SearchableSnapshotAllocator.ALLOCATOR_NAME - ).put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY) - ) - .numberOfShards(1) - .numberOfReplicas(0) - .putInSyncAllocationIds(shardId.id(), Collections.emptySet()) - ) - .build(); + final Metadata metadata = buildSingleShardIndexMetadata(shardId); final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); routingTableBuilder.addAsRestore(metadata.index(shardId.getIndex()), randomSnapshotSource(shardId)); - final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (DiscoveryNode node : nodes) { - nodesBuilder.add(node); - } - final DiscoveryNodes discoveryNodes = nodesBuilder.build(); - final ClusterState state = ClusterState.builder(clusterName) - .metadata(metadata) - .routingTable(routingTableBuilder.build()) - .nodes(discoveryNodes) - .build(); + final ClusterState state = buildClusterState(nodes, metadata, routingTableBuilder); final long shardSize = randomNonNegativeLong(); - final RoutingAllocation allocation = new RoutingAllocation( - yesAllocationDeciders(), - new RoutingNodes(state, false), - state, - null, - new SnapshotShardSizeInfo(ImmutableOpenMap.of()) { - @Override - public Long getShardSize(ShardRouting shardRouting) { - return shardSize; - } - }, - TimeUnit.MILLISECONDS.toNanos(deterministicTaskQueue.getCurrentTimeMillis()) - ); final AtomicInteger reroutesTriggered = new AtomicInteger(0); @@ -119,7 +84,7 @@ public void doE if (action == TransportSearchableSnapshotCacheStoresAction.TYPE) { listener.onResponse( (Response) new TransportSearchableSnapshotCacheStoresAction.NodesCacheFilesMetadata( - clusterName, + state.getClusterName(), existingCacheSizes.entrySet() .stream() .map( @@ -142,6 +107,8 @@ public void doE reroutesTriggered.incrementAndGet(); listener.onResponse(null); }); + + final RoutingAllocation allocation = buildAllocation(deterministicTaskQueue, state, shardSize, yesAllocationDeciders()); allocateAllUnassigned(allocation, allocator); assertEquals(1, reroutesTriggered.get()); @@ -153,11 +120,102 @@ public void doE final ShardRouting primaryRouting = allocation.routingNodes().assignedShards(shardId).get(0); final String primaryNodeId = primaryRouting.currentNodeId(); - final DiscoveryNode primaryNode = discoveryNodes.get(primaryNodeId); + final DiscoveryNode primaryNode = state.nodes().get(primaryNodeId); assertEquals(bestCacheSize, (long) existingCacheSizes.get(primaryNode)); } } + public void testNoFetchesOnDeciderNo() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final List nodes = randomList(1, 10, () -> newNode("node-" + UUIDs.randomBase64UUID(random()))); + final DiscoveryNode localNode = randomFrom(nodes); + final Settings localNodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); + + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(localNodeSettings, random()); + + final Metadata metadata = buildSingleShardIndexMetadata(shardId); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.addAsRestore(metadata.index(shardId.getIndex()), randomSnapshotSource(shardId)); + + final ClusterState state = buildClusterState(nodes, metadata, routingTableBuilder); + final RoutingAllocation allocation = buildAllocation( + deterministicTaskQueue, + state, + randomNonNegativeLong(), + noAllocationDeciders() + ); + + final Client client = new NoOpNodeClient(deterministicTaskQueue.getThreadPool()) { + @Override + public void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + throw new AssertionError("Expecting no requests but received [" + action + "]"); + } + }; + + final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator( + client, + (reason, priority, listener) -> { throw new AssertionError("Expecting no reroutes"); } + ); + allocateAllUnassigned(allocation, allocator); + assertTrue(allocation.routingNodesChanged()); + assertThat(allocation.routingNodes().assignedShards(shardId), empty()); + assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned()); + } + + private static Metadata buildSingleShardIndexMetadata(ShardId shardId) { + return Metadata.builder() + .put( + IndexMetadata.builder(shardId.getIndexName()) + .settings( + settings(Version.CURRENT).put( + ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(), + SearchableSnapshotAllocator.ALLOCATOR_NAME + ).put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .putInSyncAllocationIds(shardId.id(), Collections.emptySet()) + ) + .build(); + } + + private ClusterState buildClusterState(List nodes, Metadata metadata, RoutingTable.Builder routingTableBuilder) { + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + for (DiscoveryNode node : nodes) { + nodesBuilder.add(node); + } + return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTableBuilder.build()) + .nodes(nodesBuilder) + .build(); + } + + private static RoutingAllocation buildAllocation( + DeterministicTaskQueue deterministicTaskQueue, + ClusterState state, + long shardSize, + AllocationDeciders allocationDeciders + ) { + return new RoutingAllocation( + allocationDeciders, + new RoutingNodes(state, false), + state, + null, + new SnapshotShardSizeInfo(ImmutableOpenMap.of()) { + @Override + public Long getShardSize(ShardRouting shardRouting) { + return shardSize; + } + }, + TimeUnit.MILLISECONDS.toNanos(deterministicTaskQueue.getCurrentTimeMillis()) + ); + } + private static void allocateAllUnassigned(RoutingAllocation allocation, ExistingShardsAllocator allocator) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) {