Skip to content

Commit

Permalink
Trim translog when safe commit advanced (#32967)
Browse files Browse the repository at this point in the history
Since #28140 when the global checkpoint is advanced, we try to move the
safe commit forward, and clean up old index commits if possible. However,
we forget to trim unreferenced translog.

This change makes sure that we prune both old translog and index commits
when the safe commit advanced.

Relates #28140
Closes #32089
  • Loading branch information
dnhatn authored and jasontedor committed Aug 21, 2018
1 parent 260d098 commit 55cff96
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ public Translog.Location getTranslogLastWriteLocation() {
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
translog.trimUnreferencedReaders();
}
}

Expand Down Expand Up @@ -1949,6 +1950,8 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen();
// Here we don't have to trim translog because snapshotting an index commit
// does not lock translog or prevents unreferenced files from trimming.
indexWriter.deleteUnusedFiles();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4465,13 +4465,18 @@ public void testAcquireIndexCommit() throws Exception {

public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test",
Settings.builder().put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1)
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1).build());
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
try (Store store = createStore();
InternalEngine engine =
createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
if (rarely()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
Expand All @@ -4485,6 +4490,7 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
assertThat(engine.estimateTranslogOperationsFromMinSeq(0L), equalTo(0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void testTranslogHistoryTransferred() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32089")
public void testRetentionPolicyChangeDuringRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startPrimary();
Expand Down

0 comments on commit 55cff96

Please sign in to comment.