Skip to content

Commit

Permalink
Primary replica resync should not send ops without seqno (#40433)
Browse files Browse the repository at this point in the history
Primary-replica resync in a mixed-cluster between 6.x and 5.6 can send
operations without sequence number to a replica which already processed
operations with sequence number. This leads to the failure of that
replica for we trip the sequence number assertion when writing resync
operations without sequence number to translog.
  • Loading branch information
dnhatn authored Apr 4, 2019
1 parent aed30d6 commit c737943
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ protected void doRun() throws Exception {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 &&
(seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) {
totalSkippedOps.incrementAndGet();
continue;
}
assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]";
operations.add(operation);
size += operation.estimateSize();
totalSentOps.incrementAndGet();
Expand All @@ -260,7 +260,6 @@ protected void doRun() throws Exception {
break;
}
}

final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
// have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,30 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

Expand Down Expand Up @@ -186,6 +195,31 @@ public void onResponse(PrimaryReplicaSyncer.ResyncTask result) {
}
}

public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception {
IndexShard shard = spy(newStartedShard(true));
when(shard.getGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO);
int numOps = between(0, 20);
List<Translog.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
operations.add(new Translog.Index(
"_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
}
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
List<Translog.Operation> sentOperations = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
sentOperations.addAll(Arrays.asList(request.getOperations()));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
syncer.resync(shard, fut);
fut.actionGet();
assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList())));
closeShards(shard);
}

public void testStatusSerialization() throws IOException {
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -142,4 +143,28 @@ public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot,
}
return ops;
}

public static Translog.Snapshot newSnapshotFromOperations(List<Translog.Operation> operations) {
final Iterator<Translog.Operation> iterator = operations.iterator();
return new Translog.Snapshot() {
@Override
public int totalOperations() {
return operations.size();
}

@Override
public Translog.Operation next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}

@Override
public void close() {

}
};
}
}

0 comments on commit c737943

Please sign in to comment.