Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Soft-deletes policy should always fetch latest leases #37940

Merged
merged 11 commits into from
Jan 31, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ final class SoftDeletesPolicy {
private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
private Collection<RetentionLease> retentionLeases;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;

Expand All @@ -59,7 +58,6 @@ final class SoftDeletesPolicy {
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
retentionLeases = retentionLeasesSupplier.get();
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0;
}
Expand Down Expand Up @@ -113,6 +111,11 @@ synchronized long getMinRetainedSeqNo() {
}

public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
*/
final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
// do not advance if the retention lock is held
if (retentionLockCount == 0) {
/*
Expand All @@ -126,7 +129,6 @@ public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy()
*/

// calculate the minimum sequence number to retain based on retention leases
retentionLeases = retentionLeasesSupplier.get();
final long minimumRetainingSequenceNumber = retentionLeases
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

Expand All @@ -46,7 +50,7 @@ public class SoftDeletesPolicyTests extends ESTestCase {
*/
public void testSoftDeletesRetentionLock() {
long retainedOps = between(0, 10000);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)];
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
retainingSequenceNumbers[i] = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
Expand Down Expand Up @@ -116,4 +120,23 @@ public void testSoftDeletesRetentionLock() {
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
}

public void testAlwaysFetchLatestRetentionLeases() {
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
final Collection<RetentionLease> leases = new ArrayList<>();
final int numLeases = randomIntBetween(0, 10);
for (int i = 0; i < numLeases; i++) {
leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(NO_OPS_PERFORMED, 1000), randomNonNegativeLong(), "test"));
}
final Supplier<Collection<RetentionLease>> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases));
final SoftDeletesPolicy policy =
new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
if (randomBoolean()) {
policy.acquireRetentionLock();
}
if (numLeases == 0) {
assertThat(policy.getRetentionPolicy().v2(), empty());
} else {
assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0])));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -71,8 +72,11 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer-recovery which locks the soft-deletes policy on the primary.
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();

// check retention leases have been committed on the primary
final Collection<RetentionLease> primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(
Expand Down