diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index afc4cca0531d9..e383c533bf83b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -168,8 +168,10 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { /** * Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}. + * + * @return true if the snapshotting commit can be clean up. */ - synchronized void releaseCommit(final IndexCommit snapshotCommit) { + synchronized boolean releaseCommit(final IndexCommit snapshotCommit) { final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate; assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" + "snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]"; @@ -178,6 +180,8 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) { if (refCount == 0) { snapshottedCommits.remove(releasingCommit); } + // The commit can be clean up only if no pending snapshot and it is neither the safe commit nor last commit. + return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false; } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index eee4f69a9a514..a33fac944dee7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1769,13 +1769,21 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En logger.trace("finish flush for snapshot"); } final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); - return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit)); + return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit)); } @Override public IndexCommitRef acquireSafeIndexCommit() throws EngineException { final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); - return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit)); + return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit)); + } + + private void releaseIndexCommit(IndexCommit snapshot) throws IOException { + // Revisit the deletion policy if we can clean up the snapshotting commit. + if (combinedDeletionPolicy.releaseCommit(snapshot)) { + ensureOpen(); + indexWriter.deleteUnusedFiles(); + } } private boolean failOnTragicEvent(AlreadyClosedException ex) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 8e79859275070..3cf8d767501d2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -132,10 +132,15 @@ public void testAcquireIndexCommit() throws Exception { assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData())); } } - randomSubsetOf(snapshottingCommits).forEach(snapshot -> { + final List releasingSnapshots = randomSubsetOf(snapshottingCommits); + for (IndexCommit snapshot : releasingSnapshots) { snapshottingCommits.remove(snapshot); - indexPolicy.releaseCommit(snapshot); - }); + final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count(); + final IndexCommit lastCommit = commitList.get(commitList.size() - 1); + final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); + assertThat(indexPolicy.releaseCommit(snapshot), + equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false)); + } // Snapshotting commits must not be deleted. snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); // We don't need to retain translog for snapshotting commits. diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e51a67f10c05c..f115ee8c58d76 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2097,9 +2097,9 @@ public void testSeqNoAndCheckpoints() throws IOException { // this test writes documents to the engine while concurrently flushing/commit // and ensuring that the commit points contain the correct sequence number data public void testConcurrentWritesAndCommits() throws Exception { - List commits = new ArrayList<>(); try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { + final List commits = new ArrayList<>(); final int numIndexingThreads = scaledRandomIntBetween(2, 4); final int numDocsPerThread = randomIntBetween(500, 1000); @@ -2184,8 +2184,6 @@ public void testConcurrentWritesAndCommits() throws Exception { prevLocalCheckpoint = localCheckpoint; prevMaxSeqNo = maxSeqNo; } - } finally { - IOUtils.close(commits); } } @@ -4474,6 +4472,37 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } } + public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { + IOUtils.close(engine, store); + store = createStore(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { + final int numDocs = scaledRandomIntBetween(10, 100); + for (int docId = 0; docId < numDocs; docId++) { + index(engine, docId); + if (frequently()) { + engine.flush(randomBoolean(), randomBoolean()); + } + } + engine.flush(false, randomBoolean()); + int numSnapshots = between(1, 10); + final List snapshots = new ArrayList<>(); + for (int i = 0; i < numSnapshots; i++) { + snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit. + } + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + engine.syncTranslog(); + final List commits = DirectoryReader.listCommits(store.directory()); + for (int i = 0; i < numSnapshots - 1; i++) { + snapshots.get(i).close(); + // pending snapshots - should not release any commit. + assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits)); + } + snapshots.get(numSnapshots - 1).close(); // release the last snapshot - delete all except the last commit + assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1)); + } + } + public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); int numDocs = between(10, 100);