Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stateless real-time GET #93976

Merged
merged 5 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,20 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -24,6 +32,8 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -34,8 +44,11 @@
*/
public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {

private static final Logger logger = LogManager.getLogger(TransportGetAction.class);

private final IndicesService indicesService;
private final ExecutorSelector executorSelector;
private final NodeClient client;

@Inject
public TransportGetAction(
Expand All @@ -45,7 +58,8 @@ public TransportGetAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ExecutorSelector executorSelector
ExecutorSelector executorSelector,
NodeClient client
) {
super(
GetAction.NAME,
Expand All @@ -59,6 +73,7 @@ public TransportGetAction(
);
this.indicesService = indicesService;
this.executorSelector = executorSelector;
this.client = client;
// register the internal TransportGetFromTranslogAction
new TransportGetFromTranslogAction(transportService, indicesService, actionFilters);
}
Expand All @@ -78,7 +93,10 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
request.request().routing(),
request.request().preference()
);
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator);
if (iterator == null) {
return null;
}
pxsalehi marked this conversation as resolved.
Show resolved Hide resolved
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
}

@Override
Expand All @@ -91,6 +109,12 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
: "A TransportGetAction should always be handled by a search shard in Stateless";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
asyncGet(request, shardId, listener);
} else {
Expand Down Expand Up @@ -148,6 +172,66 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
}
}

private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
throws IOException {
ShardId shardId = indexShard.shardId();
DiscoveryNode node = getCurrentNodeOfPrimary(shardId);
if (request.refresh()) {
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
var refreshRequest = new BasicReplicationRequest(shardId);
refreshRequest.setParentTask(request.getParentTask());
client.executeLocally(
TransportShardRefreshAction.TYPE,
refreshRequest,
ActionListener.wrap(replicationResponse -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
);
} else if (request.realtime()) {
TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
getFromTranslogRequest.setParentTask(request.getParentTask());
transportService.sendRequest(
node,
TransportGetFromTranslogAction.NAME,
getFromTranslogRequest,
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
if (r.getResult() != null) {
logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
l.onResponse(new GetResponse(r.getResult()));
} else {
logger.debug(
"no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
request.id(),
r.segmentGeneration()
);
if (r.segmentGeneration() == -1) {
// Nothing to wait for (no previous unsafe generation), just handle the Get locally.
ActionRunnable.supply(listener, () -> shardOperation(request, shardId)).run();
} else {
assert r.segmentGeneration() > -1L;
indexShard.waitForSegmentGeneration(
r.segmentGeneration(),
ActionListener.wrap(aLong -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
);
}
}
}), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
);
} else {
// A non-real-time get with no explicit refresh requested.
super.asyncShardOperation(request, shardId, listener);
}
}

private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
var clusterState = clusterService.state();
var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
throw new NoShardAvailableActionException(shardId, "primary shard is not active");
}
DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId());
assert node != null;
return node;
}

private IndexShard getIndexShard(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {

/**
* Shards to use for a {@code GET} operation.
* @return A shard iterator that can be used for GETs, or null if e.g. due to preferences no match is found.
*/
public ShardIterator getShards(
ClusterState clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ public class InternalEngine extends Engine {

private final ByteSizeValue totalDiskSpace;

protected static final String REAL_TIME_GET_REFRESH_SOURCE = "realtime_get";
protected static final String UNSAFE_VERSION_MAP_REFRESH_SOURCE = "unsafe_version_map";

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -848,7 +851,7 @@ protected GetResult realtimeGetUnderLock(
}
}
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
refreshIfNeeded(REAL_TIME_GET_REFRESH_SOURCE, versionValue.seqNo);
}
if (getFromSearcherIfNotInTranslog) {
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
Expand Down Expand Up @@ -960,7 +963,7 @@ private VersionValue getVersionFromMap(BytesRef id) {
// map so once we pass this point we can safely lookup from the version map.
if (versionMap.isUnsafe()) {
lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1);
refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
refreshInternalSearcher(UNSAFE_VERSION_MAP_REFRESH_SOURCE, true);
}
versionMap.enforceSafeAccess();
}
Expand Down Expand Up @@ -1929,6 +1932,10 @@ public RefreshResult maybeRefresh(String source) throws EngineException {
return refresh(source, SearcherScope.EXTERNAL, false);
}

protected RefreshResult refreshInternalSearcher(String source, boolean block) throws EngineException {
return refresh(source, SearcherScope.INTERNAL, block);
}

final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
Expand Down Expand Up @@ -3052,7 +3059,7 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL, true);
refreshInternalSearcher(source, true);
}
}
}
Expand Down