diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 70cbebb774e02..0fda2f04ac5a4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -136,6 +136,7 @@ public class InternalEngine extends Engine { private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); /** @@ -186,7 +187,7 @@ public InternalEngine(EngineConfig engineConfig) { this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint, startingCommit); writer = createWriter(startingCommit); - updateMaxUnsafeAutoIdTimestampFromWriter(writer); + bootstrapAppendOnlyInfoFromWriter(writer); historyUUID = loadOrGenerateHistoryUUID(writer); Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; @@ -345,15 +346,20 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { } } - private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { - long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE; + private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { - if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { - commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue()); - break; + final String key = entry.getKey(); + if 
(key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + assert maxUnsafeAutoIdTimestamp.get() == -1 : + "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; + maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue())); + } + if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { + assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : + "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]"; + maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue())); } } - maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } @Override @@ -803,11 +809,24 @@ public IndexResult index(Index index) throws IOException { private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { final IndexingStrategy plan; - if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { - // no need to deal with out of order delivery - we never saw this one + final boolean appendOnlyRequest = canOptimizeAddDocument(index); + if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) { + /* + * As soon as an append-only request was indexed into the primary, it can be exposed to a search, then users can issue + * a follow-up operation on it. In rare cases, the follow-up operation can arrive and be processed on a replica before + * the original append-only. In this case we can't simply proceed with the append-only without consulting the version map. + * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen + * the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only + * requests, we can assert the replica has not seen the document of that append-only request, thus we can apply optimization. 
+ */ assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { + if (appendOnlyRequest == false) { + maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); + assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" + + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; + } versionMap.enforceSafeAccess(); // drop out of order operations assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : @@ -942,6 +961,11 @@ private boolean mayHaveBeenIndexedBefore(Index index) { return mayHaveBeenIndexBefore; } + // for testing + long getMaxSeqNoOfNonAppendOnlyOperations() { + return maxSeqNoOfNonAppendOnlyOperations.get(); + } + private static void index(final List docs, final IndexWriter indexWriter) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs); @@ -1097,6 +1121,9 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. 
 got [" + delete.versionType() + "]"; + maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); + assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b0d701bdfdaa1..e022330b664c3 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4530,4 +4530,96 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } } + public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { + IOUtils.close(engine, store); + store = createStore(); + final Path translogPath = createTempDir(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { + final CountDownLatch latch = new CountDownLatch(1); + final Thread appendOnlyIndexer = new Thread(() -> { + try { + latch.countDown(); + final int numDocs = scaledRandomIntBetween(100, 1000); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null); + if (randomBoolean()) { + engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo())); + } else { + engine.index(appendOnlyPrimary(doc, 
randomBoolean(), randomNonNegativeLong())); + } + } + } catch (Exception ex) { + throw new RuntimeException("Failed to index", ex); + } + }); + appendOnlyIndexer.setName("append-only indexer"); + appendOnlyIndexer.start(); + latch.await(); + long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED; + final int numOps = scaledRandomIntBetween(100, 1000); + for (int i = 0; i < numOps; i++) { + ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); + if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations + final long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean()); + if (randomBoolean()) { + engine.index(doc); + } else { + engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(), + doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis())); + } + maxSeqNoOfNonAppendOnly = seqno; + } else { // On primary - do not update max_seqno for non-append-only operations + if (randomBoolean()) { + engine.index(indexForDoc(parsedDocument)); + } else { + engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id()))); + } + } + } + appendOnlyIndexer.join(120_000); + assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly)); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + engine.syncTranslog(); + engine.flush(); + } + try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { + assertThat("max_seqno from non-append-only was not bootstrap from the safe commit", + engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get())); + } + } + + public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { + long lookupTimes = 0L; + final LocalCheckpointTracker 
localCheckpointTracker = engine.getLocalCheckpointTracker(); + final int initDocs = between(0, 10); + for (int i = 0; i < initDocs; i++) { + index(engine, i); + lookupTimes++; + } + // doc1 is delayed and arrived after a non-append-only op. + final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo(); + final long seqnoNormalOp = localCheckpointTracker.generateSeqNo(); + if (randomBoolean()) { + engine.index(replicaIndexForDoc( + testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); + } else { + engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong())); + } + lookupTimes++; + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); + assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp)); + + // should not optimize for doc1 and process as a regular doc (eg. look up in version map) + engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), + false, randomNonNegativeLong(), seqNoAppendOnly1)); + lookupTimes++; + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); + + // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. 
+ engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null), + false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo())); + assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 4762c23319a4a..baa56ee9585f6 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -419,6 +420,27 @@ private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) { shard.onSettingsChanged(); } + /** + * This test ensures the consistency between primary and replica when a non-append-only (eg. index request with id or delete) operation + * of the same document is processed before the original append-only request on replicas. The append-only document can be exposed and + * deleted on the primary before it is added to replica. Replicas should treat a late append-only request as a regular index request. 
+ */ + public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { + try (ReplicationGroup shards = createGroup(1)) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + // Append-only request - without id + final BulkShardRequest indexRequest = indexOnPrimary( + new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary); + final String docId = Iterables.get(getShardDocUIDs(primary), 0); + final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary); + deleteOnReplica(deleteRequest, shards, replica); + indexOnReplica(indexRequest, shards, replica); + shards.assertAllEqual(0); + } + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage;