From ca028462ea8b75c253a8bcb93ffefc2c49fa3b1a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Jan 2019 12:11:21 -0500 Subject: [PATCH 1/4] Soft-deletes policy should always fetch latest leases If a new retention lease is added while a primary's soft-deletes policy is locked for peer-recovery, that lease won't be baked into Lucene commit. --- .../index/engine/SoftDeletesPolicy.java | 6 +++--- .../index/engine/SoftDeletesPolicyTests.java | 18 +++++++++++++++++- .../index/seqno/RetentionLeaseSyncIT.java | 4 ++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index a2452d4b53eb9..ed5f4ff736665 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -46,7 +46,6 @@ final class SoftDeletesPolicy { private long retentionOperations; // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. private long minRetainedSeqNo; - private Collection retentionLeases; // provides the retention leases used to calculate the minimum sequence number to retain private final Supplier> retentionLeasesSupplier; @@ -59,7 +58,6 @@ final class SoftDeletesPolicy { this.retentionOperations = retentionOperations; this.minRetainedSeqNo = minRetainedSeqNo; this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); - retentionLeases = retentionLeasesSupplier.get(); this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; this.retentionLockCount = 0; } @@ -113,6 +111,9 @@ synchronized long getMinRetainedSeqNo() { } public synchronized Tuple> getRetentionPolicy() { + // when an engine is flushed, we need to provide it the latest collection of retention leases + // even when the soft-deletes policy is locked for peer-recovery. + final Collection retentionLeases = retentionLeasesSupplier.get(); // do not advance if the retention lock is held if (retentionLockCount == 0) { /* @@ -126,7 +127,6 @@ public synchronized Tuple> getRetentionPolicy() */ // calculate the minimum sequence number to retain based on retention leases - retentionLeases = retentionLeasesSupplier.get(); final long minimumRetainingSequenceNumber = retentionLeases .stream() .mapToLong(RetentionLease::retainingSequenceNumber) diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 310e83e9d2cef..99c12284b4a19 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -30,12 +30,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -46,7 +48,7 @@ public class SoftDeletesPolicyTests extends ESTestCase { */ public void testSoftDeletesRetentionLock() { long retainedOps = between(0, 10000); - AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)]; for (int i = 0; i < retainingSequenceNumbers.length; i++) { retainingSequenceNumbers[i] = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); @@ -116,4 +118,18 @@ public void testSoftDeletesRetentionLock() { assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } + public void testAlwaysFetchLatestRetentionLeases() { + final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); + final Set leases = new HashSet<>(); + final int numLeases = between(0, 10); + for (int i = 0; i < numLeases; i++) { + leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(NO_OPS_PERFORMED, 1000), randomNonNegativeLong(), "test")); + } + final Supplier> leasesSupplier = () -> Collections.unmodifiableSet(new HashSet<>(leases)); + final SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 1000), between(0, 1000), leasesSupplier); + if (randomBoolean()) { + policy.acquireRetentionLock(); + } + assertThat(policy.getRetentionPolicy().v2(), equalTo(leases)); + } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index 7d6e5fa2dc5a6..f1cc26ad3927d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -71,8 +72,11 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final String source = randomAlphaOfLength(8); final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + // simulate a peer-recovery which locks the soft-deletes policy on the primary. + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); + retentionLock.close(); // check retention leases have been committed on the primary final Collection primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( From 1636cb37efc6ee6da3466ea1b4da46ec34af37a5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 28 Jan 2019 22:43:17 -0500 Subject: [PATCH 2/4] Style iteration --- .../index/engine/SoftDeletesPolicy.java | 6 ++++-- .../index/engine/SoftDeletesPolicyTests.java | 17 ++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index ed5f4ff736665..17ec9a172e384 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -111,8 +111,10 @@ synchronized long getMinRetainedSeqNo() { } public synchronized Tuple> getRetentionPolicy() { - // when an engine is flushed, we need to provide it the latest collection of retention leases - // even when the soft-deletes policy is locked for peer-recovery. + /* + * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is + * locked for peer recovery. + */ final Collection retentionLeases = retentionLeasesSupplier.get(); // do not advance if the retention lock is held if (retentionLockCount == 0) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 99c12284b4a19..0e66a3b16a253 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -38,6 +38,8 @@ import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -120,16 +122,21 @@ public void testSoftDeletesRetentionLock() { public void testAlwaysFetchLatestRetentionLeases() { final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); - final Set leases = new HashSet<>(); - final int numLeases = between(0, 10); + final Collection leases = new ArrayList<>(); + final int numLeases = randomIntBetween(0, 10); for (int i = 0; i < numLeases; i++) { leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(NO_OPS_PERFORMED, 1000), randomNonNegativeLong(), "test")); } - final Supplier> leasesSupplier = () -> Collections.unmodifiableSet(new HashSet<>(leases)); - final SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 1000), between(0, 1000), leasesSupplier); + final Supplier> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases)); + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier); if (randomBoolean()) { policy.acquireRetentionLock(); } - assertThat(policy.getRetentionPolicy().v2(), equalTo(leases)); + if (numLeases == 0) { + assertThat(policy.getRetentionPolicy().v2(), empty()); + } else { + assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0]))); + } } } From c7666458f32232bf0d6e9438f9c5f1a852cdfb67 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 29 Jan 2019 16:55:20 -0500 Subject: [PATCH 3/4] adjust retaining_seq_no in test --- .../org/elasticsearch/index/engine/SoftDeletesPolicyTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 0e66a3b16a253..d1e8fcc41ca19 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -125,7 +125,7 @@ public void testAlwaysFetchLatestRetentionLeases() { final Collection leases = new ArrayList<>(); final int numLeases = randomIntBetween(0, 10); for (int i = 0; i < numLeases; i++) { - leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(NO_OPS_PERFORMED, 1000), randomNonNegativeLong(), "test")); + leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test")); } final Supplier> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases)); final SoftDeletesPolicy policy = From 38bff332cb3bf29390a29531a85b321cce92ac18 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 30 Jan 2019 20:03:48 -0500 Subject: [PATCH 4/4] Fix imports --- .../org/elasticsearch/index/engine/SoftDeletesPolicyTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 61489d3ef3743..acad319d94ccc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -24,7 +24,6 @@ import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.seqno.RetentionLease; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList;