
Commit 0ac89a3

Do not optimize append-only if seen normal op with higher seqno (elastic#28787)

When processing an append-only operation, the primary knows that the
operation can only conflict with another instance of the same operation,
because the id was freshly generated. However, this property doesn't hold
on replicas. As soon as an auto-generated ID has been indexed into the
primary, it can be exposed to a search, and users can issue a follow-up
operation on it. In extremely rare cases, the follow-up operation can
arrive and be processed on a replica before the original append-only
request. In that case we can't simply proceed with the append-only
request and blindly add it to the index without consulting the version
map.

The following scenario can cause a difference between the primary and a
replica.

1. The primary indexes an auto-gen-id doc. (id=X, v=1, s#=20)
2. A refresh cycle happens on the primary.
3. The new doc is picked up and modified - say by a delete-by-query
   request - and the primary gets a delete doc. (id=X, v=2, s#=30)
4. The delete doc is processed first on the replica. (id=X, v=2, s#=30)
5. The indexing operation arrives on the replica; since it's an auto-gen-id
   request and the retry marker is lower, we put it into Lucene without
   any check. The replica now has a doc the primary doesn't have, as the
   sketch below demonstrates.
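
To make the divergence concrete, here is a minimal, self-contained sketch
(a hypothetical toy model, not engine code) that replays this timeline with
plain Java maps; the replica blindly applies the late append-only operation
and ends up with a doc the primary has already deleted:

    import java.util.HashMap;
    import java.util.Map;

    public class OutOfOrderAppendOnlyDemo {
        public static void main(String[] args) {
            Map<String, Long> primary = new HashMap<>(); // id -> version
            Map<String, Long> replica = new HashMap<>();

            primary.put("X", 1L);  // 1. primary indexes auto-gen-id doc (v=1, s#=20)
            primary.remove("X");   // 3. delete-by-query removes it (v=2, s#=30)

            replica.remove("X");   // 4. delete (s#=30) is processed first on the replica
            replica.put("X", 1L);  // 5. late append-only (s#=20) is added without any check

            // prints primary={} replica={X=1}: the replica holds a doc the primary deleted
            System.out.println("primary=" + primary + " replica=" + replica);
        }
    }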

To deal with a potential conflict between an append-only operation and a
normal operation on replicas, we need to rely on sequence numbers. This
commit maintains the max seqno of non-append-only operations on each
replica and applies the optimization to an append-only operation only if
its seq# is higher than the seq# of every non-append-only operation seen
so far.
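
Distilled to its core, the rule this commit adds looks roughly like the
following sketch (class and method names here are illustrative; the real
logic lives in InternalEngine, shown in the diff below):

    import java.util.concurrent.atomic.AtomicLong;

    class AppendOnlyOptimizationGuard {
        // highest seq# of any non-append-only operation seen on this replica
        private final AtomicLong maxSeqNoOfNonAppendOnly = new AtomicLong(-1);

        // every index-with-id or delete processed on the replica raises the watermark
        void onNonAppendOnlyOperation(long seqNo) {
            maxSeqNoOfNonAppendOnly.updateAndGet(curr -> Math.max(seqNo, curr));
        }

        // an append-only op may skip the version-map lookup only if no normal
        // operation with a seq# at or above it has been processed; otherwise
        // that operation may already have touched the very doc this request creates
        boolean mayOptimize(long appendOnlySeqNo) {
            return appendOnlySeqNo > maxSeqNoOfNonAppendOnly.get();
        }
    }
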
dnhatn authored Mar 26, 2018
1 parent 7bf9091 commit 0ac89a3
Showing 3 changed files with 150 additions and 9 deletions.
InternalEngine.java

@@ -136,6 +136,7 @@ public class InternalEngine extends Engine {
     private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
     private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
+    private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
     private final CounterMetric numVersionLookups = new CounterMetric();
     private final CounterMetric numIndexVersionsLookups = new CounterMetric();
     /**
@@ -186,7 +187,7 @@ public InternalEngine(EngineConfig engineConfig) {
             this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
                 translog::getLastSyncedGlobalCheckpoint, startingCommit);
             writer = createWriter(startingCommit);
-            updateMaxUnsafeAutoIdTimestampFromWriter(writer);
+            bootstrapAppendOnlyInfoFromWriter(writer);
             historyUUID = loadOrGenerateHistoryUUID(writer);
             Objects.requireNonNull(historyUUID, "history uuid should not be null");
             indexWriter = writer;
@@ -345,15 +346,20 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
         }
     }
 
-    private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
-        long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
+    private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
         for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
-            if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
-                commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
-                break;
+            final String key = entry.getKey();
+            if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
+                assert maxUnsafeAutoIdTimestamp.get() == -1 :
+                    "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
+                maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
+            }
+            if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
+                assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
+                    "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
+                maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
             }
         }
-        maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
     }
 
     @Override
@@ -803,11 +809,24 @@ public IndexResult index(Index index) throws IOException {
 
     private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
         final IndexingStrategy plan;
-        if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
-            // no need to deal with out of order delivery - we never saw this one
+        final boolean appendOnlyRequest = canOptimizeAddDocument(index);
+        if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
+            /*
+             * As soon as an append-only request has been indexed into the primary, it can be exposed to a search and users can issue
+             * a follow-up operation on it. In rare cases, the follow-up operation can arrive and be processed on a replica before
+             * the original append-only request. In this case we can't simply proceed with the append-only request without consulting
+             * the version map. If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only,
+             * it may have seen the document of that append-only request. However, if the seqno of an append-only is higher than the
+             * seqno of every non-append-only request, the replica cannot have seen the document of that append-only request yet,
+             * so we can safely apply the optimization.
+             */
             assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
             plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
         } else {
+            if (appendOnlyRequest == false) {
+                maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
+                assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated; " +
+                    "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
+            }
             versionMap.enforceSafeAccess();
             // drop out of order operations
             assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
@@ -942,6 +961,11 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
         return mayHaveBeenIndexBefore;
     }
 
+    // for testing
+    long getMaxSeqNoOfNonAppendOnlyOperations() {
+        return maxSeqNoOfNonAppendOnlyOperations.get();
+    }
+
     private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
         if (docs.size() > 1) {
             indexWriter.addDocuments(docs);
@@ -1097,6 +1121,9 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
         assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
             "resolving out of order delivery based on versioning but version type isn't fit for it. got ["
                 + delete.versionType() + "]";
+        maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
+        assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated; " +
+            "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
         // unlike the primary, replicas don't really care about the found status of documents
         // this allows to ignore the case where a document was found in the live version maps in
         // a delete state and return true for the found flag in favor of code simplicity
InternalEngineTests.java

@@ -4530,4 +4530,96 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception {
         }
     }
 
+    public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception {
+        IOUtils.close(engine, store);
+        store = createStore();
+        final Path translogPath = createTempDir();
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+            final Thread appendOnlyIndexer = new Thread(() -> {
+                try {
+                    latch.countDown();
+                    final int numDocs = scaledRandomIntBetween(100, 1000);
+                    for (int i = 0; i < numDocs; i++) {
+                        ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null);
+                        if (randomBoolean()) {
+                            engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo()));
+                        } else {
+                            engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong()));
+                        }
+                    }
+                } catch (Exception ex) {
+                    throw new RuntimeException("Failed to index", ex);
+                }
+            });
+            appendOnlyIndexer.setName("append-only indexer");
+            appendOnlyIndexer.start();
+            latch.await();
+            long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED;
+            final int numOps = scaledRandomIntBetween(100, 1000);
+            for (int i = 0; i < numOps; i++) {
+                ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
+                if (randomBoolean()) { // On replica - update max_seqno of non-append-only operations
+                    final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
+                    final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean());
+                    if (randomBoolean()) {
+                        engine.index(doc);
+                    } else {
+                        engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
+                            doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
+                    }
+                    maxSeqNoOfNonAppendOnly = seqno;
+                } else { // On primary - do not update max_seqno of non-append-only operations
+                    if (randomBoolean()) {
+                        engine.index(indexForDoc(parsedDocument));
+                    } else {
+                        engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id())));
+                    }
+                }
+            }
+            appendOnlyIndexer.join(120_000);
+            assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly));
+            globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
+            engine.syncTranslog();
+            engine.flush();
+        }
+        try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
+            assertThat("max_seqno of non-append-only was not bootstrapped from the safe commit",
+                engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get()));
+        }
+    }
+
+    public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception {
+        long lookupTimes = 0L;
+        final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker();
+        final int initDocs = between(0, 10);
+        for (int i = 0; i < initDocs; i++) {
+            index(engine, i);
+            lookupTimes++;
+        }
+        // doc1 is delayed and arrives after a non-append-only op.
+        final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo();
+        final long seqnoNormalOp = localCheckpointTracker.generateSeqNo();
+        if (randomBoolean()) {
+            engine.index(replicaIndexForDoc(
+                testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false));
+        } else {
+            engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong()));
+        }
+        lookupTimes++;
+        assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
+        assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp));
+
+        // should not optimize for doc1; process it as a regular doc (i.e. look it up in the version map)
+        engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null),
+            false, randomNonNegativeLong(), seqNoAppendOnly1));
+        lookupTimes++;
+        assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
+
+        // optimize for append-only-2 (its seqno > max_seqno of non-append-only) - no version map lookup needed
+        engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null),
+            false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
+        assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
+    }
 }
IndexLevelReplicationTests.java

@@ -32,6 +32,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
@@ -419,6 +420,27 @@ private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) {
         shard.onSettingsChanged();
     }
 
+    /**
+     * This test ensures consistency between primary and replica when a non-append-only operation (e.g. an index with id, or a delete)
+     * on the same document is processed before the original append-only request on the replica. The append-only document can be exposed
+     * and deleted on the primary before it is added to the replica. Replicas should treat a late append-only request as a regular index request.
+     */
+    public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            final IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+            // Append-only request - without id
+            final BulkShardRequest indexRequest = indexOnPrimary(
+                new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary);
+            final String docId = Iterables.get(getShardDocUIDs(primary), 0);
+            final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary);
+            deleteOnReplica(deleteRequest, shards, replica);
+            indexOnReplica(indexRequest, shards, replica);
+            shards.assertAllEqual(0);
+        }
+    }
+
     /** Throws <code>documentFailure</code> on every indexing operation */
     static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
         final String documentFailureMessage;
