Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary replica resync should not send ops without seqno #40881

Merged
merged 1 commit into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ protected void doRun() throws Exception {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 &&
(seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) {
totalSkippedOps.incrementAndGet();
continue;
}
assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]";
operations.add(operation);
size += operation.estimateSize();
totalSentOps.incrementAndGet();
Expand All @@ -260,7 +260,6 @@ protected void doRun() throws Exception {
break;
}
}

final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
// have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,30 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

Expand Down Expand Up @@ -192,6 +201,31 @@ public void onResponse(PrimaryReplicaSyncer.ResyncTask result) {
}
}

public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception {
IndexShard shard = spy(newStartedShard(true));
when(shard.getGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO);
int numOps = between(0, 20);
List<Translog.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
operations.add(new Translog.Index(
"_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
}
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
List<Translog.Operation> sentOperations = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
sentOperations.addAll(Arrays.asList(request.getOperations()));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
syncer.resync(shard, fut);
fut.actionGet();
assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList())));
closeShards(shard);
}

public void testStatusSerialization() throws IOException {
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -142,4 +143,28 @@ public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot,
}
return ops;
}

public static Translog.Snapshot newSnapshotFromOperations(List<Translog.Operation> operations) {
final Iterator<Translog.Operation> iterator = operations.iterator();
return new Translog.Snapshot() {
@Override
public int totalOperations() {
return operations.size();
}

@Override
public Translog.Operation next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}

@Override
public void close() {

}
};
}
}