From c812ad54b45cff97ac6457ecf4e29fbde526255c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 3 Apr 2019 22:17:31 -0400 Subject: [PATCH] Primary replica resync should not send ops without seqno (#40433) Primary-replica resync in a mixed-cluster between 6.x and 5.6 can send operations without sequence number to a replica which already processed operations with sequence number. This leads to the failure of that replica for we trip the sequence number assertion when writing resync operations without sequence number to translog. --- .../index/shard/PrimaryReplicaSyncer.java | 5 ++- .../shard/PrimaryReplicaSyncerTests.java | 34 +++++++++++++++++++ .../index/translog/TestTranslog.java | 25 ++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index f3e631f8bf6e0..07aade952923b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -246,11 +246,11 @@ protected void doRun() throws Exception { Translog.Operation operation; while ((operation = snapshot.next()) != null) { final long seqNo = operation.seqNo(); - if (startingSeqNo >= 0 && - (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { + if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) { totalSkippedOps.incrementAndGet(); continue; } + assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]"; operations.add(operation); size += operation.estimateSize(); totalSentOps.incrementAndGet(); @@ -260,7 +260,6 @@ protected void doRun() throws Exception { break; } } - final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO; // have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 85e381b176ccb..c7d59fdb7c25e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -42,21 +42,30 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.translog.TestTranslog; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class PrimaryReplicaSyncerTests extends IndexShardTestCase { @@ -186,6 +195,31 @@ public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { } } + public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception { + IndexShard shard = spy(newStartedShard(true)); + when(shard.getGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO); + int numOps = between(0, 20); + List operations = new ArrayList<>(); + for (int i = 0; i < numOps; i++) { + operations.add(new Translog.Index( + "_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1})); + } + doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong()); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + List sentOperations = new ArrayList<>(); + PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { + sentOperations.addAll(Arrays.asList(request.getOperations())); + listener.onResponse(new ResyncReplicationResponse()); + }; + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); + syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); + PlainActionFuture fut = new PlainActionFuture<>(); + syncer.resync(shard, fut); + fut.actionGet(); + assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList()))); + closeShards(shard); + } + public void testStatusSerialization() throws IOException { PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10), randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000)); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index 003054fc71550..a3ebfff478e98 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; @@ -142,4 +143,28 @@ public static List drainSnapshot(Translog.Snapshot snapshot, } return ops; } + + public static Translog.Snapshot newSnapshotFromOperations(List operations) { + final Iterator iterator = operations.iterator(); + return new Translog.Snapshot() { + @Override + public int totalOperations() { + return operations.size(); + } + + @Override + public Translog.Operation next() { + if (iterator.hasNext()) { + return iterator.next(); + } else { + return null; + } + } + + @Override + public void close() { + + } + }; + } }