From 92ef753b56d25215f19098cbce91553fcae74c64 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Feb 2019 09:53:36 -0500 Subject: [PATCH] Allow retention lease operations under blocks (#39089) This commit allows manipulating retention leases under blocks. --- .../TransportResyncReplicationAction.java | 2 +- .../TransportReplicationAction.java | 2 +- .../replication/TransportWriteAction.java | 2 +- .../index/seqno/RetentionLeaseSyncAction.java | 10 +- .../TransportReplicationActionTests.java | 4 +- ...ReplicationAllPermitsAcquisitionTests.java | 2 +- ...tentionLeaseBackgroundSyncActionTests.java | 27 ++++ .../index/seqno/RetentionLeaseIT.java | 126 +++++++++++++++++- .../seqno/RetentionLeaseSyncActionTests.java | 27 ++++ 9 files changed, 193 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index bd996377c39c1..e9a6e7b48152d 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -110,7 +110,7 @@ protected ClusterBlockLevel globalBlockLevel() { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { // resync should never be blocked because it's an internal action return null; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a8c187745ac4a..326f7bacdb8f6 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -222,7 +222,7 @@ protected ClusterBlockLevel globalBlockLevel() { * Index level block to check before request execution. Returning null means that no blocks need to be checked. */ @Nullable - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return null; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 279a616160000..f44694f55d960 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -245,7 +245,7 @@ protected ClusterBlockLevel globalBlockLevel() { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return ClusterBlockLevel.WRITE; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 760271e53ee1e..4cd11de4574a0 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -127,7 +128,7 @@ protected WritePrimaryResult shardOperationOnPrimary( Objects.requireNonNull(request); Objects.requireNonNull(primary); primary.persistRetentionLeases(); - return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger); + return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()); } @Override @@ -138,7 +139,12 @@ protected WriteReplicaResult shardOperationOnReplica( Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); replica.persistRetentionLeases(); - return new WriteReplicaResult<>(request, null, null, replica, logger); + return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + } + + @Override + public ClusterBlockLevel indexBlockLevel() { + return null; } public static final class Request extends ReplicatedWriteRequest { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 41a300c28f3a9..110ab9bcb99a2 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -221,7 +221,7 @@ protected ClusterBlockLevel globalBlockLevel() { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return globalBlock == false ? ClusterBlockLevel.WRITE : null; } }; @@ -305,7 +305,7 @@ protected ClusterBlockLevel globalBlockLevel() { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return globalBlock == false ? ClusterBlockLevel.WRITE : null; } }; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 8cad76bcdfe5e..1cb1bfde34ea8 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -459,7 +459,7 @@ protected ClusterBlockLevel globalBlockLevel() { } @Override - protected ClusterBlockLevel indexBlockLevel() { + public ClusterBlockLevel indexBlockLevel() { return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 4567f3e382337..6ad7d5039ae8b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -228,4 +228,31 @@ protected Logger getLogger() { assertTrue(invoked.get()); } + public void testBlocks() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()); + + assertNull(action.indexBlockLevel()); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index a05d383eee080..ee6cab9a6872b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -44,6 +44,10 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -266,7 +270,7 @@ public void testBackgroundRetentionLeaseSync() throws Exception { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", numberOfReplicas) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) .build(); createIndex("index", settings); ensureGreen("index"); @@ -370,4 +374,124 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { } } + public void testCanAddRetentionLeaseUnderBlock() throws InterruptedException { + final String idForInitialRetentionLease = randomAlphaOfLength(8); + runUnderBlockTest( + idForInitialRetentionLease, + randomLongBetween(0, Long.MAX_VALUE), + (primary, listener) -> { + final String nextId = randomValueOtherThan(idForInitialRetentionLease, () -> randomAlphaOfLength(8)); + final long nextRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String nextSource = randomAlphaOfLength(8); + primary.addRetentionLease(nextId, nextRetainingSequenceNumber, nextSource, listener); + }, + primary -> {}); + } + + public void testCanRenewRetentionLeaseUnderBlock() throws InterruptedException { + final String idForInitialRetentionLease = randomAlphaOfLength(8); + final long initialRetainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final AtomicReference retentionLease = new AtomicReference<>(); + runUnderBlockTest( + idForInitialRetentionLease, + initialRetainingSequenceNumber, + (primary, listener) -> { + final long nextRetainingSequenceNumber = randomLongBetween(initialRetainingSequenceNumber, Long.MAX_VALUE); + final String nextSource = randomAlphaOfLength(8); + retentionLease.set(primary.renewRetentionLease(idForInitialRetentionLease, nextRetainingSequenceNumber, nextSource)); + listener.onResponse(new ReplicationResponse()); + }, + primary -> { + try { + /* + * If the background renew was able to execute, then the retention leases were persisted to disk. There is no other + * way for the current retention leases to end up written to disk so we assume that if they are written to disk, it + * implies that the background sync was able to execute under a block. + */ + assertBusy(() -> assertThat(primary.loadRetentionLeases().leases(), contains(retentionLease.get()))); + } catch (final Exception e) { + fail(e.toString()); + } + }); + + } + + public void testCanRemoveRetentionLeasesUnderBlock() throws InterruptedException { + final String idForInitialRetentionLease = randomAlphaOfLength(8); + runUnderBlockTest( + idForInitialRetentionLease, + randomLongBetween(0, Long.MAX_VALUE), + (primary, listener) -> primary.removeRetentionLease(idForInitialRetentionLease, listener), + indexShard -> {}); + } + + private void runUnderBlockTest( + final String idForInitialRetentionLease, + final long initialRetainingSequenceNumber, + final BiConsumer> indexShard, + final Consumer afterSync) throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .build(); + assertAcked(prepareCreate("index").setSettings(settings)); + ensureGreen("index"); + + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + + final String id = idForInitialRetentionLease; + final long retainingSequenceNumber = initialRetainingSequenceNumber; + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + primary.addRetentionLease(id, retainingSequenceNumber, source, listener); + latch.await(); + + final String block = randomFrom("read_only", "read_only_allow_delete", "read", "write", "metadata"); + + client() + .admin() + .indices() + .prepareUpdateSettings("index") + .setSettings(Settings.builder().put("index.blocks." + block, true).build()) + .get(); + + try { + final CountDownLatch actionLatch = new CountDownLatch(1); + final AtomicBoolean success = new AtomicBoolean(); + + indexShard.accept( + primary, + new ActionListener() { + + @Override + public void onResponse(final ReplicationResponse replicationResponse) { + success.set(true); + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + }); + actionLatch.await(); + assertTrue(success.get()); + afterSync.accept(primary); + } finally { + client() + .admin() + .indices() + .prepareUpdateSettings("index") + .setSettings(Settings.builder().putNull("index.blocks." + block).build()) + .get(); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 80baa23a4d7ac..9b9ad6a0962c1 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -228,4 +228,31 @@ protected Logger getLogger() { assertTrue(invoked.get()); } + public void testBlocks() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()); + + assertNull(action.indexBlockLevel()); + } + }