diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7a5ec6bd28685..27c3d5eabcfdb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3162,4 +3162,8 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { public void verifyShardBeforeIndexClosing() throws IllegalStateException { getEngine().verifyEngineBeforeIndexClosing(); } + + RetentionLeaseSyncer getRetentionLeaseSyncer() { + return retentionLeaseSyncer; + } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java new file mode 100644 index 0000000000000..ce3986f0a2517 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java @@ -0,0 +1,151 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.replication; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class RetentionLeasesReplicationTests extends ESIndexLevelReplicationTestCase { + + public void testSimpleSyncRetentionLeases() throws Exception { + Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + try (ReplicationGroup group = createGroup(between(0, 2), settings)) { + group.startAll(); + List leases = new ArrayList<>(); + int iterations = between(1, 100); + CountDownLatch latch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + if (leases.isEmpty() == false && rarely()) { + RetentionLease leaseToRemove = randomFrom(leases); + leases.remove(leaseToRemove); + group.removeRetentionLease(leaseToRemove.id(), ActionListener.wrap(latch::countDown)); + } else { + RetentionLease newLease = group.addRetentionLease(Integer.toString(i), randomNonNegativeLong(), "test-" + i, + ActionListener.wrap(latch::countDown)); + leases.add(newLease); + } + } + RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases(); + assertThat(leasesOnPrimary.version(), equalTo((long) iterations)); + assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm())); + assertThat(leasesOnPrimary.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0]))); + latch.await(); + for (IndexShard replica : group.getReplicas()) { + assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary)); + } + } + } + + public void testOutOfOrderRetentionLeasesRequests() throws Exception { + Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + int numberOfReplicas = between(1, 2); + IndexMetaData indexMetaData = buildIndexMetaData(numberOfReplicas, settings, indexMapping); + try (ReplicationGroup group = new ReplicationGroup(indexMetaData) { + @Override + protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener listener) { + listener.onResponse(new SyncRetentionLeasesResponse(new RetentionLeaseSyncAction.Request(shardId, leases))); + } + }) { + group.startAll(); + int numLeases = between(1, 10); + List requests = new ArrayList<>(); + for (int i = 0; i < numLeases; i++) { + PlainActionFuture future = new PlainActionFuture<>(); + group.addRetentionLease(Integer.toString(i), randomNonNegativeLong(), "test-" + i, future); + requests.add(((SyncRetentionLeasesResponse) future.actionGet()).syncRequest); + } + RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases(); + for (IndexShard replica : group.getReplicas()) { + Randomness.shuffle(requests); + requests.forEach(request -> group.executeRetentionLeasesSyncRequestOnReplica(request, replica)); + assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary)); + } + } + } + + public void testSyncRetentionLeasesWithPrimaryPromotion() throws Exception { + Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + int numberOfReplicas = between(2, 4); + IndexMetaData indexMetaData = buildIndexMetaData(numberOfReplicas, settings, indexMapping); + try (ReplicationGroup group = new ReplicationGroup(indexMetaData) { + @Override + protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener listener) { + listener.onResponse(new SyncRetentionLeasesResponse(new RetentionLeaseSyncAction.Request(shardId, leases))); + } + }) { + group.startAll(); + int numLeases = between(1, 100); + IndexShard newPrimary = randomFrom(group.getReplicas()); + RetentionLeases latestRetentionLeasesOnNewPrimary = RetentionLeases.EMPTY; + for (int i = 0; i < numLeases; i++) { + PlainActionFuture addLeaseFuture = new PlainActionFuture<>(); + group.addRetentionLease(Integer.toString(i), randomNonNegativeLong(), "test-" + i, addLeaseFuture); + RetentionLeaseSyncAction.Request request = ((SyncRetentionLeasesResponse) addLeaseFuture.actionGet()).syncRequest; + for (IndexShard replica : randomSubsetOf(group.getReplicas())) { + group.executeRetentionLeasesSyncRequestOnReplica(request, replica); + if (newPrimary == replica) { + latestRetentionLeasesOnNewPrimary = request.getRetentionLeases(); + } + } + } + group.promoteReplicaToPrimary(newPrimary).get(); + // we need to make changes to retention leases to sync it to replicas + // since we don't sync retention leases when promoting a new primary. + PlainActionFuture newLeaseFuture = new PlainActionFuture<>(); + group.addRetentionLease("new-lease-after-promotion", randomNonNegativeLong(), "test", newLeaseFuture); + RetentionLeases leasesOnPrimary = group.getPrimary().getRetentionLeases(); + assertThat(leasesOnPrimary.primaryTerm(), equalTo(group.getPrimary().getOperationPrimaryTerm())); + assertThat(leasesOnPrimary.version(), equalTo(latestRetentionLeasesOnNewPrimary.version() + 1L)); + assertThat(leasesOnPrimary.leases(), hasSize(latestRetentionLeasesOnNewPrimary.leases().size() + 1)); + RetentionLeaseSyncAction.Request request = ((SyncRetentionLeasesResponse) newLeaseFuture.actionGet()).syncRequest; + for (IndexShard replica : group.getReplicas()) { + group.executeRetentionLeasesSyncRequestOnReplica(request, replica); + } + for (IndexShard replica : group.getReplicas()) { + assertThat(replica.getRetentionLeases(), equalTo(leasesOnPrimary)); + } + } + } + + static final class SyncRetentionLeasesResponse extends ReplicationResponse { + final RetentionLeaseSyncAction.Request syncRequest; + SyncRetentionLeasesResponse(RetentionLeaseSyncAction.Request syncRequest) { + this.syncRequest = syncRequest; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 316ed39574c0c..e997adb3d1a18 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -101,6 +101,7 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -1046,8 +1047,8 @@ public void testGlobalCheckpointSync() throws IOException { final IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); final AtomicBoolean synced = new AtomicBoolean(); - final IndexShard primaryShard = - newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), () -> synced.set(true)); + final IndexShard primaryShard = newShard( + shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), () -> synced.set(true), RetentionLeaseSyncer.EMPTY); // add a replica recoverShardFromStore(primaryShard); final IndexShard replicaShard = newShard(shardId, false); @@ -1462,9 +1463,8 @@ public String[] listAll() throws IOException { }; try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) { - IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store, - null, new InternalEngineFactory(), () -> { - }, EMPTY_EVENT_LISTENER); + IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store, null, new InternalEngineFactory(), + () -> { }, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER); AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true)); @@ -2122,6 +2122,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { null, shard.getEngineFactory(), shard.getGlobalCheckpointSyncer(), + shard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); @@ -2242,6 +2243,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { wrapper, new InternalEngineFactory(), () -> {}, + RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER); recoverShardFromStore(newShard); @@ -2396,6 +2398,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { wrapper, new InternalEngineFactory(), () -> {}, + RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER); recoverShardFromStore(newShard); @@ -2962,9 +2965,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum"))) .build(); - IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, - null, null, indexShard.engineFactory, - indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER); final IndexShardRecoveryException indexShardRecoveryException = expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); @@ -3007,9 +3009,8 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception { } // try to start shard on corrupted files - final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, - null, null, indexShard.engineFactory, - indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER); final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); @@ -3030,9 +3031,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1)); // try to start another time shard on corrupted files - final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData, - null, null, indexShard.engineFactory, - indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData, null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER); final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard2, true)); @@ -3070,9 +3070,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { .put(indexShard.indexSettings.getSettings()) .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum"))) .build(); - final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, - null, null, indexShard.engineFactory, - indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER); Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata(); assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1); @@ -3482,15 +3481,14 @@ public void testFlushOnInactive() throws Exception { ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); AtomicBoolean markedInactive = new AtomicBoolean(); AtomicReference primaryRef = new AtomicReference<>(); - IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, - new InternalEngineFactory(), () -> { - }, new IndexEventListener() { - @Override - public void onShardInactive(IndexShard indexShard) { - markedInactive.set(true); - primaryRef.get().flush(new FlushRequest()); - } - }); + IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { }, + RetentionLeaseSyncer.EMPTY, new IndexEventListener() { + @Override + public void onShardInactive(IndexShard indexShard) { + markedInactive.set(true); + primaryRef.get().flush(new FlushRequest()); + } + }); primaryRef.set(primary); recoverShardFromStore(primary); for (int i = 0; i < 3; i++) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 1c3c3b28773cf..2079b80cd386c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -107,11 +108,8 @@ public void setup() throws IOException { .putMapping("_doc", "{ \"properties\": {} }"); indexMetaData = metaData.build(); - indexShard = newStartedShard(p -> - newShard(routing, shardPath, indexMetaData, null, null, - new InternalEngineFactory(), () -> { - }, EMPTY_EVENT_LISTENER), - true); + indexShard = newStartedShard(p -> newShard(routing, shardPath, indexMetaData, null, null, + new InternalEngineFactory(), () -> { }, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER), true); translogPath = shardPath.resolveTranslog(); indexPath = shardPath.resolveIndex(); @@ -371,8 +369,8 @@ private IndexShard reopenIndexShard(boolean corrupted) throws IOException { return new Store(shardId, indexSettings, baseDirectoryWrapper, new DummyShardLock(shardId)); }; - return newShard(shardRouting, shardPath, metaData, storeProvider, null, - indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + return newShard(shardRouting, shardPath, metaData, storeProvider, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER); } private int indexDocs(IndexShard indexShard, boolean flushLast) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index ba3fa84a19641..1b59f558db584 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -109,6 +110,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { null, new InternalEngineFactory(), () -> {}, + RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER); // restore the shard diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 37fc1c748c189..85e69de8824c9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -67,6 +67,10 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; @@ -177,9 +181,25 @@ protected class ReplicationGroup implements AutoCloseable, Iterable } }); + private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer() { + @Override + public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener) { + syncRetentionLeases(shardId, retentionLeases, listener); + } + + @Override + public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { + sync(shardId, retentionLeases, ActionListener.wrap( + r -> { }, + e -> { + throw new AssertionError("failed to backgroun sync retention lease", e); + })); + } + }; + protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}); + primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); replicas = new CopyOnWriteArrayList<>(); this.indexMetaData = indexMetaData; updateAllocationIDsOnPrimary(); @@ -284,7 +304,7 @@ public void startPrimary() throws IOException { public IndexShard addReplica() throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); final IndexShard replica = - newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting), () -> {}); + newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting), () -> {}, retentionLeaseSyncer); addReplica(replica); return replica; } @@ -315,7 +335,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting), - () -> {}, EMPTY_EVENT_LISTENER); + () -> {}, retentionLeaseSyncer, EMPTY_EVENT_LISTENER); replicas.add(newReplica); if (replicationTargets != null) { replicationTargets.addReplica(newReplica); @@ -479,7 +499,7 @@ public Iterator iterator() { return Iterators.concat(replicas.iterator(), Collections.singleton(primary).iterator()); } - public IndexShard getPrimary() { + public synchronized IndexShard getPrimary() { return primary; } @@ -512,6 +532,34 @@ private synchronized void computeReplicationTargets() { private synchronized ReplicationTargets getReplicationTargets() { return replicationTargets; } + + protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener listener) { + RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(shardId, leases); + ActionListener wrappedListener = ActionListener.wrap( + r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); + new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute(); + } + + public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, + ActionListener listener) { + return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener); + } + + public void removeRetentionLease(String id, ActionListener listener) { + getPrimary().removeRetentionLease(id, listener); + } + + public void executeRetentionLeasesSyncRequestOnReplica(RetentionLeaseSyncAction.Request request, IndexShard replica) { + final PlainActionFuture acquirePermitFuture = new PlainActionFuture<>(); + replica.acquireReplicaOperationPermit(getPrimary().getOperationPrimaryTerm(), getPrimary().getGlobalCheckpoint(), + getPrimary().getMaxSeqNoOfUpdatesOrDeletes(), acquirePermitFuture, ThreadPool.Names.SAME, request); + try (Releasable ignored = acquirePermitFuture.actionGet()) { + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.persistRetentionLeases(); + } catch (Exception e) { + throw new AssertionError("failed to execute retention lease request on replica [" + replica.routingEntry() + "]", e); + } + } } static final class ReplicationTargets { @@ -866,4 +914,26 @@ private void executeResyncOnReplica(IndexShard replica, ResyncReplicationRequest } TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); } + + class SyncRetentionLeases extends ReplicationAction< + RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> { + + SyncRetentionLeases(RetentionLeaseSyncAction.Request request, ReplicationGroup group, + ActionListener listener) { + super(request, listener, group, "sync-retention-leases"); + } + + @Override + protected PrimaryResult performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request) throws Exception { + primary.persistRetentionLeases(); + return new PrimaryResult(request, new RetentionLeaseSyncAction.Response()); + } + + @Override + protected void performOnReplica(RetentionLeaseSyncAction.Request request, IndexShard replica) throws Exception { + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.persistRetentionLeases(); + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index f59ae8b9683ac..8d73a5ba4e467 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -250,7 +250,7 @@ protected IndexShard newShard( .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("_doc", "{ \"properties\": {} }"); - return newShard(shardRouting, metaData.build(), null, engineFactory, () -> {}, listeners); + return newShard(shardRouting, metaData.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners); } /** @@ -293,7 +293,8 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I @Nullable IndexSearcherWrapper searcherWrapper, Runnable globalCheckpointSyncer) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, new InternalEngineFactory(), globalCheckpointSyncer); + return newShard( + shardRouting, indexMetaData, searcherWrapper, new InternalEngineFactory(), globalCheckpointSyncer, RetentionLeaseSyncer.EMPTY); } /** @@ -307,7 +308,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I protected IndexShard newShard( ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, engineFactory, () -> {}, listeners); + return newShard(routing, indexMetaData, null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners); } /** @@ -323,6 +324,7 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, Runnable globalCheckpointSyncer, + RetentionLeaseSyncer retentionLeaseSyncer, IndexingOperationListener... listeners) throws IOException { // add node id as name to settings for proper logging @@ -330,7 +332,7 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, - EMPTY_EVENT_LISTENER, listeners); + retentionLeaseSyncer, EMPTY_EVENT_LISTENER, listeners); } /** @@ -348,7 +350,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe @Nullable CheckedFunction storeProvider, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, - Runnable globalCheckpointSyncer, + Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); @@ -386,7 +388,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, - RetentionLeaseSyncer.EMPTY, + retentionLeaseSyncer, breakerService); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; @@ -438,6 +440,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Engin null, engineFactory, current.getGlobalCheckpointSyncer(), + current.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, listeners); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 28244b523e129..ec1f002d05ba8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -208,7 +209,8 @@ public void testRestoreMinmal() throws IOException { new RecoverySource.SnapshotRecoverySource( UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId())); IndexMetaData metaData = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetaData(snapshotId, indexId)); - IndexShard restoredShard = newShard(shardRouting, metaData, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}); + IndexShard restoredShard = newShard( + shardRouting, metaData, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}, RetentionLeaseSyncer.EMPTY); restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY); DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT); restoredShard.markAsRecovering("test from snap", new RecoveryState(restoredShard.routingEntry(), discoveryNode, null));