From d07e61b187233787190fd860025a39a1eaf071a1 Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Mon, 11 Feb 2019 16:26:45 -0600 Subject: [PATCH 1/4] Fix the version check for LegacyGeoShapeFieldMapper (#38547) Change version check from 7.0 to 6.6 in BaseGeoShapeFieldMapper to correctly use LegacyGeoShapeFieldMapper for indexes created prior to 6.6. --- .../org/elasticsearch/index/mapper/BaseGeoShapeFieldMapper.java | 2 +- .../java/org/elasticsearch/index/mapper/ExternalMapper.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BaseGeoShapeFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/BaseGeoShapeFieldMapper.java index ea30f1c5c2312..74892bf7d516c 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BaseGeoShapeFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BaseGeoShapeFieldMapper.java @@ -190,7 +190,7 @@ public Mapper.Builder parse(String name, Map node, ParserContext } } final Builder builder; - if (parsedDeprecatedParams || parserContext.indexVersionCreated().before(Version.V_7_0_0)) { + if (parsedDeprecatedParams || parserContext.indexVersionCreated().before(Version.V_6_6_0)) { // Legacy index-based shape builder = new LegacyGeoShapeFieldMapper.Builder(name, deprecatedParameters); } else { diff --git a/server/src/test/java/org/elasticsearch/index/mapper/ExternalMapper.java b/server/src/test/java/org/elasticsearch/index/mapper/ExternalMapper.java index ea3125accd059..31864abc2e459 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/ExternalMapper.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/ExternalMapper.java @@ -87,7 +87,7 @@ public ExternalMapper build(BuilderContext context) { BinaryFieldMapper binMapper = binBuilder.build(context); BooleanFieldMapper boolMapper = boolBuilder.build(context); GeoPointFieldMapper pointMapper = latLonPointBuilder.build(context); - BaseGeoShapeFieldMapper shapeMapper = (context.indexCreatedVersion().before(Version.V_7_0_0)) + BaseGeoShapeFieldMapper shapeMapper = (context.indexCreatedVersion().before(Version.V_6_6_0)) ? legacyShapeBuilder.build(context) : shapeBuilder.build(context); FieldMapper stringMapper = (FieldMapper)stringBuilder.build(context); From 33b2be5b98995a23d812617647a0f8c95da81591 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 11 Feb 2019 20:36:27 -0500 Subject: [PATCH 2/4] Copy retention leases when trim unsafe commits (#37995) When a primary shard is recovered from its store, we trim the last commit (when it's unsafe). If that primary crashes before the recovery completes, we will lose the committed retention leases because they are baked in the last commit. With this change, we copy the retention leases from the last commit to the safe commit when trimming unsafe commits. Relates #37165 --- .../org/elasticsearch/index/store/Store.java | 14 +++- .../shard/IndexShardRetentionLeaseTests.java | 80 +++++++++++++++++++ 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 73ac8a65d3007..c693d6b9d80fe 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1521,7 +1521,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long if (existingCommits.isEmpty()) { throw new IllegalArgumentException("No index found to trim"); } - final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); + final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1); + final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY); final IndexCommit startingIndexCommit; // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. @@ -1546,7 +1547,14 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long + startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid [" + translogUUID + "]"); } - if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) { + if (startingIndexCommit.equals(lastIndexCommitCommit) == false) { + /* + * Unlike other commit tags, the retention-leases tag is not restored when an engine is + * recovered from translog. We need to manually copy it from the last commit to the safe commit; + * otherwise we might lose the latest committed retention leases when re-opening an engine. + */ + final Map userData = new HashMap<>(startingIndexCommit.getUserData()); + userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, "")); try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) { // this achieves two things: // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened @@ -1557,7 +1565,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long // The new commit will use segment files from the starting commit but userData from the last commit by default. // Thus, we need to manually set the userData from the starting commit to the new commit. - writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); + writer.setLiveCommitData(userData.entrySet()); writer.commit(); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index cc64fc6f8b2de..d73b860515691 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -20,32 +20,42 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -264,6 +274,76 @@ public void testRetentionLeaseStats() throws IOException { } } + public void testRecoverFromStoreReserveRetentionLeases() throws Exception { + final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean(); + final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(), + config -> new InternalEngine(config) { + @Override + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, + long recoverUpToSeqNo) throws IOException { + if (throwDuringRecoverFromTranslog.get()) { + throw new RuntimeException("crashed before recover from translog is completed"); + } + return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + } + }); + final List leases = new ArrayList<>(); + long version = randomLongBetween(0, 100); + long primaryTerm = randomLongBetween(1, 100); + final int iterations = randomIntBetween(1, 10); + for (int i = 0; i < iterations; i++) { + if (randomBoolean()) { + indexDoc(shard, "_doc", Integer.toString(i)); + } else { + leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(), + randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test")); + } + if (randomBoolean()) { + if (randomBoolean()) { + version += randomLongBetween(1, 100); + primaryTerm += randomLongBetween(0, 100); + shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases)); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + } + } + if (randomBoolean()) { + shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test"); + flushShard(shard); + } + } + version += randomLongBetween(1, 100); + primaryTerm += randomLongBetween(0, 100); + shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases)); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + closeShard(shard, false); + + final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(), + shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null)); + throwDuringRecoverFromTranslog.set(true); + expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore); + closeShards(failedShard); + + final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(), + shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null)); + throwDuringRecoverFromTranslog.set(false); + assertTrue(newShard.recoverFromStore()); + final RetentionLeases retentionLeases = newShard.getRetentionLeases(); + assertThat(retentionLeases.version(), equalTo(version)); + assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); + if (leases.isEmpty()) { + assertThat(retentionLeases.leases(), empty()); + } else { + assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0]))); + } + closeShards(newShard); + } + private void assertRetentionLeases( final IndexShard indexShard, final int size, From 3fc1c940e8cf89501d30d827e6c4f2685babf3ad Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Mon, 11 Feb 2019 18:10:34 -0800 Subject: [PATCH 3/4] Make the 'get templates' types deprecation message consistent. (#38533) --- .../rest/action/admin/indices/RestGetIndexTemplateAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndexTemplateAction.java index 707378eec4cf6..65be56abd76da 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndexTemplateAction.java @@ -51,8 +51,8 @@ public class RestGetIndexTemplateAction extends BaseRestHandler { Collections.singleton(INCLUDE_TYPE_NAME_PARAMETER), Settings.FORMAT_PARAMS)); private static final DeprecationLogger deprecationLogger = new DeprecationLogger( LogManager.getLogger(RestGetIndexTemplateAction.class)); - public static final String TYPES_DEPRECATION_MESSAGE = "[types removal]" + - " Specifying include_type_name in get index template requests is deprecated."; + public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Using include_type_name in get " + + "index template requests is deprecated. The parameter will be removed in the next major version."; public RestGetIndexTemplateAction(final Settings settings, final RestController controller) { super(settings); From 58a77167219810cc8692dd1be1576345f8b9ee71 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 21:17:23 -0500 Subject: [PATCH 4/4] Enable removal of retention leases (#38751) This commit introduces the ability to remove retention leases. Explicit removal will be needed to manage retention leases used to increase the likelihood of operation-based recoveries syncing, and for consumers such as ILM. --- .../index/seqno/ReplicationTracker.java | 37 +++++-- .../elasticsearch/index/shard/IndexShard.java | 13 +++ ...ReplicationTrackerRetentionLeaseTests.java | 99 +++++++++++++++++++ .../index/seqno/RetentionLeaseIT.java | 62 ++++++++++++ .../shard/IndexShardRetentionLeaseTests.java | 30 ++++++ 5 files changed, 234 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 31f491d24cf9d..1d598d3d50ae0 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -156,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final LongSupplier currentTimeMillisSupplier; /** - * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync - * retention leases to replicas. + * A callback when a new retention lease is created or an existing retention lease is removed. In practice, this callback invokes the + * retention lease sync action, to sync retention leases to replicas. */ - private final BiConsumer> onAddRetentionLease; + private final BiConsumer> onSyncRetentionLeases; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the @@ -246,7 +246,7 @@ public RetentionLease addRetentionLease( Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())); currentRetentionLeases = retentionLeases; } - onAddRetentionLease.accept(currentRetentionLeases, listener); + onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; } @@ -283,6 +283,29 @@ public synchronized RetentionLease renewRetentionLease(final String id, final lo return retentionLease; } + /** + * Removes an existing retention lease. + * + * @param id the identifier of the retention lease + * @param listener the callback when the retention lease is successfully removed and synced to replicas + */ + public void removeRetentionLease(final String id, final ActionListener listener) { + Objects.requireNonNull(listener); + final RetentionLeases currentRetentionLeases; + synchronized (this) { + assert primaryMode; + if (retentionLeases.contains(id) == false) { + throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist"); + } + retentionLeases = new RetentionLeases( + operationPrimaryTerm, + retentionLeases.version() + 1, + retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false).collect(Collectors.toList())); + currentRetentionLeases = retentionLeases; + } + onSyncRetentionLeases.accept(currentRetentionLeases, listener); + } + /** * Updates retention leases on a replica. * @@ -563,7 +586,7 @@ private static long inSyncCheckpointStates( * @param indexSettings the index settings * @param operationPrimaryTerm the current primary term * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} - * @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires + * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires */ public ReplicationTracker( final ShardId shardId, @@ -573,7 +596,7 @@ public ReplicationTracker( final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer> onAddRetentionLease) { + final BiConsumer> onSyncRetentionLeases) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -585,7 +608,7 @@ public ReplicationTracker( checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); - this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease); + this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1ea894e7aed74..63307af0ac67c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1956,6 +1956,19 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source); } + /** + * Removes an existing retention lease. + * + * @param id the identifier of the retention lease + * @param listener the callback when the retention lease is successfully removed and synced to replicas + */ + public void removeRetentionLease(final String id, final ActionListener listener) { + Objects.requireNonNull(listener); + assert assertPrimaryMode(); + verifyNotClosed(); + replicationTracker.removeRetentionLease(id, listener); + } + /** * Updates retention leases on a replica. * diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index bb526a3470873..d47b45a5ec6dc 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -137,6 +137,105 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { } } + public void testRemoveRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + primaryTerm, + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> {}); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + } + + for (int i = 0; i < length; i++) { + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + /* + * Remove from the end since it will make the following assertion easier; we want to ensure that only the intended lease was + * removed. + */ + replicationTracker.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {})); + assertRetentionLeases( + replicationTracker, + length - i - 1, + minimumRetainingSequenceNumbers, + primaryTerm, + 1 + length + i, + true, + false); + } + } + + public void testRemoveRetentionLeaseCausesRetentionLeaseSync() { + final AllocationId allocationId = AllocationId.newInitializing(); + final Map retainingSequenceNumbers = new HashMap<>(); + final AtomicBoolean invoked = new AtomicBoolean(); + final AtomicReference reference = new AtomicReference<>(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> { + // we do not want to hold a lock on the replication tracker in the callback! + assertFalse(Thread.holdsLock(reference.get())); + invoked.set(true); + assertThat( + leases.leases() + .stream() + .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), + equalTo(retainingSequenceNumbers)); + }); + reference.set(replicationTracker); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + retainingSequenceNumbers.put(id, retainingSequenceNumber); + replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); + // assert that the new retention lease callback was invoked + assertTrue(invoked.get()); + + // reset the invocation marker so that we can assert the callback was not invoked when removing the lease + invoked.set(false); + retainingSequenceNumbers.remove(id); + replicationTracker.removeRetentionLease(id, ActionListener.wrap(() -> {})); + assertTrue(invoked.get()); + } + } + public void testExpirationOnPrimary() { runExpirationTest(true); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index e7561d9f37049..f67c01c8d8f7d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -126,6 +126,68 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { } } + public void testRetentionLeaseSyncedOnRemove() throws Exception { + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .build(); + createIndex("index", settings); + ensureGreen("index"); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final int length = randomIntBetween(1, 8); + final Map currentRetentionLeases = new HashMap<>(); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + // simulate a peer recovery which locks the soft deletes policy on the primary + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); + latch.await(); + retentionLock.close(); + } + + for (int i = 0; i < length; i++) { + final String id = randomFrom(currentRetentionLeases.keySet()); + final CountDownLatch latch = new CountDownLatch(1); + primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); + // simulate a peer recovery which locks the soft deletes policy on the primary + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + currentRetentionLeases.remove(id); + latch.await(); + retentionLock.close(); + + // check retention leases have been committed on the primary + final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( + primary.commitStats().getUserData().get(Engine.RETENTION_LEASES)); + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases))); + + // check current retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); + assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); + + // check retention leases have been committed on the replica + final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( + replica.commitStats().getUserData().get(Engine.RETENTION_LEASES)); + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); + } + } + } + public void testRetentionLeasesSyncOnExpiration() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index d73b860515691..5f103d484f8c1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -114,6 +114,36 @@ public void testAddOrRenewRetentionLease() throws IOException { } } + public void testRemoveRetentionLease() throws IOException { + final IndexShard indexShard = newStartedShard(true); + final long primaryTerm = indexShard.getOperationPrimaryTerm(); + try { + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + indexShard.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); + assertRetentionLeases( + indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); + } + + for (int i = 0; i < length; i++) { + indexShard.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {})); + assertRetentionLeases( + indexShard, + length - i - 1, + minimumRetainingSequenceNumbers, + primaryTerm, + 1 + length + i, + true, + false); + } + } finally { + closeShards(indexShard); + } + } + public void testExpirationOnPrimary() throws IOException { runExpirationTest(true); }