From b50bc3dfbde4a68960fa94b7f31b0107ed5854c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 21 Jul 2021 14:57:43 +0200 Subject: [PATCH 1/3] Fix FollowingEngineTests#testOptimizeMultipleVersions In certain concurrent indexing scenarios where there are deletes executed and then a new indexing operation, the following engine considers those as updates breaking one of the assumed invariants. Closes #72527 --- .../ccr/index/engine/FollowingEngine.java | 17 ++--- .../index/engine/FollowingEngineTests.java | 70 ++++++++++++++++++- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 45927cca81817..7c8ce1456802e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -17,7 +17,6 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; -import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.VersionType; @@ -119,13 +118,15 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { @Override protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { - if (Assertions.ENABLED) { - final long localCheckpoint = getProcessedLocalCheckpoint(); - final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); - assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo : - "maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo; - } - super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code + // In some scenarios it is possible to advance maxSeqNoOfUpdatesOrDeletes over the leader + // maxSeqNoOfUpdatesOrDeletes, since in this engine (effectively a it is a replica) we don't check if the previous version + // was a delete and it's possible to consider it as an update, advancing the max sequence number + // over the leader maxSeqNoOfUpdatesOrDeletes. + // See FollowingEngineTests#testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates or #72527 for more details. + + // The goal of this marker it's just an optimization and it won't affect the correctness or durability + // of the indexed documents. + super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e0d43cab06cbb..4628ab7cfb2f4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -18,9 +18,9 @@ import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; @@ -35,6 +35,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -55,6 +56,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -291,12 +293,21 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin) { final long version = randomBoolean() ? 1 : randomNonNegativeLong(); + return indexForFollowing(id, seqNo, origin, version); + } + + private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin, long version) { final ParsedDocument parsedDocument = EngineTestCase.createParsedDoc(id, null); return new Engine.Index(EngineTestCase.newUid(parsedDocument), parsedDocument, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); } + private Engine.Delete deleteForFollowing(String id, long seqNo, Engine.Operation.Origin origin, long version) { + return IndexShard.prepareDelete(id, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, + origin, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + } + private Engine.Index indexForPrimary(String id) { final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null); return new Engine.Index(EngineTestCase.newUid(parsedDoc), primaryTerm.get(), parsedDoc); @@ -450,6 +461,63 @@ public void testOptimizeSingleDocConcurrently() throws Exception { }); } + public void testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates() throws Exception { + // See #72527 for more details + Settings leaderSettings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .build(); + + Settings followerSettings = Settings.builder() + .put(leaderSettings) + .put("index.xpack.ccr.following_index", true) + .build(); + + IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build(); + IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, leaderSettings); + try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { + EngineConfig followerConfig = engineConfig( + shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(followerStore, followerConfig)) { + final long leaderMaxSeqNoOfUpdatesOnPrimary = 3; + followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(leaderMaxSeqNoOfUpdatesOnPrimary); + + followingEngine.index(indexForFollowing("1", 0, Engine.Operation.Origin.PRIMARY, 1)); + followingEngine.delete(deleteForFollowing("1", 1, Engine.Operation.Origin.PRIMARY, 2)); + followingEngine.index(indexForFollowing("2", 2, Engine.Operation.Origin.PRIMARY, 1)); + + assertBusy(() -> assertThat(followingEngine.getProcessedLocalCheckpoint(), equalTo(2L))); + + CyclicBarrier barrier = new CyclicBarrier(3); + Thread thread1 = new Thread(() -> { + try { + barrier.await(); + followingEngine.delete(deleteForFollowing("2", 3, Engine.Operation.Origin.PRIMARY, 2)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Thread thread2 = new Thread(() -> { + try { + barrier.await(); + followingEngine.index(indexForFollowing("1", 4, Engine.Operation.Origin.PRIMARY, 3)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + thread1.start(); + thread2.start(); + barrier.await(); + thread1.join(); + thread2.join(); + + assertThat(followingEngine.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leaderMaxSeqNoOfUpdatesOnPrimary)); + } + } + } + private void runFollowTest(CheckedBiConsumer task) throws Exception { final CheckedBiConsumer wrappedTask = (leader, follower) -> { Thread[] threads = new Thread[between(1, 8)]; From b583c5b2cfb23fb8a6ac35f7284921f9c5fc7216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 31 Aug 2021 16:47:26 +0200 Subject: [PATCH 2/3] Review comments --- .../index/engine/InternalEngine.java | 10 ++-- .../ccr/index/engine/FollowingEngine.java | 30 ++++++++---- .../index/engine/FollowingEngineTests.java | 48 ++++++++++++++----- 3 files changed, 63 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 629e6bd68a9b5..1414d23a0dd3a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -832,7 +832,11 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { return doGenerateSeqNoForOperation(operation); } - protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { + advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); + } + + protected void advanceMaxSeqNoOfDeleteOnPrimary(long seqNo) { advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); } @@ -900,7 +904,7 @@ public IndexResult index(Index index) throws IOException { final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; if (toAppend == false) { - advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); + advanceMaxSeqNoOfUpdateOnPrimary(index.seqNo()); } } else { markSeqNoAsSeen(index.seqNo()); @@ -1276,7 +1280,7 @@ public DeleteResult delete(Delete delete) throws IOException { delete.primaryTerm(), delete.version(), delete.versionType(), delete.origin(), delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm()); - advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(delete.seqNo()); + advanceMaxSeqNoOfDeleteOnPrimary(delete.seqNo()); } else { markSeqNoAsSeen(delete.seqNo()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 7c8ce1456802e..2b336f6ebdb32 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -17,6 +17,7 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.VersionType; @@ -34,7 +35,7 @@ /** * An engine implementation for following shards. */ -public final class FollowingEngine extends InternalEngine { +public class FollowingEngine extends InternalEngine { /** @@ -117,16 +118,27 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { } @Override - protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfDeleteOnPrimary(long seqNo) { + if (Assertions.ENABLED) { + final long localCheckpoint = getProcessedLocalCheckpoint(); + final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); + assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo : + "maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo; + } + + super.advanceMaxSeqNoOfDeleteOnPrimary(seqNo); + } + + @Override + protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { // In some scenarios it is possible to advance maxSeqNoOfUpdatesOrDeletes over the leader - // maxSeqNoOfUpdatesOrDeletes, since in this engine (effectively a it is a replica) we don't check if the previous version - // was a delete and it's possible to consider it as an update, advancing the max sequence number - // over the leader maxSeqNoOfUpdatesOrDeletes. - // See FollowingEngineTests#testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates or #72527 for more details. + // maxSeqNoOfUpdatesOrDeletes, since in this engine (effectively it is a replica) we don't check if the previous version + // was a delete and it's possible to consider it as an update, advancing the max sequence number over the leader + // maxSeqNoOfUpdatesOrDeletes. + // The goal of this marker it's just an optimization and it won't affect the correctness or durability of the indexed documents. - // The goal of this marker it's just an optimization and it won't affect the correctness or durability - // of the indexed documents. - super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); + // See FollowingEngineTests#testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates or #72527 for more details. + super.advanceMaxSeqNoOfUpdateOnPrimary(seqNo); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 4628ab7cfb2f4..017ee605d0464 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -461,39 +461,61 @@ public void testOptimizeSingleDocConcurrently() throws Exception { }); } - public void testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates() throws Exception { + public void testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates() throws Exception { // See #72527 for more details - Settings leaderSettings = Settings.builder() + Settings followerSettings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) .put("index.version.created", Version.CURRENT) - .build(); - - Settings followerSettings = Settings.builder() - .put(leaderSettings) .put("index.xpack.ccr.following_index", true) .build(); IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build(); - IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, leaderSettings); + IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, Settings.EMPTY); try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { - EngineConfig followerConfig = engineConfig( - shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry()); - try (FollowingEngine followingEngine = createEngine(followerStore, followerConfig)) { + EngineConfig followerConfig = + engineConfig(shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry()); + followerStore.createEmpty(); + String translogUuid = Translog.createEmptyTranslog(followerConfig.getTranslogConfig().getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + 1L + ); + followerStore.associateIndexWithNewTranslog(translogUuid); + CountDownLatch concurrentDeleteOpLatch = new CountDownLatch(1); + final long indexNewDocWithSameIdSeqNo = 4; + FollowingEngine followingEngine = new FollowingEngine(followerConfig) { + @Override + protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { + if (seqNo == indexNewDocWithSameIdSeqNo) { + // wait until the concurrent delete finishes meaning that processedLocalCheckpoint == maxSeqNoOfUpdatesOrDeletes + try { + concurrentDeleteOpLatch.await(); + assertThat(getProcessedLocalCheckpoint(), equalTo(getMaxSeqNoOfUpdatesOrDeletes())); + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + super.advanceMaxSeqNoOfUpdateOnPrimary(seqNo); + } + }; + TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), followerConfig.getIndexSettings()); + followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + try (followingEngine) { final long leaderMaxSeqNoOfUpdatesOnPrimary = 3; followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(leaderMaxSeqNoOfUpdatesOnPrimary); followingEngine.index(indexForFollowing("1", 0, Engine.Operation.Origin.PRIMARY, 1)); followingEngine.delete(deleteForFollowing("1", 1, Engine.Operation.Origin.PRIMARY, 2)); followingEngine.index(indexForFollowing("2", 2, Engine.Operation.Origin.PRIMARY, 1)); - - assertBusy(() -> assertThat(followingEngine.getProcessedLocalCheckpoint(), equalTo(2L))); + assertThat(followingEngine.getProcessedLocalCheckpoint(), equalTo(2L)); CyclicBarrier barrier = new CyclicBarrier(3); Thread thread1 = new Thread(() -> { try { barrier.await(); followingEngine.delete(deleteForFollowing("2", 3, Engine.Operation.Origin.PRIMARY, 2)); + concurrentDeleteOpLatch.countDown(); } catch (Exception e) { throw new RuntimeException(e); } @@ -501,7 +523,7 @@ public void testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates Thread thread2 = new Thread(() -> { try { barrier.await(); - followingEngine.index(indexForFollowing("1", 4, Engine.Operation.Origin.PRIMARY, 3)); + followingEngine.index(indexForFollowing("1", indexNewDocWithSameIdSeqNo, Engine.Operation.Origin.PRIMARY, 3)); } catch (Exception e) { throw new RuntimeException(e); } From ed4451dd457c4827c5f1e68b6c30537d9259d676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 1 Sep 2021 17:44:41 +0200 Subject: [PATCH 3/3] Renaming --- .../org/elasticsearch/index/engine/InternalEngine.java | 8 ++++---- .../xpack/ccr/index/engine/FollowingEngine.java | 10 +++++----- .../xpack/ccr/index/engine/FollowingEngineTests.java | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1414d23a0dd3a..ebcdcd742f430 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -832,11 +832,11 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { return doGenerateSeqNoForOperation(operation); } - protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) { advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); } - protected void advanceMaxSeqNoOfDeleteOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfDeletesOnPrimary(long seqNo) { advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); } @@ -904,7 +904,7 @@ public IndexResult index(Index index) throws IOException { final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; if (toAppend == false) { - advanceMaxSeqNoOfUpdateOnPrimary(index.seqNo()); + advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo()); } } else { markSeqNoAsSeen(index.seqNo()); @@ -1280,7 +1280,7 @@ public DeleteResult delete(Delete delete) throws IOException { delete.primaryTerm(), delete.version(), delete.versionType(), delete.origin(), delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm()); - advanceMaxSeqNoOfDeleteOnPrimary(delete.seqNo()); + advanceMaxSeqNoOfDeletesOnPrimary(delete.seqNo()); } else { markSeqNoAsSeen(delete.seqNo()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 2b336f6ebdb32..772d9c6c86493 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -118,7 +118,7 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { } @Override - protected void advanceMaxSeqNoOfDeleteOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfDeletesOnPrimary(long seqNo) { if (Assertions.ENABLED) { final long localCheckpoint = getProcessedLocalCheckpoint(); final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); @@ -126,19 +126,19 @@ protected void advanceMaxSeqNoOfDeleteOnPrimary(long seqNo) { "maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo; } - super.advanceMaxSeqNoOfDeleteOnPrimary(seqNo); + super.advanceMaxSeqNoOfDeletesOnPrimary(seqNo); } @Override - protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) { // In some scenarios it is possible to advance maxSeqNoOfUpdatesOrDeletes over the leader // maxSeqNoOfUpdatesOrDeletes, since in this engine (effectively it is a replica) we don't check if the previous version // was a delete and it's possible to consider it as an update, advancing the max sequence number over the leader // maxSeqNoOfUpdatesOrDeletes. - // The goal of this marker it's just an optimization and it won't affect the correctness or durability of the indexed documents. + // We conservatively advance the seqno in this case, accepting a minor performance hit in this edge case. // See FollowingEngineTests#testConcurrentUpdateOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates or #72527 for more details. - super.advanceMaxSeqNoOfUpdateOnPrimary(seqNo); + super.advanceMaxSeqNoOfUpdatesOnPrimary(seqNo); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 017ee605d0464..f2cd8a103b998 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -486,7 +486,7 @@ public void testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates( final long indexNewDocWithSameIdSeqNo = 4; FollowingEngine followingEngine = new FollowingEngine(followerConfig) { @Override - protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) { if (seqNo == indexNewDocWithSameIdSeqNo) { // wait until the concurrent delete finishes meaning that processedLocalCheckpoint == maxSeqNoOfUpdatesOrDeletes try { @@ -496,7 +496,7 @@ protected void advanceMaxSeqNoOfUpdateOnPrimary(long seqNo) { throw new RuntimeException(exception); } } - super.advanceMaxSeqNoOfUpdateOnPrimary(seqNo); + super.advanceMaxSeqNoOfUpdatesOnPrimary(seqNo); } }; TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), followerConfig.getIndexSettings());