Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast refresh indices should use search shards #113478

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SEARCH_FAILURE_STATS = def(8_759_00_0);
public static final TransportVersion INGEST_GEO_DATABASE_PROVIDERS = def(8_760_00_0);
public static final TransportVersion DATE_TIME_DOC_VALUES_LOCALES = def(8_761_00_0);
public static final TransportVersion FAST_REFRESH_RCO = def(8_762_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -120,27 +119,18 @@ public void onPrimaryOperationComplete(
ActionListener<Void> listener
) {
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);

// Indices marked with fast refresh do not rely on refreshing the unpromotables
if (fastRefresh) {
listener.onResponse(null);
} else {
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
UnpromotableShardRefreshRequest,
ActionResponse.Empty> {
Expand Down Expand Up @@ -73,6 +76,18 @@ protected void unpromotableShardOperation(
return;
}

// During an upgrade to FAST_REFRESH_RCO, we expect search shards to be first upgraded before the primary is upgraded. Thus,
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
// Note that the fast refresh setting is final.
// TODO: remove assertion (ES-9563)
kingherc marked this conversation as resolved.
Show resolved Hide resolved
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO)
: "attempted to refresh a fast refresh search shard "
+ shard
+ " on transport version "
+ transportService.getLocalNodeConnection().getTransportVersion()
+ " (before FAST_REFRESH_RCO)";

ActionListener.run(responseListener, listener -> {
shard.waitForPrimaryTermAndGeneration(
request.getPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,10 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
assert indexShard.indexSettings().isFastRefresh() == false
: "a search shard should not receive a TransportGetAction for an index with fast refresh";
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
assert indexShard.indexSettings().isFastRefresh() == false
: "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
+ "the fast refresh setting";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -53,9 +52,7 @@ public void refreshShard(
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes it for future refreshes after the indexing node upgraded. But it does not guarantee immediate availability of the latest state on the search node. So we risk some seconds of non-realtime GET requests going backwards during such an upgrade? I think real-time GET requests will be saved by the wait-for generation, is that also your understanding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning here is that this code runs on the primary/indexing node, and that the indexing node will be upgraded after the search nodes.

But it does not guarantee immediate availability of the latest state on the search node.

Doesn't our upgrade process guarantee that, since search nodes are upgraded first?

So we risk some seconds of non-realtime GET requests going backwards during such an upgrade?

A non-realtime GET coordinated by an old search node will go to the primary to execute.
A non-realtime GET coordinated by a new search node, with an old primary node, will go to the primary to execute.
A non-realtime GET coordinated by a new search node on a fully upgraded cluster will be executed on the search node, as is done for non-fast-refresh indices, which should be fine as well. Not sure I see when/why it might go backwards?

I think real-time GET requests will be saved by the wait-for generation, is that also your understanding?

A real-time GET coordinated by an old search node will go to the primary to execute.
A real-time GET coordinated by a new search node, with an old primary node, will go to the primary to execute.
A real-time GET coordinated by a new search node on a fully upgraded cluster will be executed on the search node, as is done for non-fast-refresh indices, which should use wait-for generation if necessary.

Please tell me if you see any corner cases I might have missed or not considered. It might be useful to think about the above combinations also for searches/mgets, but I believe it should be a similar story for them as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right that it works out. The upgrade will force a relocation, which forces a flush, bringing things back into order. Thanks.

refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
Expand All @@ -68,9 +65,7 @@ public void onFailure(Exception e) {
}
});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
} else {
l.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class OperationRouting {
Expand Down Expand Up @@ -305,8 +306,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
}

public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
// TODO: remove if and always return isSearchable (ES-9563)
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
return shardRouting.isPromotableToPrimary();
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO)) {
return shardRouting.isSearchable();
} else {
return shardRouting.isPromotableToPrimary();
}
} else {
return shardRouting.isSearchable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
if (isStateless) {
return DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
return DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
&& loadFiltersEagerlySetting
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings());
kingherc marked this conversation as resolved.
Show resolved Hide resolved
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,6 +20,7 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -27,16 +29,22 @@
public class IndexRoutingTableTests extends ESTestCase {

public void testReadyForSearch() {
innerReadyForSearch(false);
innerReadyForSearch(true);
innerReadyForSearch(false, false);
innerReadyForSearch(false, true);
innerReadyForSearch(true, false);
innerReadyForSearch(true, true);
}

private void innerReadyForSearch(boolean fastRefresh) {
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
);
when(clusterState.getMinTransportVersion()).thenReturn(
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO.id() - 1_00_0) : TransportVersion.current()
);
// 2 primaries that are search and index
ShardId p1 = new ShardId(index, 0);
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
Expand All @@ -55,7 +63,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
if (fastRefresh && beforeFastRefreshRCO) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand Down Expand Up @@ -91,7 +99,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
if (fastRefresh && beforeFastRefreshRCO) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand All @@ -118,8 +126,6 @@ private void innerReadyForSearch(boolean fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 unassigned primaries that are index only with some replicas that are all available
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
// that readyForSearch allows for searching replicas when the index shard is not available.
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
Expand All @@ -137,8 +143,8 @@ private void innerReadyForSearch(boolean fastRefresh) {
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
if (fastRefresh && beforeFastRefreshRCO) {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
} else {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
for (var isStateless : values) {
if (isStateless) {
assertEquals(
loadFiltersEagerly && indexFastRefresh && hasIndexRole,
loadFiltersEagerly && indexFastRefresh && hasIndexRole == false,
BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, indexFastRefresh)
)
Expand Down