From fa54be2dcd1dfecdf065f5bb74066f97bcf138ac Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 31 May 2018 15:14:32 -0400 Subject: [PATCH] CCR: Do not minimization requesting range on leader (#30980) Today before reading operations on the leading shard, we minimization the requesting range with the global checkpoint. However, this might make the request invalid if the following shard generates a requesting range based on the global-checkpoint from a primary shard and sends that request to a replica whose global checkpoint is lagged. Another issue is that we are mutating the request when applying minimization. If the request becomes invalid on a replica, we will reroute the mutated request instead of the original one to the primary. This commit removes the minimization and replaces it by a range check with the local checkpoint. --- .../xpack/ccr/action/ShardChangesAction.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 165625844c371..e59c51d29217c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -235,9 +235,14 @@ public TransportAction(Settings settings, protected Response shardOperation(Request request, ShardId shardId) throws IOException { IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); - final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); - request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint()); + // The following shard generates the request based on the global checkpoint which may not be synced to all leading copies. + // However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies. + final long localCheckpoint = indexShard.getLocalCheckpoint(); + if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) { + throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " + + "to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "], shardId=[" + shardId + "]"); + } final Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); return new Response(indexMetaDataVersion, operations);