From 3bfcc60cce6643d883d838c5795c02924c2f08e5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 7 Apr 2020 11:14:25 -0400 Subject: [PATCH] Update translog policy before the next safe commit (#54839) IndexShardIT#testMaybeFlush relies on the assumption that the safe commit and translog deletion policy have advanced after IndexShard#sync returns . This assumption does not hold if there's a race with the global checkpoint sync. Closes #52223 --- .../index/engine/CombinedDeletionPolicy.java | 10 ++--- .../index/engine/InternalEngineTests.java | 39 +++++++++++++++++++ .../index/shard/IndexShardIT.java | 4 -- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 5fbf9e69b2400..77bc5f60968d0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -84,17 +84,17 @@ public void onCommit(List commits) throws IOException { this.safeCommitInfo = SafeCommitInfo.EMPTY; this.lastCommit = commits.get(commits.size() - 1); this.safeCommit = commits.get(keptPosition); - if (keptPosition == commits.size() - 1) { - this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE; - } else { - this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); - } for (int i = 0; i < keptPosition; i++) { if (snapshottedCommits.containsKey(commits.get(i)) == false) { deleteCommit(commits.get(i)); } } updateRetentionPolicy(); + if (keptPosition == commits.size() - 1) { + this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE; + } else { + this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + } safeCommit = this.safeCommit; } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5358f63325f96..103c7f2f5e2ab 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -977,6 +977,45 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L)); } + public void testSyncTranslogConcurrently() throws Exception { + IOUtils.close(engine, store); + final Path translogPath = createTempDir(); + store = createStore(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get)); + List ops = generateHistoryOnReplica(between(1, 50), false, randomBoolean(), randomBoolean()); + applyOperations(engine, ops); + engine.flush(true, true); + final CheckedRunnable checker = () -> { + assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get())); + try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + SequenceNumbers.CommitInfo commitInfo = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getIndexCommit().getUserData().entrySet()); + assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint())); + } + }; + final Thread[] threads = new Thread[randomIntBetween(2, 4)]; + final Phaser phaser = new Phaser(threads.length); + globalCheckpoint.set(engine.getProcessedLocalCheckpoint()); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + try { + engine.syncTranslog(); + checker.run(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + threads[i].start(); + } + for (Thread thread : threads) { + thread.join(); + } + checker.run(); + } + public void testSyncedFlushSurvivesEngineRestart() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); IOUtils.close(store, engine); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 95c3002157d7b..29804df62fa9e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -81,7 +81,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.junit.Assert; import java.io.IOException; @@ -319,9 +318,6 @@ public void testIndexCanChangeCustomDataPath() throws Exception { assertPathHasBeenCleared(newIndexDataPath.toAbsolutePath()); } - @TestIssueLogging( - value = "org.elasticsearch.index.engine:DEBUG", - issueUrl = "https://github.com/elastic/elasticsearch/issues/52223") public void testMaybeFlush() throws Exception { createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST) .build());