From d3fceaddefcc32c71321768d05f268bce2374634 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Thu, 17 Oct 2024 20:11:15 +0300 Subject: [PATCH] Revert fast refresh using search shards (#115019) As this induces ES-8275 and causes Fleet timeouts for some APIs. Relates ES-9573 --- .../refresh/TransportShardRefreshAction.java | 32 +++++++++++------- ...ansportUnpromotableShardRefreshAction.java | 15 --------- .../action/get/TransportGetAction.java | 4 ++- .../get/TransportShardMultiGetAction.java | 4 ++- .../support/replication/PostWriteRefresh.java | 9 +++-- .../cluster/routing/OperationRouting.java | 9 +---- .../index/cache/bitset/BitsetFilterCache.java | 7 +++- .../routing/IndexRoutingTableTests.java | 24 +++++--------- .../cache/bitset/BitSetFilterCacheTests.java | 33 ++++++++++++++----- 9 files changed, 74 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index cb667400240f0..7857e9a22e9b9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; @@ -119,18 +120,27 @@ public void onPrimaryOperationComplete( ActionListener listener ) { assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed"; - UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest( 
indexShardRoutingTable, - replicaRequest.primaryRefreshResult.primaryTerm(), - replicaRequest.primaryRefreshResult.generation(), - false - ); - transportService.sendRequest( - transportService.getLocalNode(), - TransportUnpromotableShardRefreshAction.NAME, - unpromotableReplicaRequest, - new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor) + boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get( + clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings() ); + + // Indices marked with fast refresh do not rely on refreshing the unpromotables + if (fastRefresh) { + listener.onResponse(null); + } else { + UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest( + indexShardRoutingTable, + replicaRequest.primaryRefreshResult.primaryTerm(), + replicaRequest.primaryRefreshResult.generation(), + false + ); + transportService.sendRequest( + transportService.getLocalNode(), + TransportUnpromotableShardRefreshAction.NAME, + unpromotableReplicaRequest, + new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor) + ); + } } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index f91a983d47885..6c24ec2d17604 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -24,9 +24,6 @@ import java.util.List; -import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO; -import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; - public class 
TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction< UnpromotableShardRefreshRequest, ActionResponse.Empty> { @@ -76,18 +73,6 @@ protected void unpromotableShardOperation( return; } - // During an upgrade to FAST_REFRESH_RCO, we expect search shards to be first upgraded before the primary is upgraded. Thus, - // when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already. - // Note that the fast refresh setting is final. - // TODO: remove assertion (ES-9563) - assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false - || transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO) - : "attempted to refresh a fast refresh search shard " - + shard - + " on transport version " - + transportService.getLocalNodeConnection().getTransportVersion() - + " (before FAST_REFRESH_RCO)"; - ActionListener.run(responseListener, listener -> { shard.waitForPrimaryTermAndGeneration( request.getPrimaryTerm(), diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index fb4b3907d2bfd..9e535344c9589 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -126,10 +126,12 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (indexShard.routingEntry().isPromotableToPrimary() == false) { + // TODO: Re-evaluate assertion (ES-8227) + // assert indexShard.indexSettings().isFastRefresh() == false + // : "a search shard should not receive a TransportGetAction for an index with fast refresh"; handleGetOnUnpromotableShard(request, indexShard, 
listener); return; } - // TODO: adapt assertion to assert only that it is not stateless (ES-9563) assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh() : "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting"; if (request.realtime()) { // we are not tied to a refresh cycle here anyway diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 633e7ef6793ab..34b3ae50e0b51 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -124,10 +124,12 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (indexShard.routingEntry().isPromotableToPrimary() == false) { + // TODO: Re-evaluate assertion (ES-8227) + // assert indexShard.indexSettings().isFastRefresh() == false + // : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh"; handleMultiGetOnUnpromotableShard(request, indexShard, listener); return; } - // TODO: adapt assertion to assert only that it is not stateless (ES-9563) assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh() : "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has " + "the fast refresh setting"; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java index 7414aeeb2c405..683c3589c893d 100644 --- 
a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; @@ -52,7 +53,9 @@ public void refreshShard( case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() { @Override public void onResponse(Boolean forced) { - if (location != null && indexShard.routingEntry().isSearchable() == false) { + // Fast refresh indices do not depend on the unpromotables being refreshed + boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings()); + if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) { refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout); } else { listener.onResponse(forced); @@ -65,7 +68,9 @@ public void onFailure(Exception e) { } }); case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> { - if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) { + // Fast refresh indices do not depend on the unpromotables being refreshed + boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings()); + if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) { sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout); } else { l.onResponse(true); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java 
b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 9120e25b443d7..f7812d284f2af 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -32,7 +32,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; public class OperationRouting { @@ -306,14 +305,8 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null } public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) { - // TODO: remove if and always return isSearchable (ES-9563) if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) { - // Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally. - if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO)) { - return shardRouting.isSearchable(); - } else { - return shardRouting.isPromotableToPrimary(); - } + return shardRouting.isPromotableToPrimary(); } else { return shardRouting.isSearchable(); } diff --git a/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 5792cafb91b77..5277999271984 100644 --- a/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -58,6 +58,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; + /** * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time. *

@@ -103,7 +105,10 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) { boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING); boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings()); if (isStateless) { - return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE); + return loadFiltersEagerlySetting + && (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE) + || (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE) + && INDEX_FAST_REFRESH_SETTING.get(settings.getSettings()))); } else { return loadFiltersEagerlySetting; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java index 6a7f4bb27a324..21b30557cafea 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster.routing; -import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -20,7 +19,6 @@ import java.util.List; -import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -29,22 +27,16 @@ public class IndexRoutingTableTests extends ESTestCase { public void testReadyForSearch() { - innerReadyForSearch(false, false); - innerReadyForSearch(false, true); - innerReadyForSearch(true, false); - innerReadyForSearch(true, true); + innerReadyForSearch(false); + innerReadyForSearch(true); } - // TODO: remove if 
(fastRefresh && beforeFastRefreshRCO) branches (ES-9563) - private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) { + private void innerReadyForSearch(boolean fastRefresh) { Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID()); ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS); when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn( Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build() ); - when(clusterState.getMinTransportVersion()).thenReturn( - beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO.id() - 1_00_0) : TransportVersion.current() - ); // 2 primaries that are search and index ShardId p1 = new ShardId(index, 0); IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable( @@ -63,7 +55,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY))); shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY))); indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 }); - if (fastRefresh && beforeFastRefreshRCO) { + if (fastRefresh) { assertTrue(indexRoutingTable.readyForSearch(clusterState)); } else { assertFalse(indexRoutingTable.readyForSearch(clusterState)); @@ -99,7 +91,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR ) ); indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 }); - if (fastRefresh && beforeFastRefreshRCO) { + if (fastRefresh) { assertTrue(indexRoutingTable.readyForSearch(clusterState)); } else { assertFalse(indexRoutingTable.readyForSearch(clusterState)); @@ -126,6 +118,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean 
beforeFastRefreshR assertTrue(indexRoutingTable.readyForSearch(clusterState)); // 2 unassigned primaries that are index only with some replicas that are all available + // Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure + // that readyForSearch allows for searching replicas when the index shard is not available. shardTable1 = new IndexShardRoutingTable( p1, List.of( @@ -143,8 +137,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR ) ); indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 }); - if (fastRefresh && beforeFastRefreshRCO) { - assertFalse(indexRoutingTable.readyForSearch(clusterState)); + if (fastRefresh) { + assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change } else { assertTrue(indexRoutingTable.readyForSearch(clusterState)); } diff --git a/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java b/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java index b12cd256eebcc..d7d5c886e0741 100644 --- a/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java +++ b/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; +import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -272,21 +273,35 @@ public void testShouldLoadRandomAccessFiltersEagerly() { for (var hasIndexRole : values) { for (var loadFiltersEagerly : 
values) { for (var isStateless : values) { - boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly( - bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly) - ); - if (isStateless) { - assertEquals(loadFiltersEagerly && hasIndexRole == false, result); - } else { - assertEquals(loadFiltersEagerly, result); + for (var fastRefresh : values) { + if (isStateless == false && fastRefresh) { + // fast refresh is only relevant for stateless indices + continue; + } + + boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly( + bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh) + ); + if (isStateless) { + assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result); + } else { + assertEquals(loadFiltersEagerly, result); + } } } } } } - private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) { - var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly); + private IndexSettings bitsetFilterCacheSettings( + boolean isStateless, + boolean hasIndexRole, + boolean loadFiltersEagerly, + boolean fastRefresh + ) { + var indexSettingsBuilder = Settings.builder() + .put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly) + .put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh); var nodeSettingsBuilder = Settings.builder() .putList(