Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync retention leases on expiration #37902

Merged
merged 3 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final LongSupplier currentTimeMillisSupplier;

/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease;
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
Expand All @@ -171,21 +171,45 @@ private Collection<RetentionLease> copyRetentionLeases() {
}

/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
public synchronized Collection<RetentionLease> getRetentionLeases() {
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
.collect(Collectors.toList());
retentionLeases.clear();
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
public Collection<RetentionLease> getRetentionLeases() {
final boolean wasPrimaryMode;
final Collection<RetentionLease> nonExpiredRetentionLeases;
synchronized (this) {
if (primaryMode) {
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> expiredRetentionLeases = retentionLeases
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
.collect(Collectors.toList());
if (expiredRetentionLeases.isEmpty()) {
// early out as no retention leases have expired
return copyRetentionLeases();
}
// clean up the expired retention leases
for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
retentionLeases.remove(expiredRetentionLease.id());
}
}
/*
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
* non-expired retention leases, instead receiving them on syncs from the primary.
*/
wasPrimaryMode = primaryMode;
nonExpiredRetentionLeases = copyRetentionLeases();
}
if (wasPrimaryMode) {
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
}
return nonExpiredRetentionLeases;
}

/**
Expand Down Expand Up @@ -215,7 +239,7 @@ public RetentionLease addRetentionLease(
retentionLeases.put(id, retentionLease);
currentRetentionLeases = copyRetentionLeases();
}
onNewRetentionLease.accept(currentRetentionLeases, listener);
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}

Expand Down Expand Up @@ -500,11 +524,11 @@ private static long inSyncCheckpointStates(
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onNewRetentionLease a callback when a new retention lease is created
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
*/
public ReplicationTracker(
final ShardId shardId,
Expand All @@ -513,7 +537,7 @@ public ReplicationTracker(
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) {
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand All @@ -524,7 +548,7 @@ public ReplicationTracker(
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease);
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -30,6 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -67,17 +69,17 @@ public void testAddOrRenewRetentionLease() {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
}

for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true);
}
}

public void testOnNewRetentionLease() {
public void testAddRetentionLeaseCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final Map<String, Long> retentionLeases = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
Expand Down Expand Up @@ -113,14 +115,23 @@ public void testOnNewRetentionLease() {
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());

// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
invoked.set(false);
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
assertFalse(invoked.get());
}
}

public void testExpiration() {
public void testExpirationOnPrimary() {
runExpirationTest(true);
}

public void testExpirationOnReplica() {
runExpirationTest(false);
}

private void runExpirationTest(final boolean primaryMode) {
final AllocationId allocationId = AllocationId.newInitializing();
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
Expand All @@ -141,42 +152,136 @@ public void testExpiration() {
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
if (primaryMode) {
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
}
final long[] retainingSequenceNumbers = new long[1];
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
if (primaryMode) {
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
} else {
replicationTracker.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}

{
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
}

// renew the lease
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
if (primaryMode) {
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
} else {
replicationTracker.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}

{
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
}

// now force the lease to expire
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
if (primaryMode) {
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
} else {
// leases do not expire on replicas until synced from the primary
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
}
}

public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
final Map<String, Tuple<Long, Long>> retentionLeases = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", settings),
UNASSIGNED_SEQ_NO,
value -> {},
currentTimeMillis::get,
(leases, listener) -> {
// we do not want to hold a lock on the replication tracker in the callback!
assertFalse(Thread.holdsLock(reference.get()));
invoked.set(true);
assertThat(
leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
equalTo(retentionLeases));
});
reference.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);

final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());

// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
invoked.set(false);
currentTimeMillis.set(1 + currentTimeMillis.get());
retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");

// reset the invocation marker so that we can assert the callback was invoked if any leases are expired
assertFalse(invoked.get());
// randomly expire some leases
final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get());
// calculate the expired leases and update our tracking map
final List<String> expiredIds = retentionLeases.entrySet()
.stream()
.filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
expiredIds.forEach(retentionLeases::remove);
currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement);
// getting the leases has the side effect of calculating which leases are expired and invoking the sync callback
final Collection<RetentionLease> current = replicationTracker.getRetentionLeases();
// the current leases should equal our tracking map
assertThat(
current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
equalTo(retentionLeases));
// the callback should only be invoked if there were expired leases
assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false));
}
}

private static Tuple<Long, Long> toTuple(final RetentionLease retentionLease) {
return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp());
}

private void assertRetentionLeases(
final ReplicationTracker replicationTracker,
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier) {
final LongSupplier currentTimeMillisSupplier,
final boolean primaryMode) {
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
Expand All @@ -188,9 +293,12 @@ private void assertRetentionLeases(
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
if (primaryMode) {
// retention leases can be expired on replicas, so we can only assert on primaries here
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
}
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}
Expand Down
Loading