diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 8a8cea82b0a4d..83e1e01614435 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -19,8 +19,8 @@ package org.elasticsearch.action; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; @@ -209,6 +209,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.persistent.CompletionPersistentTaskAction; import org.elasticsearch.persistent.RemovePersistentTaskAction; @@ -220,6 +221,7 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.action.RestFieldCapabilitiesAction; import org.elasticsearch.rest.action.RestMainAction; +import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction; import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction; import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; @@ -251,7 +253,6 @@ import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction; import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction; -import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction; import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction; import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction; import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction; @@ -529,6 +530,11 @@ public void reg actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class); actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class); + // retention leases + actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class); + actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class); + actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class); + return unmodifiableMap(actions.getRegistry()); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b79bb079d9427..bac85413a7a92 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -744,7 +744,7 @@ public enum SearcherScope { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public abstract Closeable acquireRetentionLockForPeerRecovery(); + public abstract Closeable acquireRetentionLock(); /** * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). @@ -771,6 +771,13 @@ public abstract int estimateNumberOfHistoryOperations(String source, */ public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; + /** + * Gets the minimum retained sequence number for this engine. + * + * @return the minimum retained sequence number + */ + public abstract long getMinRetainedSeqNo(); + public abstract TranslogStats getTranslogStats(); /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b2143dcc0407f..66e0d30f164f1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2567,13 +2567,13 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe * Returns the minimum seqno that is retained in the Lucene index. * Operations whose seq# are at least this value should exist in the Lucene index. */ - final long getMinRetainedSeqNo() { + public final long getMinRetainedSeqNo() { assert softDeleteEnabled : Thread.currentThread().getName(); return softDeletesPolicy.getMinRetainedSeqNo(); } @Override - public Closeable acquireRetentionLockForPeerRecovery() { + public Closeable acquireRetentionLock() { if (softDeleteEnabled) { return softDeletesPolicy.acquireRetentionLock(); } else { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index a33f6f8fe27e0..c464a34e78b01 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -272,7 +272,7 @@ public void syncTranslog() { } @Override - public Closeable acquireRetentionLockForPeerRecovery() { + public Closeable acquireRetentionLock() { return () -> {}; } @@ -311,6 +311,11 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe return false; } + @Override + public long getMinRetainedSeqNo() { + throw new UnsupportedOperationException(); + } + @Override public TranslogStats getTranslogStats() { return translogStats; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java new file mode 100644 index 0000000000000..f35e4906131e1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -0,0 +1,404 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * This class holds all actions related to retention leases. Note carefully that these actions are executed under a primary permit. Care is + * taken to thread the listener through the invocations so that for the sync APIs we do not notify the listener until these APIs have + * responded with success. Additionally, note the use of + * {@link TransportSingleShardAction#asyncShardOperation(SingleShardRequest, ShardId, ActionListener)} to handle the case when acquiring + * permits goes asynchronous because acquiring permits is blocked + */ +public class RetentionLeaseActions { + + public static final long RETAIN_ALL = -1; + + abstract static class TransportRetentionLeaseAction> extends TransportSingleShardAction { + + private final IndicesService indicesService; + + @Inject + TransportRetentionLeaseAction( + final String name, + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService, + final Supplier requestSupplier) { + super( + name, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + requestSupplier, + ThreadPool.Names.MANAGEMENT); + this.indicesService = Objects.requireNonNull(indicesService); + } + + @Override + protected ShardsIterator shards(final ClusterState state, final InternalRequest request) { + return state + .routingTable() + .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id()) + .primaryShardIt(); + } + + @Override + protected void asyncShardOperation(T request, ShardId shardId, final ActionListener listener) { + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignore = releasable) { + doRetentionLeaseAction(indexShard, request, listener); + } + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + + }, + ThreadPool.Names.SAME, + request); + } + + @Override + protected Response shardOperation(final T request, final ShardId shardId) throws IOException { + throw new UnsupportedOperationException(); + } + + abstract void doRetentionLeaseAction(IndexShard indexShard, T request, ActionListener listener); + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected boolean resolveIndex(final T request) { + return false; + } + + } + + public static class Add extends Action { + + public static final Add INSTANCE = new Add(); + public static final String NAME = "indices:admin/seq_no/add_retention_lease"; + + private Add() { + super(NAME); + } + + public static class TransportAction extends TransportRetentionLeaseAction { + + @Inject + public TransportAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super( + NAME, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + indicesService, + AddRequest::new); + } + + @Override + void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest request, final ActionListener listener) { + indexShard.addRetentionLease( + request.getId(), + request.getRetainingSequenceNumber(), + request.getSource(), + ActionListener.wrap( + r -> listener.onResponse(new Response()), + listener::onFailure)); + } + + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + public static class Renew extends Action { + + public static final Renew INSTANCE = new Renew(); + public static final String NAME = "indices:admin/seq_no/renew_retention_lease"; + + private Renew() { + super(NAME); + } + + public static class TransportAction extends TransportRetentionLeaseAction { + + @Inject + public TransportAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super( + NAME, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + indicesService, + RenewRequest::new); + } + + + @Override + void doRetentionLeaseAction(final IndexShard indexShard, final RenewRequest request, final ActionListener listener) { + indexShard.renewRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource()); + listener.onResponse(new Response()); + } + + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + public static class Remove extends Action { + + public static final Remove INSTANCE = new Remove(); + public static final String NAME = "indices:admin/seq_no/remove_retention_lease"; + + private Remove() { + super(NAME); + } + + public static class TransportAction extends TransportRetentionLeaseAction { + + @Inject + public TransportAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super( + NAME, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + indicesService, + RemoveRequest::new); + } + + + @Override + void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request, final ActionListener listener) { + indexShard.removeRetentionLease( + request.getId(), + ActionListener.wrap( + r -> listener.onResponse(new Response()), + listener::onFailure)); + } + + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + private abstract static class Request> extends SingleShardRequest { + + private ShardId shardId; + + public ShardId getShardId() { + return shardId; + } + + private String id; + + public String getId() { + return id; + } + + Request() { + } + + Request(final ShardId shardId, final String id) { + super(Objects.requireNonNull(shardId).getIndexName()); + this.shardId = shardId; + this.id = Objects.requireNonNull(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + id = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(id); + } + + } + + private abstract static class AddOrRenewRequest> extends Request { + + private long retainingSequenceNumber; + + public long getRetainingSequenceNumber() { + return retainingSequenceNumber; + } + + private String source; + + public String getSource() { + return source; + } + + AddOrRenewRequest() { + } + + AddOrRenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(shardId, id); + if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL) { + throw new IllegalArgumentException("retaining sequence number [" + retainingSequenceNumber + "] out of range"); + } + this.retainingSequenceNumber = retainingSequenceNumber; + this.source = Objects.requireNonNull(source); + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + retainingSequenceNumber = in.readZLong(); + source = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(retainingSequenceNumber); + out.writeString(source); + } + + } + + public static class AddRequest extends AddOrRenewRequest { + + public AddRequest() { + } + + public AddRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(shardId, id, retainingSequenceNumber, source); + } + + } + + public static class RenewRequest extends AddOrRenewRequest { + + public RenewRequest() { + } + + public RenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(shardId, id, retainingSequenceNumber, source); + } + + } + + public static class RemoveRequest extends Request { + + public RemoveRequest() { + } + + public RemoveRequest(final ShardId shardId, final String id) { + super(shardId, id); + } + + } + + public static class Response extends ActionResponse { + + } + +} \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 63307af0ac67c..4b2d134f5385c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -167,6 +167,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -1739,8 +1740,8 @@ public void onSettingsChanged() { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public Closeable acquireRetentionLockForPeerRecovery() { - return getEngine().acquireRetentionLockForPeerRecovery(); + public Closeable acquireRetentionLock() { + return getEngine().acquireRetentionLock(); } /** @@ -1760,12 +1761,21 @@ public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). - * This method should be called after acquiring the retention lock; See {@link #acquireRetentionLockForPeerRecovery()} + * This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()} */ public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException { return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo); } + /** + * Gets the minimum retained sequence number for this engine. + * + * @return the minimum retained sequence number + */ + public long getMinRetainedSeqNo() { + return getEngine().getMinRetainedSeqNo(); + } + /** * Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive) * and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading. @@ -1938,7 +1948,13 @@ public RetentionLease addRetentionLease( Objects.requireNonNull(listener); assert assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, listener); + try (Closeable ignore = acquireRetentionLock()) { + final long actualRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; + return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener); + } catch (final IOException e) { + throw new AssertionError(e); + } } /** @@ -1953,7 +1969,13 @@ public RetentionLease addRetentionLease( public RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source); + try (Closeable ignore = acquireRetentionLock()) { + final long actualRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; + return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source); + } catch (final IOException e) { + throw new AssertionError(e); + } } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a6973a46926a7..40e1a88a349c9 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -155,7 +155,7 @@ public void recoverToTarget(ActionListener listener) { assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - final Closeable retentionLock = shard.acquireRetentionLockForPeerRecovery(); + final Closeable retentionLock = shard.acquireRetentionLock(); resources.add(retentionLock); final long startingSeqNo; final long requiredSeqNoRangeStart; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d492d07775941..f23665d201206 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -21,7 +21,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -5376,7 +5375,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (rarely()) { engine.forceMerge(randomBoolean()); } - try (Closeable ignored = engine.acquireRetentionLockForPeerRecovery()) { + try (Closeable ignored = engine.acquireRetentionLock()) { long minRetainSeqNos = engine.getMinRetainedSeqNo(); assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1)); Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java new file mode 100644 index 0000000000000..bff4493321289 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -0,0 +1,533 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.Node; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; + +public class RetentionLeaseActionsTests extends ESSingleNodeTestCase { + + public void testAddAction() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(id)); + assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber)); + assertThat(retentionLease.source(), equalTo(source)); + } + + public void testAddAlreadyExists() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + final long nextRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL + : randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE); + + final RetentionLeaseAlreadyExistsException e = expectThrows( + RetentionLeaseAlreadyExistsException.class, + () -> client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest( + indexService.getShard(0).shardId(), + id, + nextRetainingSequenceNumber, + source)) + .actionGet()); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] already exists"))); + } + + public void testRenewAction() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + + /* + * When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how + * often the thread pool clock advances based on the following setting. After we add the initial lease we sample the relative time. + * Immediately before the renewal of the lease, we sleep long enough to ensure that an estimated time interval has elapsed, and + * sample the thread pool to ensure the clock has in fact advanced. + */ + final TimeValue estimatedTimeInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(getInstanceFromNode(Node.class).settings()); + + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + /* + * Sample these after adding the retention lease so that advancement here guarantees we have advanced past the timestamp on the + * lease. + */ + final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); + final long timestampUpperBound = threadPool.absoluteTimeInMillis(); + final long start = System.nanoTime(); + + final IndicesStatsResponse initialStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(initialStats.getShards()); + assertThat(initialStats.getShards(), arrayWithSize(1)); + assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease initialRetentionLease = + initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + + final long nextRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL + : randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE); + + /* + * Wait until the thread pool clock advances. Note that this will fail on a system when the system clock goes backwards during + * execution of the test. The specific circumstances under which this can fail is if the system clock goes backwards more than the + * suite timeout. It seems there is nothing simple that we can do to avoid this? + */ + do { + final long end = System.nanoTime(); + if (end - start < estimatedTimeInterval.nanos()) { + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(estimatedTimeInterval.nanos() - (end - start))); + } + } while (threadPool.absoluteTimeInMillis() <= timestampUpperBound); + + client() + .execute( + RetentionLeaseActions.Renew.INSTANCE, + new RetentionLeaseActions.RenewRequest(indexService.getShard(0).shardId(), id, nextRetainingSequenceNumber, source)) + .actionGet(); + + final IndicesStatsResponse renewedStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(renewedStats.getShards()); + assertThat(renewedStats.getShards(), arrayWithSize(1)); + assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease renewedRetentionLease = + renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(renewedRetentionLease.id(), equalTo(id)); + assertThat( + renewedRetentionLease.retainingSequenceNumber(), + equalTo(nextRetainingSequenceNumber == RETAIN_ALL ? 0L : nextRetainingSequenceNumber)); + assertThat(renewedRetentionLease.timestamp(), greaterThan(initialRetentionLease.timestamp())); + assertThat(renewedRetentionLease.source(), equalTo(source)); + } + + public void testRenewNotFound() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + + final RetentionLeaseNotFoundException e = expectThrows( + RetentionLeaseNotFoundException.class, + () -> client() + .execute( + RetentionLeaseActions.Renew.INSTANCE, + new RetentionLeaseActions.RenewRequest( + indexService.getShard(0).shardId(), + id, + retainingSequenceNumber, + source)) + .actionGet()); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] not found"))); + } + + public void testRemoveAction() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + client() + .execute( + RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id)) + .actionGet(); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + } + + public void testRemoveNotFound() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + + final RetentionLeaseNotFoundException e = expectThrows( + RetentionLeaseNotFoundException.class, + () -> client() + .execute( + RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id)) + .actionGet()); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] not found"))); + } + + public void testAddUnderBlock() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + runActionUnderBlockTest( + indexService, + (shardId, actionLatch) -> + client().execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(shardId, id, retainingSequenceNumber, source), + new ActionListener() { + + @Override + public void onResponse(final RetentionLeaseActions.Response response) { + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + })); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(id)); + assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber)); + assertThat(retentionLease.source(), equalTo(source)); + } + + public void testRenewUnderBlock() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + + /* + * When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how + * often the thread pool clock advances based on the following setting. After we add the initial lease we sample the relative time. + * Immediately before the renewal of the lease, we sleep long enough to ensure that an estimated time interval has elapsed, and + * sample the thread pool to ensure the clock has in fact advanced. + */ + final TimeValue estimatedTimeInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(getInstanceFromNode(Node.class).settings()); + + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + /* + * Sample these after adding the retention lease so that advancement here guarantees we have advanced past the timestamp on the + * lease. + */ + final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); + final long timestampUpperBound = threadPool.absoluteTimeInMillis(); + final long start = System.nanoTime(); + + final IndicesStatsResponse initialStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(initialStats.getShards()); + assertThat(initialStats.getShards(), arrayWithSize(1)); + assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease initialRetentionLease = + initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + + final long nextRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL + : randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE); + + /* + * Wait until the thread pool clock advances. Note that this will fail on a system when the system clock goes backwards during + * execution of the test. The specific circumstances under which this can fail is if the system clock goes backwards more than the + * suite timeout. It seems there is nothing simple that we can do to avoid this? + */ + do { + final long end = System.nanoTime(); + if (end - start < estimatedTimeInterval.nanos()) { + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(estimatedTimeInterval.nanos() - (end - start))); + } + } while (threadPool.absoluteTimeInMillis() <= timestampUpperBound); + + runActionUnderBlockTest( + indexService, + (shardId, actionLatch) -> + client().execute( + RetentionLeaseActions.Renew.INSTANCE, + new RetentionLeaseActions.RenewRequest(shardId, id, nextRetainingSequenceNumber, source), + new ActionListener() { + + @Override + public void onResponse(final RetentionLeaseActions.Response response) { + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + })); + + final IndicesStatsResponse renewedStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(renewedStats.getShards()); + assertThat(renewedStats.getShards(), arrayWithSize(1)); + assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease renewedRetentionLease = + renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(renewedRetentionLease.id(), equalTo(id)); + assertThat( + renewedRetentionLease.retainingSequenceNumber(), + equalTo(nextRetainingSequenceNumber == RETAIN_ALL ? 0L : nextRetainingSequenceNumber)); + assertThat(renewedRetentionLease.timestamp(), greaterThan(initialRetentionLease.timestamp())); + assertThat(renewedRetentionLease.source(), equalTo(source)); + } + + public void testRemoveUnderBlock() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + runActionUnderBlockTest( + indexService, + (shardId, actionLatch) -> + client().execute( + RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(shardId, id), + new ActionListener() { + + @Override + public void onResponse(final RetentionLeaseActions.Response response) { + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + })); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + } + + /* + * Tests that use this method are ensuring that the asynchronous usage of the permits API when permit acquisition is blocked is + * correctly handler. In these scenarios, we first acquire all permits. Then we submit a request to one of the retention lease actions + * (via the consumer callback). That invocation will go asynchronous and be queued, since all permits are blocked. Then we release the + * permit block and except that the callbacks occur correctly. These assertions happen after returning from this method. + */ + private void runActionUnderBlockTest( + final IndexService indexService, + final BiConsumer consumer) throws InterruptedException { + + final CountDownLatch blockedLatch = new CountDownLatch(1); + final CountDownLatch unblockLatch = new CountDownLatch(1); + indexService.getShard(0).acquireAllPrimaryOperationsPermits( + new ActionListener() { + + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignore = releasable) { + blockedLatch.countDown(); + unblockLatch.await(); + } catch (final InterruptedException e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + }, + TimeValue.timeValueHours(1)); + + blockedLatch.await(); + + final CountDownLatch actionLatch = new CountDownLatch(1); + + consumer.accept(indexService.getShard(0).shardId(), actionLatch); + + unblockLatch.countDown(); + actionLatch.await(); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index f67c01c8d8f7d..d92db46701df8 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -98,7 +98,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -149,7 +149,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -160,7 +160,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; currentRetentionLeases.remove(id); latch.await(); retentionLock.close();