Skip to content

Commit

Permalink
Revisit deletion policy after release the last snapshot (#28627)
Browse files Browse the repository at this point in the history
We currently revisit the index deletion policy whenever the global
checkpoint has advanced enough. We should also revisit the deletion
policy after releasing the last snapshot of a snapshotting commit. With 
this change, the old index commits will be cleaned up as soon as
possible.

Follow-up of #28140
#28140 (comment)
  • Loading branch information
dnhatn committed Feb 19, 2018
1 parent 4ad361d commit 27eb1c4
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {

/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*
* @return true if the snapshotting commit can be clean up.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
Expand All @@ -178,6 +180,8 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) {
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
// The commit can be clean up only if no pending snapshot and it is neither the safe commit nor last commit.
return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1769,13 +1769,21 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
logger.trace("finish flush for snapshot");
}
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
ensureOpen();
indexWriter.deleteUnusedFiles();
}
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,15 @@ public void testAcquireIndexCommit() throws Exception {
assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData()));
}
}
randomSubsetOf(snapshottingCommits).forEach(snapshot -> {
final List<IndexCommit> releasingSnapshots = randomSubsetOf(snapshottingCommits);
for (IndexCommit snapshot : releasingSnapshots) {
snapshottingCommits.remove(snapshot);
indexPolicy.releaseCommit(snapshot);
});
final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count();
final IndexCommit lastCommit = commitList.get(commitList.size() - 1);
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(indexPolicy.releaseCommit(snapshot),
equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false));
}
// Snapshotting commits must not be deleted.
snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false)));
// We don't need to retain translog for snapshotting commits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2097,9 +2097,9 @@ public void testSeqNoAndCheckpoints() throws IOException {
// this test writes documents to the engine while concurrently flushing/commit
// and ensuring that the commit points contain the correct sequence number data
public void testConcurrentWritesAndCommits() throws Exception {
List<Engine.IndexCommitRef> commits = new ArrayList<>();
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
final List<Engine.IndexCommitRef> commits = new ArrayList<>();

final int numIndexingThreads = scaledRandomIntBetween(2, 4);
final int numDocsPerThread = randomIntBetween(500, 1000);
Expand Down Expand Up @@ -2184,8 +2184,6 @@ public void testConcurrentWritesAndCommits() throws Exception {
prevLocalCheckpoint = localCheckpoint;
prevMaxSeqNo = maxSeqNo;
}
} finally {
IOUtils.close(commits);
}
}

Expand Down Expand Up @@ -4474,6 +4472,37 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
}
}

public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
int numSnapshots = between(1, 10);
final List<Engine.IndexCommitRef> snapshots = new ArrayList<>();
for (int i = 0; i < numSnapshots; i++) {
snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit.
}
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.syncTranslog();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
for (int i = 0; i < numSnapshots - 1; i++) {
snapshots.get(i).close();
// pending snapshots - should not release any commit.
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
}
snapshots.get(numSnapshots - 1).close(); // release the last snapshot - delete all except the last commit
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1));
}
}

public void testShouldPeriodicallyFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
int numDocs = between(10, 100);
Expand Down

0 comments on commit 27eb1c4

Please sign in to comment.