From 26e11d3083c415c299a525920b95a4cbfca8974d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 28 Jan 2019 07:11:51 -0500 Subject: [PATCH] Sync retention leases on expiration (#37902) This commit introduces a sync of retention leases when a retention lease expires. As expiration of retention leases is lazy, their expiration is managed only when getting the current retention leases from the replication tracker. At this point, we callback to our full retention lease sync to sync and flush these on all shard copies. With this change, replicas do not locally manage expiration of retention leases; instead, that is done only on the primary. --- .../index/seqno/ReplicationTracker.java | 70 ++++++--- ...ReplicationTrackerRetentionLeaseTests.java | 136 ++++++++++++++++-- .../index/seqno/RetentionLeaseSyncIT.java | 75 ++++++++++ .../shard/IndexShardRetentionLeaseTests.java | 53 +++++-- 4 files changed, 284 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 7e8560228920..4a614d8874af 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -147,10 +147,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final LongSupplier currentTimeMillisSupplier; /** - * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync - * retention leases to replicas. + * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the + * retention lease sync action, to sync retention leases to replicas. */ - private final BiConsumer, ActionListener> onNewRetentionLease; + private final BiConsumer, ActionListener> onSyncRetentionLeases; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the @@ -171,21 +171,45 @@ private Collection copyRetentionLeases() { } /** - * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. + * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only + * the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. * * @return the retention leases */ - public synchronized Collection getRetentionLeases() { - final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); - final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); - final Collection nonExpiredRetentionLeases = retentionLeases - .values() - .stream() - .filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis) - .collect(Collectors.toList()); - retentionLeases.clear(); - retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease))); - return Collections.unmodifiableCollection(nonExpiredRetentionLeases); + public Collection getRetentionLeases() { + final boolean wasPrimaryMode; + final Collection nonExpiredRetentionLeases; + synchronized (this) { + if (primaryMode) { + // the primary calculates the non-expired retention leases and syncs them to replicas + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); + final Collection expiredRetentionLeases = retentionLeases + .values() + .stream() + .filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis) + .collect(Collectors.toList()); + if (expiredRetentionLeases.isEmpty()) { + // early out as no retention leases have expired + return copyRetentionLeases(); + } + // clean up the expired retention leases + for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) { + retentionLeases.remove(expiredRetentionLease.id()); + } + } + /* + * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or + * we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the + * non-expired retention leases, instead receiving them on syncs from the primary. + */ + wasPrimaryMode = primaryMode; + nonExpiredRetentionLeases = copyRetentionLeases(); + } + if (wasPrimaryMode) { + onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {})); + } + return nonExpiredRetentionLeases; } /** @@ -215,7 +239,7 @@ public RetentionLease addRetentionLease( retentionLeases.put(id, retentionLease); currentRetentionLeases = copyRetentionLeases(); } - onNewRetentionLease.accept(currentRetentionLeases, listener); + onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; } @@ -500,11 +524,11 @@ private static long inSyncCheckpointStates( * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * - * @param shardId the shard ID - * @param allocationId the allocation ID - * @param indexSettings the index settings - * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} - * @param onNewRetentionLease a callback when a new retention lease is created + * @param shardId the shard ID + * @param allocationId the allocation ID + * @param indexSettings the index settings + * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires */ public ReplicationTracker( final ShardId shardId, @@ -513,7 +537,7 @@ public ReplicationTracker( final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer, ActionListener> onNewRetentionLease) { + final BiConsumer, ActionListener> onSyncRetentionLeases) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -524,7 +548,7 @@ public ReplicationTracker( checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); - this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease); + this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 3dafb93d6540..7a867027412e 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -30,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -67,17 +69,17 @@ public void testAddOrRenewRetentionLease() { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true); } } - public void testOnNewRetentionLease() { + public void testAddRetentionLeaseCausesRetentionLeaseSync() { final AllocationId allocationId = AllocationId.newInitializing(); final Map retentionLeases = new HashMap<>(); final AtomicBoolean invoked = new AtomicBoolean(); @@ -113,6 +115,7 @@ public void testOnNewRetentionLease() { replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); // assert that the new retention lease callback was invoked assertTrue(invoked.get()); + // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease invoked.set(false); replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); @@ -120,7 +123,15 @@ public void testOnNewRetentionLease() { } } - public void testExpiration() { + public void testExpirationOnPrimary() { + runExpirationTest(true); + } + + public void testExpirationOnReplica() { + runExpirationTest(false); + } + + private void runExpirationTest(final boolean primaryMode) { final AllocationId allocationId = AllocationId.newInitializing(); final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); @@ -141,42 +152,136 @@ public void testExpiration() { Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId), Collections.emptySet()); - replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + if (primaryMode) { + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + } final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + if (primaryMode) { + replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + } else { + replicationTracker.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = replicationTracker.getRetentionLeases(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode); } // renew the lease currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024)); retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE); - replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + if (primaryMode) { + replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + } else { + replicationTracker.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = replicationTracker.getRetentionLeases(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get); + if (primaryMode) { + assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true); + } else { + // leases do not expire on replicas until synced from the primary + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false); + } + } + + public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { + final AllocationId allocationId = AllocationId.newInitializing(); + final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); + final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); + final Settings settings = Settings + .builder() + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) + .build(); + final Map> retentionLeases = new HashMap<>(); + final AtomicBoolean invoked = new AtomicBoolean(); + final AtomicReference reference = new AtomicReference<>(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", settings), + UNASSIGNED_SEQ_NO, + value -> {}, + currentTimeMillis::get, + (leases, listener) -> { + // we do not want to hold a lock on the replication tracker in the callback! + assertFalse(Thread.holdsLock(reference.get())); + invoked.set(true); + assertThat( + leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), + equalTo(retentionLeases)); + }); + reference.set(replicationTracker); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); + replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); + // assert that the new retention lease callback was invoked + assertTrue(invoked.get()); + + // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease + invoked.set(false); + currentTimeMillis.set(1 + currentTimeMillis.get()); + retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); + replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); + + // reset the invocation marker so that we can assert the callback was invoked if any leases are expired + assertFalse(invoked.get()); + // randomly expire some leases + final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()); + // calculate the expired leases and update our tracking map + final List expiredIds = retentionLeases.entrySet() + .stream() + .filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + expiredIds.forEach(retentionLeases::remove); + currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement); + // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback + final Collection current = replicationTracker.getRetentionLeases(); + // the current leases should equal our tracking map + assertThat( + current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), + equalTo(retentionLeases)); + // the callback should only be invoked if there were expired leases + assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false)); + } + } + + private static Tuple toTuple(final RetentionLease retentionLease) { + return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp()); } private void assertRetentionLeases( final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier) { + final LongSupplier currentTimeMillisSupplier, + final boolean primaryMode) { final Collection retentionLeases = replicationTracker.getRetentionLeases(); final Map idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases) { @@ -188,9 +293,12 @@ private void assertRetentionLeases( assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis())); + if (primaryMode) { + // retention leases can be expired on replicas, so we can only assert on primaries here + assertThat( + currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), + lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis())); + } assertThat(retentionLease.source(), equalTo("test-" + i)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index fad9e25db12d..7d6e5fa2dc5a 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -23,20 +23,28 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; public class RetentionLeaseSyncIT extends ESIntegTestCase { @@ -89,6 +97,73 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { } } + public void testRetentionLeasesSyncOnExpiration() throws Exception { + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + final long estimatedTimeIntervalMillis = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + final TimeValue retentionLeaseTimeToLive = + TimeValue.timeValueMillis(randomLongBetween(estimatedTimeIntervalMillis, 2 * estimatedTimeIntervalMillis)); + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive) + .build(); + createIndex("index", settings); + ensureGreen("index"); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + // we will add multiple retention leases, wait for some to expire, and assert a consistent view between the primary and the replicas + final int length = randomIntBetween(1, 8); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + final RetentionLease currentRetentionLease = primary.addRetentionLease(id, retainingSequenceNumber, source, listener); + final long now = System.nanoTime(); + latch.await(); + + // check current retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + assertThat(replica.getRetentionLeases(), hasItem(currentRetentionLease)); + } + + // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have + final long later = System.nanoTime(); + Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now))); + final Collection currentRetentionLeases = primary.getRetentionLeases(); + assertThat(currentRetentionLeases, anyOf(empty(), contains(currentRetentionLease))); + + /* + * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in + * the background. + */ + assertBusy(() -> { + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + if (currentRetentionLeases.isEmpty()) { + assertThat(replica.getRetentionLeases(), empty()); + } else { + assertThat(replica.getRetentionLeases(), contains(currentRetentionLeases.toArray(new RetentionLease[0]))); + } + } + }); + } + } + private static Map toMap(final Collection replicaCommittedRetentionLeases) { return replicaCommittedRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index eff1edfed52b..cd7d2a2c12cb 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -81,57 +82,79 @@ public void testAddOrRenewRetentionLease() throws IOException { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); indexShard.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, true); } } finally { closeShards(indexShard); } } - public void testExpiration() throws IOException { + public void testExpirationOnPrimary() throws IOException { + runExpirationTest(true); + } + + public void testExpirationOnReplica() throws IOException { + runExpirationTest(false); + } + + private void runExpirationTest(final boolean primary) throws IOException { final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings .builder() .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) .build(); // current time is mocked through the thread pool - final IndexShard indexShard = newStartedShard(true, settings, new InternalEngineFactory()); + final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory()); try { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + if (primary) { + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + } else { + indexShard.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary); } // renew the lease currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024)); retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE); - indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + if (primary) { + indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + } else { + indexShard.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary); } // now force the lease to expire currentTimeMillis.set( currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); - assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get); + if (primary) { + assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, true); + } else { + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, false); + } } finally { closeShards(indexShard); } @@ -196,7 +219,8 @@ private void assertRetentionLeases( final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier) { + final LongSupplier currentTimeMillisSupplier, + final boolean primary) { final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); final Map idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases) { @@ -208,9 +232,12 @@ private void assertRetentionLeases( assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis())); + if (primary) { + // retention leases can be expired on replicas, so we can only assert on primaries here + assertThat( + currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), + lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis())); + } assertThat(retentionLease.source(), equalTo("test-" + i)); } }