Skip to content

Commit

Permalink
Sync retention leases on expiration (#37902)
Browse files Browse the repository at this point in the history
This commit introduces a sync of retention leases when a retention lease
expires. As expiration of retention leases is lazy, their expiration is
managed only when getting the current retention leases from the
replication tracker. At this point, we call back to our full retention
lease sync to sync and flush these on all shard copies. With this
change, replicas do not locally manage expiration of retention leases;
instead, that is done only on the primary.
  • Loading branch information
jasontedor authored Jan 28, 2019
1 parent 758eb9d commit 194cdfe
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final LongSupplier currentTimeMillisSupplier;

/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease;
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
Expand All @@ -171,21 +171,45 @@ private Collection<RetentionLease> copyRetentionLeases() {
}

/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
public synchronized Collection<RetentionLease> getRetentionLeases() {
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
.collect(Collectors.toList());
retentionLeases.clear();
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
public Collection<RetentionLease> getRetentionLeases() {
// Implementation note: all reads and writes of the lease map happen while holding this object's monitor, whereas the sync callback
// is invoked only after the monitor has been released, using a copy of the leases taken inside the synchronized block.
final boolean wasPrimaryMode;
final Collection<RetentionLease> nonExpiredRetentionLeases;
synchronized (this) {
if (primaryMode) {
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
// a lease is expired when its age strictly exceeds the retention lease period; an age equal to the period is not expired
final Collection<RetentionLease> expiredRetentionLeases = retentionLeases
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
.collect(Collectors.toList());
if (expiredRetentionLeases.isEmpty()) {
// early out as no retention leases have expired
// (this path returns without invoking the sync callback)
return copyRetentionLeases();
}
// clean up the expired retention leases
for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
retentionLeases.remove(expiredRetentionLease.id());
}
}
/*
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
* non-expired retention leases, instead receiving them on syncs from the primary.
*/
// capture the mode while still inside the synchronized block so the decision below is based on the same state as the copy
wasPrimaryMode = primaryMode;
nonExpiredRetentionLeases = copyRetentionLeases();
}
// reached in primary mode only when at least one lease was removed above; the callback runs outside of the synchronized block
if (wasPrimaryMode) {
// the listener is built from an empty runnable, so this method does not act on the outcome of the sync
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
}
return nonExpiredRetentionLeases;
}

/**
Expand Down Expand Up @@ -215,7 +239,7 @@ public RetentionLease addRetentionLease(
retentionLeases.put(id, retentionLease);
currentRetentionLeases = copyRetentionLeases();
}
onNewRetentionLease.accept(currentRetentionLeases, listener);
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}

Expand Down Expand Up @@ -500,11 +524,11 @@ private static long inSyncCheckpointStates(
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onNewRetentionLease a callback when a new retention lease is created
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
*/
public ReplicationTracker(
final ShardId shardId,
Expand All @@ -513,7 +537,7 @@ public ReplicationTracker(
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) {
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand All @@ -524,7 +548,7 @@ public ReplicationTracker(
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease);
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -30,6 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -67,17 +69,17 @@ public void testAddOrRenewRetentionLease() {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
}

for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true);
}
}

public void testOnNewRetentionLease() {
public void testAddRetentionLeaseCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final Map<String, Long> retentionLeases = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
Expand Down Expand Up @@ -113,14 +115,23 @@ public void testOnNewRetentionLease() {
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());

// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
invoked.set(false);
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
assertFalse(invoked.get());
}
}

public void testExpiration() {
// Runs the shared expiration scenario with primaryMode = true.
public void testExpirationOnPrimary() {
runExpirationTest(true);
}

// Runs the shared expiration scenario with primaryMode = false.
public void testExpirationOnReplica() {
runExpirationTest(false);
}

private void runExpirationTest(final boolean primaryMode) {
final AllocationId allocationId = AllocationId.newInitializing();
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
Expand All @@ -141,42 +152,136 @@ public void testExpiration() {
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
if (primaryMode) {
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
}
final long[] retainingSequenceNumbers = new long[1];
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
if (primaryMode) {
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
} else {
replicationTracker.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}

{
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
}

// renew the lease
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
if (primaryMode) {
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
} else {
replicationTracker.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}

{
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
}

// now force the lease to expire
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
if (primaryMode) {
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
} else {
// leases do not expire on replicas until synced from the primary
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
}
}

/**
 * Verifies that getting the retention leases on a tracker in primary mode invokes the retention lease sync callback exactly when at
 * least one lease has expired, and that the leases handed to the callback (and returned to the caller) match the expected
 * non-expired leases. The callback additionally asserts that it is never invoked while the tracker's monitor is held.
 */
public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
    final AllocationId allocationId = AllocationId.newInitializing();
    final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
    final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
    final Settings settings = Settings
            .builder()
            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
            .build();
    // the expected state: lease ID -> (retaining sequence number, timestamp)
    final Map<String, Tuple<Long, Long>> retentionLeases = new HashMap<>();
    final AtomicBoolean invoked = new AtomicBoolean();
    final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
    final ReplicationTracker replicationTracker = new ReplicationTracker(
            new ShardId("test", "_na", 0),
            allocationId.getId(),
            IndexSettingsModule.newIndexSettings("test", settings),
            UNASSIGNED_SEQ_NO,
            value -> {},
            currentTimeMillis::get,
            (leases, listener) -> {
                // we do not want to hold a lock on the replication tracker in the callback!
                assertFalse(Thread.holdsLock(reference.get()));
                invoked.set(true);
                assertThat(
                        leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
                        equalTo(retentionLeases));
            });
    reference.set(replicationTracker);
    replicationTracker.updateFromMaster(
            randomNonNegativeLong(),
            Collections.singleton(allocationId.getId()),
            routingTable(Collections.emptySet(), allocationId),
            Collections.emptySet());
    replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);

    final int length = randomIntBetween(0, 8);
    for (int i = 0; i < length; i++) {
        final String id = randomAlphaOfLength(8);
        final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
        retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
        // clear any marker left over from an expiration sync in the previous iteration so the assertion below is meaningful
        invoked.set(false);
        replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
        // assert that the new retention lease callback was invoked
        assertTrue(invoked.get());

        // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
        invoked.set(false);
        currentTimeMillis.set(1 + currentTimeMillis.get());
        retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
        replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
        // renewing must not trigger a sync; the marker also stays cleared for the expiration assertion below
        assertFalse(invoked.get());

        // randomly expire some leases
        final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get());
        // cannot overflow: the increment is bounded by Long.MAX_VALUE minus the current time
        final long advancedTimeMillis = currentTimeMillis.get() + currentTimeMillisIncrement;
        // calculate the expired leases and update our tracking map; use the same "age > period" subtraction as the tracker, since
        // adding the period to a timestamp could overflow once the clock has advanced close to Long.MAX_VALUE
        final List<String> expiredIds = retentionLeases.entrySet()
                .stream()
                .filter(r -> advancedTimeMillis - r.getValue().v2() > retentionLeaseMillis)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        expiredIds.forEach(retentionLeases::remove);
        currentTimeMillis.set(advancedTimeMillis);
        // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback
        final Collection<RetentionLease> current = replicationTracker.getRetentionLeases();
        // the current leases should equal our tracking map
        assertThat(
                current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
                equalTo(retentionLeases));
        // the callback should only be invoked if there were expired leases
        assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false));
    }
}

/**
 * Projects a retention lease onto the pair of values the tests compare on.
 *
 * @param retentionLease the retention lease to convert
 * @return a tuple of the lease's retaining sequence number and its timestamp, in that order
 */
private static Tuple<Long, Long> toTuple(final RetentionLease retentionLease) {
    final Long sequenceNumber = retentionLease.retainingSequenceNumber();
    final Long leaseTimestamp = retentionLease.timestamp();
    return Tuple.tuple(sequenceNumber, leaseTimestamp);
}

private void assertRetentionLeases(
final ReplicationTracker replicationTracker,
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier) {
final LongSupplier currentTimeMillisSupplier,
final boolean primaryMode) {
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
Expand All @@ -188,9 +293,12 @@ private void assertRetentionLeases(
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
if (primaryMode) {
// retention leases can be expired on replicas, so we can only assert on primaries here
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
}
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}
Expand Down
Loading

0 comments on commit 194cdfe

Please sign in to comment.