diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index a2452d4b53eb9..17ec9a172e384 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -46,7 +46,6 @@ final class SoftDeletesPolicy { private long retentionOperations; // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. private long minRetainedSeqNo; - private Collection retentionLeases; // provides the retention leases used to calculate the minimum sequence number to retain private final Supplier> retentionLeasesSupplier; @@ -59,7 +58,6 @@ final class SoftDeletesPolicy { this.retentionOperations = retentionOperations; this.minRetainedSeqNo = minRetainedSeqNo; this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); - retentionLeases = retentionLeasesSupplier.get(); this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; this.retentionLockCount = 0; } @@ -113,6 +111,11 @@ synchronized long getMinRetainedSeqNo() { } public synchronized Tuple> getRetentionPolicy() { + /* + * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is + * locked for peer recovery. + */ + final Collection retentionLeases = retentionLeasesSupplier.get(); // do not advance if the retention lock is held if (retentionLockCount == 0) { /* @@ -126,7 +129,6 @@ public synchronized Tuple> getRetentionPolicy() */ // calculate the minimum sequence number to retain based on retention leases - retentionLeases = retentionLeasesSupplier.get(); final long minimumRetainingSequenceNumber = retentionLeases .stream() .mapToLong(RetentionLease::retainingSequenceNumber) diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index e15372d687e55..acad319d94ccc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -24,18 +24,21 @@ import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.seqno.RetentionLease; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -46,7 +49,7 @@ public class SoftDeletesPolicyTests extends ESTestCase { */ public void testSoftDeletesRetentionLock() { long retainedOps = between(0, 10000); - AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)]; for (int i = 0; i < retainingSequenceNumbers.length; i++) { retainingSequenceNumbers[i] = new AtomicLong(); @@ -116,4 +119,23 @@ public void testSoftDeletesRetentionLock() { assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } + public void testAlwaysFetchLatestRetentionLeases() { + final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); + final Collection leases = new ArrayList<>(); + final int numLeases = randomIntBetween(0, 10); + for (int i = 0; i < numLeases; i++) { + leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test")); + } + final Supplier> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases)); + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier); + if (randomBoolean()) { + policy.acquireRetentionLock(); + } + if (numLeases == 0) { + assertThat(policy.getRetentionPolicy().v2(), empty()); + } else { + assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0]))); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index 98367c0e97188..d009486778d89 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -70,8 +71,11 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final String source = randomAlphaOfLength(8); final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + // simulate a peer-recovery which locks the soft-deletes policy on the primary. + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); + retentionLock.close(); // check retention leases have been committed on the primary final Collection primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(