Retry get_from_translog during relocations #104579
Changes from all commits
dc81a98
f87877e
2818475
2345a19
9fc8704
5e567d9
09088f9
da3f4e4
5dbc77b
@@ -8,17 +8,23 @@

package org.elasticsearch.action.get;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
@@ -27,15 +33,17 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -184,8 +192,8 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
throws IOException {
ShardId shardId = indexShard.shardId();
var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
if (request.refresh()) {
var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
var refreshRequest = new BasicReplicationRequest(shardId);
refreshRequest.setParentTask(request.getParentTask());
@@ -194,44 +202,97 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexSh
refreshRequest,
listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l))
);
} else if (request.realtime()) {
TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
getFromTranslogRequest.setParentTask(request.getParentTask());
transportService.sendRequest(
node,
TransportGetFromTranslogAction.NAME,
getFromTranslogRequest,
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
if (r.getResult() != null) {
logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
l.onResponse(new GetResponse(r.getResult()));
} else {
logger.debug(
"no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
request.id(),
r.segmentGeneration()
);
if (r.segmentGeneration() == -1) {
// Nothing to wait for (no previous unsafe generation), just handle the Get locally.
ActionRunnable.supply(l, () -> shardOperation(request, shardId)).run();
} else {
assert r.segmentGeneration() > -1L;
assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
indexShard.waitForPrimaryTermAndGeneration(
r.primaryTerm(),
r.segmentGeneration(),
listener.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll))
);
}
}
}), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
return;
}
if (request.realtime()) {
final var state = clusterService.state();
final var observer = new ClusterStateObserver(
state,
clusterService,
TimeValue.timeValueSeconds(60),
Review comment: I wonder how we decide the 60 seconds timeout. Is it to match the default timeout from ReplicationRequest or the default global timeout for
Reply: It is to match the default of the
logger,
threadPool.getThreadContext()
);
getFromTranslog(request, indexShard, state, observer, listener);
} else {
// A non-real-time get with no explicit refresh requested.
super.asyncShardOperation(request, shardId, listener);
}
}
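A side note on the timeout question in the thread above: if the intent is to match the replication default rather than hard-code 60 seconds, the observer could reference that constant directly. A minimal sketch, assuming ReplicationRequest.DEFAULT_TIMEOUT (currently one minute) is the value being matched; clusterService, logger, and threadPool are the existing fields of the surrounding transport action class.

```java
import org.elasticsearch.action.support.replication.ReplicationRequest;

// Inside the real-time branch of handleGetOnUnpromotableShard (sketch, not the PR's literal code):
final var observer = new ClusterStateObserver(
    clusterService.state(),
    clusterService,
    ReplicationRequest.DEFAULT_TIMEOUT, // one minute, i.e. the same value as the hard-coded 60 seconds
    logger,
    threadPool.getThreadContext()
);
```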
private void getFromTranslog(
GetRequest request,
IndexShard indexShard,
ClusterState state,
ClusterStateObserver observer,
ActionListener<GetResponse> listener
) {
tryGetFromTranslog(request, indexShard, state, listener.delegateResponse((l, e) -> {
Review comment: It is a little tricky readability-wise that this (IIUC) only works because … Can we refine this?
Reply: @henningandersen I guess you mean … As for the case where currently there is no node, i.e. the primary is unassigned, I thought we want to retry on that. Why not retry with a new state in case the shard gets assigned?
Review comment: My knowledge is limited in the allocation/relocation area, so please correct me if I am wrong here. In my understanding, the PR is to retry when the primary shard is relocating. When a shard is relocating, can
Review comment: I think fail fast is the expected behaviour here. That is at least my reading of the regular get action and how we normally handle reads.
Reply: So making sure … The larger issue is about retries of (m)getFromTranslog. When I prepared the PR I considered only two exceptions, which were IndexNotFound and ShardNotFoundException. As we discussed, it seemed that there are more exceptions to retry on, so those are not necessarily relocation-caused, I think. So I guess my question goes back to those exceptions: should we retry on NoShardAvailableActionException and UnavailableShardsException? To me it seems reasonable to do so. If not, I can take them out of the list.
Reply: It seems we were replying at the same time. Didn't see yours! :) Thanks for the clarification. Then I'll open a PR to fix
Review comment: Yes, that looks reasonable to me.
final var cause = ExceptionsHelper.unwrapCause(e);
logger.debug("get_from_translog failed", cause);
if (cause instanceof ShardNotFoundException
|| cause instanceof IndexNotFoundException
|| cause instanceof NoShardAvailableActionException
|| cause instanceof UnavailableShardsException) {
logger.debug("retrying get_from_translog");
observer.waitForNextChange(new ClusterStateObserver.Listener() {
Review comment: Do we want to have a state predicate instead of retrying on every state change?
Reply: Not sure if that is necessary or what the guideline is on this. This shouldn't be a very common occurrence, so I assume retrying on the next change should be fine. Unless there is a good reason not to, I'd rather keep it simple. I see also quite a few other places where we retry simply on the next state.
Review comment: Thanks for explaining. IIUC, this can potentially retry for an extended period of time, i.e. if the cluster state changes within 60 seconds and the triggered retry still fails, a new retry will be scheduled. If the cluster state keeps having at least one change within the next 60 seconds, it will then retry again. This is likely fine, but I'd like to ensure that we explicitly agree on this. Or please let me know if my understanding is off.
Reply: @ywangd (just in case this helps with the review) this whole pattern of how we retry and chase the primary is the same one that is used for
Reply: Yes, that's true. Although since we set the parent task ID on the get-from-translog requests, at some point I assume/expect the get request timeout would cancel this if it is set, or once it is explicitly cancelled.
Review comment: The GET request does not seem to have a timeout, nor is it cancellable. I think we want to make it at least cancellable in the future.
@Override
public void onNewClusterState(ClusterState state) {
getFromTranslog(request, indexShard, state, observer, l);
Review comment: We could use a log message for the retry. If this is rare enough, it can probably be INFO. Otherwise, we could go with DEBUG.
}
Comment on lines +240 to +242:
Review comment: IIUC, this method runs on the cluster state thread. In that case, we should dispatch the
Reply: I am not sure if that is strictly necessary when we are not doing any heavy computations. I see quite a few places where we just send a request in
Review comment: Thanks for making the extra effort to confirm this is not an issue.
@Override
public void onClusterServiceClose() {
l.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
}
});
} else {
l.onFailure(e);
}
}));
}
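Picking up the state-predicate question from the thread above: waitForNextChange has an overload that takes a Predicate<ClusterState>, so the retry could be gated on the primary actually being reassigned instead of firing on every publication. A sketch only, assuming "primary assigned and active again" is the condition we would want; shardId is indexShard.shardId() as in tryGetFromTranslog, and the listener body is unchanged from the PR.

```java
// Sketch: gate the retry on a routing-table condition instead of waking on every cluster state change.
observer.waitForNextChange(new ClusterStateObserver.Listener() {
    @Override
    public void onNewClusterState(ClusterState newState) {
        getFromTranslog(request, indexShard, newState, observer, l);
    }

    @Override
    public void onClusterServiceClose() {
        l.onFailure(new NodeClosedException(clusterService.localNode()));
    }

    @Override
    public void onTimeout(TimeValue timeout) {
        l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
    }
}, newState -> {
    // Hypothetical predicate: only wake up once the primary for this shard is assigned and started again.
    if (newState.routingTable().hasIndex(shardId.getIndexName()) == false) {
        return false;
    }
    var primary = newState.routingTable().shardRoutingTable(shardId).primaryShard();
    return primary != null && primary.active();
});
```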
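Related to the "runs on the cluster state thread" thread above: if we ever did want to dispatch, one option is to fork the retry onto a thread pool before re-sending, rather than issuing the transport request from the applier thread. A sketch under that assumption; the choice of ThreadPool.Names.GET is illustrative, not what the PR does.

```java
// Sketch: hand the retry off to the GET thread pool instead of running it on the
// cluster applier thread that delivers onNewClusterState.
@Override
public void onNewClusterState(ClusterState newState) {
    threadPool.executor(ThreadPool.Names.GET)
        .execute(ActionRunnable.wrap(l, ll -> getFromTranslog(request, indexShard, newState, observer, ll)));
}
```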
private void tryGetFromTranslog(GetRequest request, IndexShard indexShard, ClusterState state, ActionListener<GetResponse> listener) {
ShardId shardId = indexShard.shardId();
var node = getCurrentNodeOfPrimary(state, shardId);
TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
getFromTranslogRequest.setParentTask(request.getParentTask());
transportService.sendRequest(
node,
TransportGetFromTranslogAction.NAME,
getFromTranslogRequest,
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
if (r.getResult() != null) {
logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
l.onResponse(new GetResponse(r.getResult()));
} else {
logger.debug(
"no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
request.id(),
r.segmentGeneration()
);
if (r.segmentGeneration() == -1) {
// Nothing to wait for (no previous unsafe generation), just handle the Get locally.
Review comment: I wonder if there is a corner case here where we might "lose" the previous unsafe generation during a relocation. For example, right after there is a switch from unsafe to safe and we flush, record the generation, and upload the commit, the indexing shard is moved to a new node. The GET hits the old node and has to be retried on the new node, which is not aware of any unsafe generation and would return -1. Is there anything preventing the search shard from handling the get locally if, for whatever reason, the commit that was created on the old node is still not on the search shard? It is probably a bit of an extreme corner case. If it is possible, one way would be to conservatively set the
Reply: I need some time to think over this corner case. But it does remind me of a higher-level question: why do we need to retry the RTG on a relocating shard? I don't think we do that for Stateful, i.e. the list of shards for GET is computed once and any subsequent cluster changes do not affect it. In serverless, if we have an index with 1 primary shard and 1 replica, we do not retry a "non"-RTG if the search shard is relocating. If that is true, why should we retry RTG when the primary shard is relocating?
Reply: I think the separation of the index/search tiers is the difference. An RTG is ALWAYS cross-tier in stateless, and therefore things like relocation shouldn't simply disrupt the request, as it is an internal corner case that we should cover. As for the corner case I've mentioned above, I can try to verify it in a test. For now, we could maybe ignore it.
Reply: I believe this could also happen today without the retry logic of get_from_translog. If a get (not a retried one, but a new one) hits the new indexing shard (freshly relocated), and the search shard hasn't finished receiving/processing the new commit that comes out of the relocation-flush (and is on a generation that is less than the lastUnsafeGeneration), the search shard could get -1 back from get_from_translog and handle the get locally. As mentioned, this is a bit extreme, but might be possible. So I think we shouldn't tie that corner case to this PR. I'll open a separate PR with at least a test to verify it and a fix if necessary.
Review comment: Right. I also came to realise the same thing during my morning walk, fresh air helped :)
ActionRunnable.supply(l, () -> shardOperation(request, shardId)).run();
} else {
assert r.segmentGeneration() > -1L;
assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
indexShard.waitForPrimaryTermAndGeneration(
r.primaryTerm(),
r.segmentGeneration(),
listener.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll))
);
}
}
}), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
Review comment: Not for this PR. I think we should make sure (see elasticsearch/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java, line 55 in 35cc9e1) … A follow-up on this would be useful.
Reply: Good catch. I'll follow this up in a new PR.
Reply: @ywangd Checking this now, I don't think this is an issue. In TransportGetAction we also use
);
}

static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) {
var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
Review comment: This branch has been moved as-is to tryGetFromTranslog.
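The diff above is cut off inside getCurrentNodeOfPrimary, so for context, here is a sketch of how that guarded branch presumably completes. The thrown exception type is what the retry logic above classifies as retryable when the primary is not yet active; the exact message, assertion, and remainder of the method are assumptions, not the PR's literal code.

```java
// Assumed continuation of getCurrentNodeOfPrimary (sketch):
static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) {
    var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
    if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
        // Failing with NoShardAvailableActionException is what lets the ClusterStateObserver retry kick in.
        throw new NoShardAvailableActionException(shardId, "primary shard is not active");
    }
    DiscoveryNode node = clusterState.nodes().get(shardRoutingTable.primaryShard().currentNodeId());
    assert node != null; // an active shard must be assigned to a node known to the same cluster state
    return node;
}
```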