diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 5fbf9e69b2400..77bc5f60968d0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -84,17 +84,17 @@ public void onCommit(List commits) throws IOException { this.safeCommitInfo = SafeCommitInfo.EMPTY; this.lastCommit = commits.get(commits.size() - 1); this.safeCommit = commits.get(keptPosition); - if (keptPosition == commits.size() - 1) { - this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE; - } else { - this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); - } for (int i = 0; i < keptPosition; i++) { if (snapshottedCommits.containsKey(commits.get(i)) == false) { deleteCommit(commits.get(i)); } } updateRetentionPolicy(); + if (keptPosition == commits.size() - 1) { + this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE; + } else { + this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + } safeCommit = this.safeCommit; } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b004cf8776409..58cb051d0da16 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1181,6 +1181,45 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L)); } + public void testSyncTranslogConcurrently() throws Exception { + IOUtils.close(engine, store); + final Path translogPath = createTempDir(); + store = createStore(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get)); + List ops = generateHistoryOnReplica(between(1, 50), false, randomBoolean(), randomBoolean()); + applyOperations(engine, ops); + engine.flush(true, true); + final CheckedRunnable checker = () -> { + assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get())); + try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + SequenceNumbers.CommitInfo commitInfo = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getIndexCommit().getUserData().entrySet()); + assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint())); + } + }; + final Thread[] threads = new Thread[randomIntBetween(2, 4)]; + final Phaser phaser = new Phaser(threads.length); + globalCheckpoint.set(engine.getProcessedLocalCheckpoint()); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + try { + engine.syncTranslog(); + checker.run(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + threads[i].start(); + } + for (Thread thread : threads) { + thread.join(); + } + checker.run(); + } + public void testSyncedFlush() throws IOException { try (Store store = createStore(); Engine engine = createEngine(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null)) {