Skip to content

Commit

Permalink
Update translog policy before the next safe commit (#54839)
Browse files Browse the repository at this point in the history
IndexShardIT#testMaybeFlush relies on the assumption that the safe commit 
and translog deletion policy have advanced after IndexShard#sync returns . 
This assumption does not hold if there's a race with the global checkpoint sync.
Closes #52223
  • Loading branch information
dnhatn authored Apr 7, 2020
1 parent b093353 commit 3bfcc60
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
this.safeCommitInfo = SafeCommitInfo.EMPTY;
this.lastCommit = commits.get(commits.size() - 1);
this.safeCommit = commits.get(keptPosition);
if (keptPosition == commits.size() - 1) {
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
} else {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
}
}
updateRetentionPolicy();
if (keptPosition == commits.size() - 1) {
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
} else {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
safeCommit = this.safeCommit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,45 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException {
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
}

public void testSyncTranslogConcurrently() throws Exception {
IOUtils.close(engine, store);
final Path translogPath = createTempDir();
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get));
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 50), false, randomBoolean(), randomBoolean());
applyOperations(engine, ops);
engine.flush(true, true);
final CheckedRunnable<IOException> checker = () -> {
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get()));
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
SequenceNumbers.CommitInfo commitInfo =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getIndexCommit().getUserData().entrySet());
assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint()));
}
};
final Thread[] threads = new Thread[randomIntBetween(2, 4)];
final Phaser phaser = new Phaser(threads.length);
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
try {
engine.syncTranslog();
checker.run();
} catch (IOException e) {
throw new AssertionError(e);
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
checker.run();
}

public void testSyncedFlushSurvivesEngineRestart() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
IOUtils.close(store, engine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.junit.Assert;

import java.io.IOException;
Expand Down Expand Up @@ -319,9 +318,6 @@ public void testIndexCanChangeCustomDataPath() throws Exception {
assertPathHasBeenCleared(newIndexDataPath.toAbsolutePath());
}

@TestIssueLogging(
value = "org.elasticsearch.index.engine:DEBUG",
issueUrl = "https://github.com/elastic/elasticsearch/issues/52223")
public void testMaybeFlush() throws Exception {
createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.build());
Expand Down

0 comments on commit 3bfcc60

Please sign in to comment.