Remove LocalCheckpointTracker#resetCheckpoint (#34667)
In #34474, we added a new assertion to ensure that the
LocalCheckpointTracker is always consistent with the Lucene index. However,
resetting the LocalCheckpointTracker in testDedupByPrimaryTerm causes this
assertion to be violated.

This commit removes resetCheckpoint from LocalCheckpointTracker and
rewrites testDedupByPrimaryTerm without resetting the local checkpoint.

Relates #34474
dnhatn committed Dec 9, 2018
1 parent 086055b commit 7dd6e58
Showing 6 changed files with 65 additions and 99 deletions.
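
For context, the violated invariant is easy to demonstrate with a minimal sketch. This is a simplified stand-in, not the real LocalCheckpointTracker (the class name, the BitSet field, and the main method are illustrative): once seq_nos have been marked completed and their documents are in Lucene, rewinding the checkpoint leaves the tracker claiming less completed history than the index actually holds.

import java.util.BitSet;

// Simplified stand-in for LocalCheckpointTracker: records which seq_nos have
// completed and exposes the highest contiguous completed seq_no (the checkpoint).
class SimpleCheckpointTracker {
    private final BitSet processedSeqNo = new BitSet();
    private long checkpoint = -1; // no ops performed yet

    synchronized void markSeqNoAsCompleted(long seqNo) {
        processedSeqNo.set((int) seqNo);
        // Advance the checkpoint across the contiguous run of completed seq_nos.
        while (processedSeqNo.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    // The method removed by this commit: discards completion state and rewinds.
    synchronized void resetCheckpoint(long newCheckpoint) {
        assert newCheckpoint <= checkpoint;
        processedSeqNo.clear();
        checkpoint = newCheckpoint;
    }

    public static void main(String[] args) {
        SimpleCheckpointTracker tracker = new SimpleCheckpointTracker();
        // Suppose seq_nos 0..4 were indexed into Lucene and marked completed.
        for (long seqNo = 0; seqNo <= 4; seqNo++) {
            tracker.markSeqNoAsCompleted(seqNo);
        }
        System.out.println(tracker.checkpoint); // 4: consistent with the index

        // Rewinding the tracker does not remove the documents from Lucene, so an
        // assertion comparing tracker state against the index now fails.
        tracker.resetCheckpoint(1);
        System.out.println(tracker.checkpoint); // 1, while Lucene still holds 0..4
    }
}
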
LocalCheckpointTracker.java
@@ -103,19 +103,6 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
public synchronized void resetCheckpoint(final long checkpoint) {
// TODO: remove this method once we restore the local history on promotion.
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
this.checkpoint = checkpoint;
}

/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
InternalEngineTests.java
@@ -5481,7 +5481,7 @@ public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
List<Engine.Operation> ops = generateReplicaHistory(between(1, 100), randomBoolean());
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
applyOperations(engine, ops);
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
LuceneChangesSnapshotTests.java
@@ -22,7 +22,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.translog.SnapshotMatchers;
@@ -32,7 +31,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -150,35 +148,35 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34667")
public void testDedupByPrimaryTerm() throws Exception {
Map<Long, Long> latestOperations = new HashMap<>();
List<Integer> terms = Arrays.asList(between(1, 1000), between(1000, 2000));
/**
* If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation
* into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into
* Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies
* that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skips non-root nested documents and stale copies.
*/
public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception {
Map<Long, Long> seqNoToTerm = new HashMap<>();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
int totalOps = 0;
for (long term : terms) {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), false, term, 2, 20, "1");
primaryTerm.set(Math.max(primaryTerm.get(), term));
engine.rollTranslogGeneration();
for (Engine.Operation op : ops) {
// We need to simulate a rollback here as only ops after local checkpoint get into the engine
if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) {
engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1));
engine.rollTranslogGeneration();
}
for (Engine.Operation op : operations) {
// Engine skips deletes or indexes below the local checkpoint
if (engine.getLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) {
seqNoToTerm.put(op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
}
latestOperations.put(op.seqNo(), op.primaryTerm());
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
totalOps += ((Engine.Index) op).docs().size();
} else {
totalOps++;
}
totalOps++;
}
applyOperation(engine, op);
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.rollTranslogGeneration();
}
if (rarely()) {
engine.flush();
}
}
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
@@ -188,9 +186,9 @@ public void testDedupByPrimaryTerm() throws Exception {
searcher = null;
Translog.Operation op;
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
}
assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
} finally {
IOUtils.close(searcher);
}
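
The new javadoc above states the two skipping rules. A rough, self-contained sketch of that accounting (the LeafDoc class and field names are invented for illustration; this is not the actual LuceneChangesSnapshot code): reading Lucene documents in seq_no order, keep one document per seq_no that carries a primary term, and count everything else toward skippedOperations.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SnapshotSketch {
    // A Lucene "document" reduced to the two fields that matter here. A null
    // primaryTerm models a non-root nested document, which carries only a seq_no.
    static final class LeafDoc {
        final long seqNo;
        final Long primaryTerm;
        LeafDoc(long seqNo, Long primaryTerm) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
        }
    }

    public static void main(String[] args) {
        List<LeafDoc> docs = new ArrayList<>();
        docs.add(new LeafDoc(0, null)); // non-root nested child: skipped
        docs.add(new LeafDoc(0, 1L));   // root document of the nested doc at seq_no 0
        docs.add(new LeafDoc(1, 1L));   // first copy of the operation at seq_no 1
        docs.add(new LeafDoc(1, 1L));   // duplicate delivery: stale copy, skipped

        Set<Long> seen = new HashSet<>();
        int skipped = 0;
        for (LeafDoc doc : docs) {
            // Skip non-root nested documents (no primary term) and any further copy
            // of a seq_no already returned: exactly one operation per seq_no.
            if (doc.primaryTerm == null || seen.add(doc.seqNo) == false) {
                skipped++;
                continue;
            }
            System.out.println("emit seq_no=" + doc.seqNo + " term=" + doc.primaryTerm);
        }
        System.out.println("skippedOperations=" + skipped); // 2
    }
}
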
LocalCheckpointTrackerTests.java
@@ -19,13 +19,10 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;

import java.util.ArrayList;
@@ -266,35 +263,6 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte
thread.join();
}

public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int maxSeqNo = Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED);
for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
maxSeqNo = i;
}
}

final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<CountedBitSet>>() {
@Override
public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
}

@Override
public void describeTo(Description description) {
description.appendText("empty");
}
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}

public void testContains() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
EngineTestCase.java
@@ -49,7 +49,7 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
@@ -316,18 +316,17 @@ protected static ParsedDocument testParsedDocument(
mappingUpdate);
}

public static CheckedFunction<String, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
public static CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
final MapperService mapperService = createMapperService("type");
final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject()
.endObject().endObject());
final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping));
return docId -> {
return (docId, nestedFieldValues) -> {
final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value");
final int nestedValues = between(0, 3);
if (nestedValues > 0) {
if (nestedFieldValues > 0) {
XContentBuilder nestedField = source.startObject("nested_field");
for (int i = 0; i < nestedValues; i++) {
for (int i = 0; i < nestedFieldValues; i++) {
nestedField.field("field-" + i, "value-" + i);
}
source.endObject();
@@ -718,22 +717,36 @@ public static List<Engine.Operation> generateSingleDocHistory(
return ops;
}

public List<Engine.Operation> generateReplicaHistory(int numOps, boolean allowGapInSeqNo) {
public List<Engine.Operation> generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate,
boolean includeNestedDocs) throws Exception {
long seqNo = 0;
List<Engine.Operation> operations = new ArrayList<>(numOps);
final int maxIdValue = randomInt(numOps * 2);
final List<Engine.Operation> operations = new ArrayList<>(numOps);
CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory = nestedParsedDocFactory();
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(between(1, 100));
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
-1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
final String id = Integer.toString(randomInt(maxIdValue));
final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values());
final boolean isNestedDoc = includeNestedDocs && opType == Engine.Operation.TYPE.INDEX && randomBoolean();
final int nestedValues = between(0, 3);
final long startTime = threadPool.relativeTimeInMillis();
final int copies = allowDuplicate && rarely() ? between(2, 4) : 1;
for (int copy = 0; copy < copies; copy++) {
final ParsedDocument doc = isNestedDoc ? nestedParsedDocFactory.apply(id, nestedValues) : createParsedDoc(id, null);
switch (opType) {
case INDEX:
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime, -1, true));
break;
case DELETE:
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime));
break;
case NO_OP:
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
break;
default:
throw new IllegalStateException("Unknown operation type [" + opType + "]");
}
}
seqNo++;
if (allowGapInSeqNo && rarely()) {
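
The updated tests consume these helpers as in the hunks above. A hypothetical test-method sketch (the method name is invented, and engine, applyOperations, nestedParsedDocFactory, and the relevant imports are assumed to come from the EngineTestCase scaffolding):

// Sketch only: assumes it lives in a test class extending EngineTestCase.
public void testReplicaHistorySketch() throws Exception {
    // Operations over seq_nos 0..n, optionally with gaps in the seq_no sequence,
    // duplicate deliveries of the same operation, and nested documents.
    List<Engine.Operation> ops = generateHistoryOnReplica(
        between(1, 100),   // numOps
        randomBoolean(),   // allowGapInSeqNo
        randomBoolean(),   // allowDuplicate
        randomBoolean());  // includeNestedDocs
    applyOperations(engine, ops);

    // The factory now takes the nested value count instead of picking one itself.
    CheckedBiFunction<String, Integer, ParsedDocument, IOException> factory = nestedParsedDocFactory();
    ParsedDocument nestedDoc = factory.apply("1", between(0, 3));
}
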
FollowingEngineTests.java
@@ -14,7 +14,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -567,12 +567,12 @@ public void testProcessOnceOnPrimary() throws Exception {
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
final CheckedFunction<String, ParsedDocument, IOException> nestedDocFactory = EngineTestCase.nestedParsedDocFactory();
final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFunc = EngineTestCase.nestedParsedDocFactory();
int numOps = between(10, 100);
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
String docId = Integer.toString(between(1, 100));
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId);
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3));
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
