Skip to content

Commit

Permalink
Revert fast refresh using search shards (elastic#115019)
Browse files Browse the repository at this point in the history
As this induces ES-8275 and makes fleet time outs for some APIs.

Relates ES-9573
  • Loading branch information
kingherc authored and lkts committed Oct 18, 2024
1 parent bfd9b11 commit 19d7c6f
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -119,18 +120,27 @@ public void onPrimaryOperationComplete(
ActionListener<Void> listener
) {
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
);

// Indices marked with fast refresh do not rely on refreshing the unpromotables
if (fastRefresh) {
listener.onResponse(null);
} else {
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
UnpromotableShardRefreshRequest,
ActionResponse.Empty> {
Expand Down Expand Up @@ -76,18 +73,6 @@ protected void unpromotableShardOperation(
return;
}

// During an upgrade to FAST_REFRESH_RCO, we expect search shards to be first upgraded before the primary is upgraded. Thus,
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
// Note that the fast refresh setting is final.
// TODO: remove assertion (ES-9563)
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO)
: "attempted to refresh a fast refresh search shard "
+ shard
+ " on transport version "
+ transportService.getLocalNodeConnection().getTransportVersion()
+ " (before FAST_REFRESH_RCO)";

ActionListener.run(responseListener, listener -> {
shard.waitForPrimaryTermAndGeneration(
request.getPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
// TODO: Re-evaluate assertion (ES-8227)
// assert indexShard.indexSettings().isFastRefresh() == false
// : "a search shard should not receive a TransportGetAction for an index with fast refresh";
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
// TODO: Re-evaluate assertion (ES-8227)
// assert indexShard.indexSettings().isFastRefresh() == false
// : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
+ "the fast refresh setting";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -52,7 +53,9 @@ public void refreshShard(
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
Expand All @@ -65,7 +68,9 @@ public void onFailure(Exception e) {
}
});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
} else {
l.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class OperationRouting {
Expand Down Expand Up @@ -306,14 +305,8 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
}

public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
// TODO: remove if and always return isSearchable (ES-9563)
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO)) {
return shardRouting.isSearchable();
} else {
return shardRouting.isPromotableToPrimary();
}
return shardRouting.isPromotableToPrimary();
} else {
return shardRouting.isSearchable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
* <p>
Expand Down Expand Up @@ -103,7 +105,10 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
if (isStateless) {
return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE);
return loadFiltersEagerlySetting
&& (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
|| (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings())));
} else {
return loadFiltersEagerlySetting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -20,7 +19,6 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -29,22 +27,16 @@
public class IndexRoutingTableTests extends ESTestCase {

public void testReadyForSearch() {
innerReadyForSearch(false, false);
innerReadyForSearch(false, true);
innerReadyForSearch(true, false);
innerReadyForSearch(true, true);
innerReadyForSearch(false);
innerReadyForSearch(true);
}

// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
private void innerReadyForSearch(boolean fastRefresh) {
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
);
when(clusterState.getMinTransportVersion()).thenReturn(
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO.id() - 1_00_0) : TransportVersion.current()
);
// 2 primaries that are search and index
ShardId p1 = new ShardId(index, 0);
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
Expand All @@ -63,7 +55,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh && beforeFastRefreshRCO) {
if (fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand Down Expand Up @@ -99,7 +91,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh && beforeFastRefreshRCO) {
if (fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand All @@ -126,6 +118,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 unassigned primaries that are index only with some replicas that are all available
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
// that readyForSearch allows for searching replicas when the index shard is not available.
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
Expand All @@ -143,8 +137,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh && beforeFastRefreshRCO) {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
if (fastRefresh) {
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
} else {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -272,21 +273,35 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
for (var hasIndexRole : values) {
for (var loadFiltersEagerly : values) {
for (var isStateless : values) {
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && hasIndexRole == false, result);
} else {
assertEquals(loadFiltersEagerly, result);
for (var fastRefresh : values) {
if (isStateless == false && fastRefresh) {
// fast refresh is only relevant for stateless indices
continue;
}

boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result);
} else {
assertEquals(loadFiltersEagerly, result);
}
}
}
}
}
}

private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) {
var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly);
private IndexSettings bitsetFilterCacheSettings(
boolean isStateless,
boolean hasIndexRole,
boolean loadFiltersEagerly,
boolean fastRefresh
) {
var indexSettingsBuilder = Settings.builder()
.put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly)
.put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh);

var nodeSettingsBuilder = Settings.builder()
.putList(
Expand Down

0 comments on commit 19d7c6f

Please sign in to comment.