diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b501085703e5d..be8a7c04f5b5d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -536,7 +536,7 @@ public void respond(ActionListener listener) { } } - protected static class ReplicaResult { + public static class ReplicaResult { final Exception finalFailure; public ReplicaResult(Exception finalFailure) { diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index e75bcff56d7e0..fee8d06c49873 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -120,6 +120,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; private volatile AsyncGlobalCheckpointTask globalCheckpointTask; + private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask; // don't convert to Setting<> and register... we only set this in tests and register via a plugin private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; @@ -196,6 +197,7 @@ public IndexService( this.refreshTask = new AsyncRefreshTask(this); this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); + this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -285,7 +287,8 @@ public synchronized void close(final String reason, boolean delete) throws IOExc refreshTask, fsyncTask, trimTranslogTask, - globalCheckpointTask); + globalCheckpointTask, + retentionLeaseBackgroundSyncTask); } } } @@ -402,7 +405,7 @@ public synchronized IndexShard createShard( searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), - (retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener), + retentionLeaseSyncer, circuitBreakerService); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); @@ -764,6 +767,14 @@ private void maybeTrimTranslog() { } private void maybeSyncGlobalCheckpoints() { + sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint"); + } + + private void backgroundSyncRetentionLeases() { + sync(IndexShard::backgroundSyncRetentionLeases, "retention lease"); + } + + private void sync(final Consumer sync, final String source) { for (final IndexShard shard : this.shards.values()) { if (shard.routingEntry().active() && shard.routingEntry().primary()) { switch (shard.state()) { @@ -777,17 +788,17 @@ private void maybeSyncGlobalCheckpoints() { case STARTED: try { shard.runUnderPrimaryPermit( - () -> shard.maybeSyncGlobalCheckpoint("background"), + () -> sync.accept(shard), e -> { if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { logger.warn( new ParameterizedMessage( - "{} failed to execute background 
global checkpoint sync", shard.shardId()), e); + "{} failed to execute background {} sync", shard.shardId(), source), e); } }, ThreadPool.Names.SAME, - "background global checkpoint sync"); + "background " + source + " sync"); } catch (final AlreadyClosedException | IndexShardClosedException e) { // the shard was closed concurrently, continue } @@ -893,6 +904,15 @@ public String toString() { Property.Dynamic, Property.IndexScope); + // this setting is intentionally not registered, it is only used in tests + public static final Setting RETENTION_LEASE_SYNC_INTERVAL_SETTING = + Setting.timeSetting( + "index.soft_deletes.retention_lease.sync_interval", + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Property.Dynamic, + Property.IndexScope); + /** * Background task that syncs the global checkpoint to replicas. */ @@ -919,6 +939,29 @@ public String toString() { } } + final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask { + + AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) { + super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); + } + + @Override + protected void runInternal() { + indexService.backgroundSyncRetentionLeases(); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + public String toString() { + return "retention_lease_background_sync"; + } + + } + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 34ec443a5404a..3b68dfa6addae 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -177,8 +177,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private RetentionLeases retentionLeases = RetentionLeases.EMPTY; /** - * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only - * the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. + * Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired, + * and if any have expired, syncs the retention leases to any replicas. * * @return the retention leases */ diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java new file mode 100644 index 0000000000000..906b505dad7e3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -0,0 +1,185 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
+import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardClosedException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Replication action responsible for background syncing retention leases to replicas. This action is deliberately a replication action so
+ * that if a replica misses a background retention lease sync then that shard will not be marked as stale. We have some tolerance for a
+ * shard copy missing renewals of retention leases since the background sync interval is much smaller than the expected lifetime of
+ * retention leases.
+ */
+public class RetentionLeaseBackgroundSyncAction extends TransportReplicationAction<
+        RetentionLeaseBackgroundSyncAction.Request,
+        RetentionLeaseBackgroundSyncAction.Request,
+        ReplicationResponse> {
+
+    public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
+
+    private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseBackgroundSyncAction.class);
+
+    protected Logger getLogger() {
+        return LOGGER;
+    }
+
+    @Inject
+    public RetentionLeaseBackgroundSyncAction(
+            final Settings settings,
+            final TransportService transportService,
+            final ClusterService clusterService,
+            final IndicesService indicesService,
+            final ThreadPool threadPool,
+            final ShardStateAction shardStateAction,
+            final ActionFilters actionFilters,
+            final IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(
+                settings,
+                ACTION_NAME,
+                transportService,
+                clusterService,
+                indicesService,
+                threadPool,
+                shardStateAction,
+                actionFilters,
+                indexNameExpressionResolver,
+                Request::new,
+                Request::new,
+                ThreadPool.Names.MANAGEMENT);
+    }
+
+    /**
+     * Background sync the specified retention leases for the specified shard.
+ * + * @param shardId the shard to sync + * @param retentionLeases the retention leases to sync + */ + public void backgroundSync( + final ShardId shardId, + final RetentionLeases retentionLeases) { + Objects.requireNonNull(shardId); + Objects.requireNonNull(retentionLeases); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + execute( + new Request(shardId, retentionLeases), + ActionListener.wrap( + r -> {}, + e -> { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); + } + })); + } + } + + @Override + protected PrimaryResult shardOperationOnPrimary(final Request request, final IndexShard primary) { + Objects.requireNonNull(request); + Objects.requireNonNull(primary); + primary.afterWriteOperation(); + return new PrimaryResult<>(request, new ReplicationResponse()); + } + + @Override + protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){ + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.afterWriteOperation(); + return new ReplicaResult(); + } + + public static final class Request extends ReplicationRequest { + + private RetentionLeases retentionLeases; + + public RetentionLeases getRetentionLeases() { + return retentionLeases; + } + + public Request() { + + } + + public Request(final ShardId shardId, final RetentionLeases retentionLeases) { + super(Objects.requireNonNull(shardId)); + this.retentionLeases = Objects.requireNonNull(retentionLeases); + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + retentionLeases = new RetentionLeases(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(Objects.requireNonNull(out)); + retentionLeases.writeTo(out); + } + + @Override + public String toString() { + return "Request{" + + "retentionLeases=" + retentionLeases + + ", shardId=" + shardId + + ", timeout=" + timeout + + ", index='" + index + '\'' + + ", waitForActiveShards=" + waitForActiveShards + + '}'; + } + + } + + @Override + protected ReplicationResponse newResponseInstance() { + return new ReplicationResponse(); + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 89a679abea591..9be7ab046eb8b 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -96,7 +96,7 @@ public RetentionLeaseSyncAction( * @param retentionLeases the retention leases to sync * @param listener the callback to invoke when the sync completes normally or abnormally */ - public void syncRetentionLeasesForShard( + public void sync( final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java index a19700a94da4b..927d2ec499960 
100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -27,7 +27,6 @@ * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on * the primary. */ -@FunctionalInterface public interface RetentionLeaseSyncer { /** @@ -38,9 +37,20 @@ public interface RetentionLeaseSyncer { * @param retentionLeases the retention leases to sync * @param listener the callback when sync completes */ - void syncRetentionLeasesForShard( - ShardId shardId, - RetentionLeases retentionLeases, - ActionListener listener); + void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener); + + void backgroundSync(ShardId shardId, RetentionLeases retentionLeases); + + RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() { + @Override + public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener listener) { + + } + + @Override + public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { + + } + }; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 42ed7ece4789a..2c1ffd0dccc62 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -111,6 +111,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseStats; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -212,6 +213,8 @@ Runnable getGlobalCheckpointSyncer() { return globalCheckpointSyncer; } + private final RetentionLeaseSyncer retentionLeaseSyncer; + @Nullable private RecoveryState recoveryState; @@ -266,7 +269,7 @@ public IndexShard( final List searchOperationListener, final List listeners, final Runnable globalCheckpointSyncer, - final BiConsumer> retentionLeaseSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -288,6 +291,7 @@ public IndexShard( listenersList.add(internalIndexingStats); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); this.globalCheckpointSyncer = globalCheckpointSyncer; + this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); final List searchListenersList = new ArrayList<>(searchOperationListener); searchListenersList.add(searchStats); this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger); @@ -322,7 +326,7 @@ public IndexShard( UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, - retentionLeaseSyncer); + (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener)); this.replicationTracker = replicationTracker; // the query cache is a node-level thing, however we want the most popular filters @@ -1925,7 +1929,7 @@ public void addGlobalCheckpointListener( } /** - * Get all non-expired retention leases tracked on 
this shard. An unmodifiable copy of the retention leases is returned. + * Get all non-expired retention leases tracked on this shard. * * @return the retention leases */ @@ -1986,6 +1990,15 @@ public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); } + /** + * Syncs the current retention leases to all replicas. + */ + public void backgroundSyncRetentionLeases() { + assert assertPrimaryMode(); + verifyNotClosed(); + retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases()); + } + /** * Waits for all operations up to the provided sequence number to complete. * diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 5955a749fea34..5fdea4362adbc 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -26,6 +26,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -56,8 +57,10 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; @@ -142,7 +145,8 @@ public IndicesClusterStateService( final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, - final RetentionLeaseSyncAction retentionLeaseSyncAction) { + final RetentionLeaseSyncAction retentionLeaseSyncAction, + final RetentionLeaseBackgroundSyncAction retentionLeaseBackgroundSyncAction) { this( settings, (AllocatedIndices>) indicesService, @@ -158,7 +162,20 @@ public IndicesClusterStateService( snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - Objects.requireNonNull(retentionLeaseSyncAction)::syncRetentionLeasesForShard); + new RetentionLeaseSyncer() { + @Override + public void sync( + final ShardId shardId, + final RetentionLeases retentionLeases, + final ActionListener listener) { + Objects.requireNonNull(retentionLeaseSyncAction).sync(shardId, retentionLeases, listener); + } + + @Override + public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { + Objects.requireNonNull(retentionLeaseBackgroundSyncAction).backgroundSync(shardId, retentionLeases); + } + }); } // for tests diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index e5c8fabd9f5ee..520344489adf9 100644 --- 
a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -153,7 +153,9 @@ private void runExpirationTest(final boolean primaryMode) { final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings .builder() - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) + .put( + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), + TimeValue.timeValueMillis(retentionLeaseMillis)) .build(); final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); final ReplicationTracker replicationTracker = new ReplicationTracker( @@ -232,7 +234,9 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings .builder() - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) + .put( + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), + TimeValue.timeValueMillis(retentionLeaseMillis)) .build(); final Map> retentionLeases = new HashMap<>(); final AtomicBoolean invoked = new AtomicBoolean(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java new file mode 100644 index 0000000000000..ff4beea808915 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ + +package org.elasticsearch.index.seqno; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.mockito.ArgumentCaptor; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { + + private ThreadPool threadPool; + private CapturingTransport transport; + private ClusterService clusterService; + private TransportService transportService; + private ShardStateAction shardStateAction; + + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = transport.createCapturingTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + Collections.emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); + } + + public void tearDown() throws Exception { + try { + IOUtils.close(transportService, clusterService, transport); + } finally { + terminate(threadPool); + } + super.tearDown(); + } + + public void testRetentionLeaseBackgroundSyncActionOnPrimary() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = 
mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(Settings.EMPTY)); + final RetentionLeases retentionLeases = mock(RetentionLeases.class); + final RetentionLeaseBackgroundSyncAction.Request request = + new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); + + final ReplicationOperation.PrimaryResult result = + action.shardOperationOnPrimary(request, indexShard); + // the retention leases on the shard should be periodically flushed + verify(indexShard).afterWriteOperation(); + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + } + + public void testRetentionLeaseBackgroundSyncActionOnReplica() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(Settings.EMPTY)); + final RetentionLeases retentionLeases = mock(RetentionLeases.class); + final RetentionLeaseBackgroundSyncAction.Request request = + new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); + + final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard); + // the retention leases on the shard should be updated + verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); + // the retention leases on the shard should be periodically flushed + verify(indexShard).afterWriteOperation(); + // the result should indicate success + final AtomicBoolean success = new AtomicBoolean(); + result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); + assertTrue(success.get()); + } + + public void testRetentionLeaseSyncExecution() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final Logger retentionLeaseSyncActionLogger = mock(Logger.class); + + final RetentionLeases retentionLeases = mock(RetentionLeases.class); + final AtomicBoolean invoked = new AtomicBoolean(); + final 
RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(Settings.EMPTY)) { + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + assertTrue(threadPool.getThreadContext().isSystemContext()); + assertThat(request.shardId(), sameInstance(indexShard.shardId())); + assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); + if (randomBoolean()) { + listener.onResponse(new ReplicationResponse()); + } else { + final Exception e = randomFrom( + new AlreadyClosedException("closed"), + new IndexShardClosedException(indexShard.shardId()), + new RuntimeException("failed")); + listener.onFailure(e); + if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { + final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); + verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); + final ParameterizedMessage message = captor.getValue(); + assertThat(message.getFormat(), equalTo("{} retention lease background sync failed")); + assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); + } + verifyNoMoreInteractions(retentionLeaseSyncActionLogger); + } + invoked.set(true); + } + + @Override + protected Logger getLogger() { + return retentionLeaseSyncActionLogger; + } + }; + + action.backgroundSync(indexShard.shardId(), retentionLeases); + assertTrue(invoked.get()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncIT.java new file mode 100644 index 0000000000000..7b89ef4703128 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncIT.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; + +public class RetentionLeaseBackgroundSyncIT extends ESIntegTestCase { + + public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); + } + + } + + @Override + protected Collection> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class)) + .collect(Collectors.toList()); + } + + public void testBackgroundRetentionLeaseSync() throws Exception { + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build(); + createIndex("index", settings); + ensureGreen("index"); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + // we will add multiple retention leases and expect to see them synced to all replicas + final int length = randomIntBetween(1, 8); + final Map currentRetentionLeases = new HashMap<>(length); + final List ids = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + ids.add(id); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + // put a new lease + currentRetentionLeases.put( + id, + primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(latch::countDown))); + latch.await(); + // now renew all existing leases; we expect to see these synced to the replicas + for (int j = 0; j <= i; j++) { + currentRetentionLeases.put( + ids.get(j), + primary.renewRetentionLease( + ids.get(j), + randomLongBetween(currentRetentionLeases.get(ids.get(j)).retainingSequenceNumber(), Long.MAX_VALUE), + source)); + } + assertBusy(() -> { + // check all retention leases have been synced to all replicas + for (final ShardRouting replicaShard : 
clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + assertThat(replica.getRetentionLeases(), equalTo(primary.getRetentionLeases())); + } + }); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index f7b763d17d08b..c0b13d32dcb9d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -155,7 +155,8 @@ public void testRetentionLeaseSyncActionOnReplica() { final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - final TransportWriteAction.WriteReplicaResult result = action.shardOperationOnReplica(request, indexShard); + final TransportWriteAction.WriteReplicaResult result = + action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); // the retention leases on the shard should be flushed @@ -229,7 +230,7 @@ protected Logger getLogger() { }; // execution happens on the test thread, so no need to register an actual listener to callback - action.syncRetentionLeasesForShard(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {})); + action.sync(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {})); assertTrue(invoked.get()); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 8b127bbb96d91..b61901a123110 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -63,6 +63,7 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; @@ -664,7 +665,7 @@ public static final IndexShard newIndexShard( Collections.emptyList(), Arrays.asList(listeners), () -> {}, - (leases, listener) -> {}, + RetentionLeaseSyncer.EMPTY, cbs); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 76ca9f5b02458..75d8d7e8e2679 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -110,7 +110,9 @@ private void runExpirationTest(final boolean primary) throws IOException { final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings 
.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) + .put( + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), + TimeValue.timeValueMillis(retentionLeaseMillis)) .build(); // current time is mocked through the thread pool final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory()); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index d240bb01fefb1..122d74121a718 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -130,7 +131,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, (s, leases, listener) -> {}); + IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index e664cc87452fc..ef0464d94cd7d 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; @@ -481,7 +482,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod null, primaryReplicaSyncer, s -> {}, - (s, leases, listener) -> {}); + RetentionLeaseSyncer.EMPTY); } private class RecordingIndicesService extends MockIndicesService { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index cc141d5ca29ab..97f1b2e7958a7 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -59,6 +59,7 @@ import org.elasticsearch.index.mapper.MapperService; import 
org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -385,7 +386,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, - (leases, listener) -> {}, + RetentionLeaseSyncer.EMPTY, breakerService); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index 2df364327e9a3..2c5493759d447 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.core.security.authz.privilege; +import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.xpack.core.security.support.Automatons; @@ -25,7 +27,8 @@ public final class SystemPrivilege extends Privilege { "indices:admin/template/put", // needed for the TemplateUpgradeService "indices:admin/template/delete", // needed for the TemplateUpgradeService "indices:admin/seq_no/global_checkpoint_sync*", // needed for global checkpoint syncs - "indices:admin/seq_no/retention_lease_sync*", // needed for retention lease syncs + RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs + RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs "indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly ), Automatons.patterns("internal:transport/proxy/*"))); // no proxy actions for system user! 
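The trailing "*" on both action-name patterns matters because a replication action fans out into [p] (primary) and [r] (replica) sub-actions, and the system user must be authorized for all of them; the privilege tests in the next hunk exercise exactly those strings. The following is only a rough illustration of how the wildcard behaves, assuming the existing Automatons.predicate(String...) helper from x-pack core; the class name BackgroundSyncPrivilegeExample is made up for this sketch.

import java.util.function.Predicate;

import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.xpack.core.security.support.Automatons;

public class BackgroundSyncPrivilegeExample {

    public static void main(final String[] args) {
        // "indices:admin/seq_no/retention_lease_background_sync*" matches the top-level action
        // as well as the [p] and [r] sub-actions executed on the primary and replica shards
        final Predicate<String> predicate =
                Automatons.predicate(RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*");
        System.out.println(predicate.test(RetentionLeaseBackgroundSyncAction.ACTION_NAME));          // true
        System.out.println(predicate.test(RetentionLeaseBackgroundSyncAction.ACTION_NAME + "[p]"));  // true
        System.out.println(predicate.test(RetentionLeaseBackgroundSyncAction.ACTION_NAME + "[r]"));  // true
    }

}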
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java index 77be9f3b1b1f3..906b00ccab0fc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java @@ -130,6 +130,9 @@ public void testSystem() throws Exception { assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync"), is(true)); assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync[p]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync[r]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[p]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true)); assertThat(predicate.test("indices:admin/settings/update"), is(true)); assertThat(predicate.test("indices:admin/settings/foo"), is(false)); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index b4f309d8ee6ee..9f6006c808e05 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -255,6 +255,7 @@ public void testActionsForSystemUserIsAuthorized() { "indices:admin/template/put", "indices:admin/seq_no/global_checkpoint_sync", "indices:admin/seq_no/retention_lease_sync", + "indices:admin/seq_no/retention_lease_background_sync", "indices:admin/settings/update" }; for (String action : actions) { authorize(authentication, action, request);
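With backgroundSync added, RetentionLeaseSyncer is no longer a functional interface, so callers outside the production wiring either pass RetentionLeaseSyncer.EMPTY (as the updated tests above do) or implement both methods. Below is a minimal sketch of such an implementation, e.g. a counting syncer a test might use; the class and field names are hypothetical, and the ActionListener<ReplicationResponse> listener type is assumed from RetentionLeaseSyncAction.sync.

import java.util.concurrent.atomic.AtomicInteger;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;

public final class CountingRetentionLeaseSyncer implements RetentionLeaseSyncer {

    private final AtomicInteger syncs = new AtomicInteger();
    private final AtomicInteger backgroundSyncs = new AtomicInteger();

    @Override
    public void sync(final ShardId shardId,
                     final RetentionLeases retentionLeases,
                     final ActionListener<ReplicationResponse> listener) {
        // foreground syncs carry a listener that the caller expects to be completed
        syncs.incrementAndGet();
        listener.onResponse(new ReplicationResponse());
    }

    @Override
    public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {
        // background syncs are fire-and-forget, so there is nothing to complete
        backgroundSyncs.incrementAndGet();
    }

    public int syncCount() {
        return syncs.get();
    }

    public int backgroundSyncCount() {
        return backgroundSyncs.get();
    }

}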