diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 165625844c371..e59c51d29217c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -235,9 +235,14 @@ public TransportAction(Settings settings, protected Response shardOperation(Request request, ShardId shardId) throws IOException { IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); - final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); - request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint()); + // The following shard generates the request based on the global checkpoint which may not be synced to all leading copies. + // However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies. + final long localCheckpoint = indexShard.getLocalCheckpoint(); + if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) { + throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " + + "to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "], shardId=[" + shardId + "]"); + } final Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); return new Response(indexMetaDataVersion, operations);