diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index ed0a4e8973972..05da8963e4519 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -9,12 +9,20 @@ package org.elasticsearch.action.get; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; @@ -24,6 +32,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -34,8 +44,11 @@ */ public class TransportGetAction extends TransportSingleShardAction { + private static final Logger logger = LogManager.getLogger(TransportGetAction.class); + private final IndicesService indicesService; private final ExecutorSelector executorSelector; + private final NodeClient client; @Inject public TransportGetAction( @@ -45,7 +58,8 @@ public TransportGetAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ExecutorSelector executorSelector + ExecutorSelector executorSelector, + NodeClient client ) { super( GetAction.NAME, @@ -59,6 +73,7 @@ public TransportGetAction( ); this.indicesService = indicesService; this.executorSelector = executorSelector; + this.client = client; // register the internal TransportGetFromTranslogAction new TransportGetFromTranslogAction(transportService, indicesService, actionFilters); } @@ -78,7 +93,10 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) { request.request().routing(), request.request().preference() ); - return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator); + if (iterator == null) { + return null; + } + return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList()); } @Override @@ -91,6 +109,12 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); + if (indexShard.routingEntry().isPromotableToPrimary() == false) { + handleGetOnUnpromotableShard(request, indexShard, listener); + return; + } + assert DiscoveryNode.isStateless(clusterService.getSettings()) == false + : "A TransportGetAction should always be handled by a search shard in Stateless"; if (request.realtime()) { // we are not tied to a refresh cycle here anyway asyncGet(request, shardId, listener); } else { @@ -148,6 +172,66 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener listener) + throws IOException { + ShardId shardId = indexShard.shardId(); + DiscoveryNode node = getCurrentNodeOfPrimary(shardId); + if (request.refresh()) { + logger.trace("send refresh action for shard {} to node {}", shardId, node.getId()); + var refreshRequest = new BasicReplicationRequest(shardId); + refreshRequest.setParentTask(request.getParentTask()); + client.executeLocally( + TransportShardRefreshAction.TYPE, + refreshRequest, + ActionListener.wrap(replicationResponse -> super.asyncShardOperation(request, shardId, listener), listener::onFailure) + ); + } else if (request.realtime()) { + TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId); + getFromTranslogRequest.setParentTask(request.getParentTask()); + transportService.sendRequest( + node, + TransportGetFromTranslogAction.NAME, + getFromTranslogRequest, + new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> { + if (r.getResult() != null) { + logger.debug("received result for real-time get for id '{}' from promotable shard", request.id()); + l.onResponse(new GetResponse(r.getResult())); + } else { + logger.debug( + "no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})", + request.id(), + r.segmentGeneration() + ); + if (r.segmentGeneration() == -1) { + // Nothing to wait for (no previous unsafe generation), just handle the Get locally. + ActionRunnable.supply(listener, () -> shardOperation(request, shardId)).run(); + } else { + assert r.segmentGeneration() > -1L; + indexShard.waitForSegmentGeneration( + r.segmentGeneration(), + ActionListener.wrap(aLong -> super.asyncShardOperation(request, shardId, listener), listener::onFailure) + ); + } + } + }), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId)) + ); + } else { + // A non-real-time get with no explicit refresh requested. + super.asyncShardOperation(request, shardId, listener); + } + } + + private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) { + var clusterState = clusterService.state(); + var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); + if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) { + throw new NoShardAvailableActionException(shardId, "primary shard is not active"); + } + DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId()); + assert node != null; + return node; + } + private IndexShard getIndexShard(ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 3b3a64a1d2e1b..48cfa6626197d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -54,6 +54,7 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { /** * Shards to use for a {@code GET} operation. + * @return A shard iterator that can be used for GETs, or null if e.g. due to preferences no match is found. */ public ShardIterator getShards( ClusterState clusterState, diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 510de269b577d..8cc6ef53f6d9d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -214,6 +214,9 @@ public class InternalEngine extends Engine { private final ByteSizeValue totalDiskSpace; + protected static final String REAL_TIME_GET_REFRESH_SOURCE = "realtime_get"; + protected static final String UNSAFE_VERSION_MAP_REFRESH_SOURCE = "unsafe_version_map"; + public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } @@ -848,7 +851,7 @@ protected GetResult realtimeGetUnderLock( } } assert versionValue.seqNo >= 0 : versionValue; - refreshIfNeeded("realtime_get", versionValue.seqNo); + refreshIfNeeded(REAL_TIME_GET_REFRESH_SOURCE, versionValue.seqNo); } if (getFromSearcherIfNotInTranslog) { return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false); @@ -960,7 +963,7 @@ private VersionValue getVersionFromMap(BytesRef id) { // map so once we pass this point we can safely lookup from the version map. if (versionMap.isUnsafe()) { lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1); - refresh("unsafe_version_map", SearcherScope.INTERNAL, true); + refreshInternalSearcher(UNSAFE_VERSION_MAP_REFRESH_SOURCE, true); } versionMap.enforceSafeAccess(); } @@ -1929,6 +1932,10 @@ public RefreshResult maybeRefresh(String source) throws EngineException { return refresh(source, SearcherScope.EXTERNAL, false); } + protected RefreshResult refreshInternalSearcher(String source, boolean block) throws EngineException { + return refresh(source, SearcherScope.INTERNAL, block); + } + final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException { // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. @@ -3052,7 +3059,7 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) { if (lastRefreshedCheckpoint() < requestingSeqNo) { synchronized (refreshIfNeededMutex) { if (lastRefreshedCheckpoint() < requestingSeqNo) { - refresh(source, SearcherScope.INTERNAL, true); + refreshInternalSearcher(source, true); } } }