From aa11a9d0a76d7f3b603894d6a3d04dd10f6627f9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 19:12:14 -0500 Subject: [PATCH 1/9] Introduce retention lease actions This commit introduces actions for some common retention lease operations that clients need to be able to perform remotely. These actions include add/renew/remove. --- .../elasticsearch/action/ActionModule.java | 9 +- .../elasticsearch/index/engine/Engine.java | 9 +- .../index/engine/InternalEngine.java | 4 +- .../index/engine/ReadOnlyEngine.java | 7 +- .../index/seqno/RetentionLeaseActions.java | 279 ++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 32 +- .../recovery/RecoverySourceHandler.java | 2 +- .../index/engine/InternalEngineTests.java | 3 +- .../seqno/RetentionLeaseActionsTests.java | 228 ++++++++++++++ .../index/seqno/RetentionLeaseIT.java | 2 +- 10 files changed, 560 insertions(+), 15 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java create mode 100644 server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 8a8cea82b0a4d..e0e3c206a63e8 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -19,8 +19,8 @@ package org.elasticsearch.action; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; @@ -209,6 +209,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.persistent.CompletionPersistentTaskAction; import org.elasticsearch.persistent.RemovePersistentTaskAction; @@ -220,6 +221,7 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.action.RestFieldCapabilitiesAction; import org.elasticsearch.rest.action.RestMainAction; +import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction; import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction; import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; @@ -251,7 +253,6 @@ import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction; import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction; -import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction; import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction; import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction; import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction; @@ -529,6 +530,10 @@ public void reg actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class); actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class); + // retention leases + actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class); + actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class); + return unmodifiableMap(actions.getRegistry()); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b79bb079d9427..bac85413a7a92 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -744,7 +744,7 @@ public enum SearcherScope { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public abstract Closeable acquireRetentionLockForPeerRecovery(); + public abstract Closeable acquireRetentionLock(); /** * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). @@ -771,6 +771,13 @@ public abstract int estimateNumberOfHistoryOperations(String source, */ public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; + /** + * Gets the minimum retained sequence number for this engine. + * + * @return the minimum retained sequence number + */ + public abstract long getMinRetainedSeqNo(); + public abstract TranslogStats getTranslogStats(); /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b2143dcc0407f..66e0d30f164f1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2567,13 +2567,13 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe * Returns the minimum seqno that is retained in the Lucene index. * Operations whose seq# are at least this value should exist in the Lucene index. */ - final long getMinRetainedSeqNo() { + public final long getMinRetainedSeqNo() { assert softDeleteEnabled : Thread.currentThread().getName(); return softDeletesPolicy.getMinRetainedSeqNo(); } @Override - public Closeable acquireRetentionLockForPeerRecovery() { + public Closeable acquireRetentionLock() { if (softDeleteEnabled) { return softDeletesPolicy.acquireRetentionLock(); } else { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index a33f6f8fe27e0..c464a34e78b01 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -272,7 +272,7 @@ public void syncTranslog() { } @Override - public Closeable acquireRetentionLockForPeerRecovery() { + public Closeable acquireRetentionLock() { return () -> {}; } @@ -311,6 +311,11 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe return false; } + @Override + public long getMinRetainedSeqNo() { + throw new UnsupportedOperationException(); + } + @Override public TranslogStats getTranslogStats() { return translogStats; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java new file mode 100644 index 0000000000000..51375db73cbf9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -0,0 +1,279 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class RetentionLeaseActions { + + static abstract class TransportRetentionLeaseAction extends TransportSingleShardAction { + + private Logger logger = LogManager.getLogger(getClass()); + + private final IndicesService indicesService; + + @Inject + public TransportRetentionLeaseAction( + final String name, + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super(name, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); + this.indicesService = Objects.requireNonNull(indicesService); + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + return state + .routingTable() + .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id()) + .primaryShardIt(); + } + + @Override + protected Response shardOperation(final Request request, final ShardId shardId) { + final IndexService indexService = indicesService.indexServiceSafe(request.getShardId().getIndex()); + final IndexShard indexShard = indexService.getShard(request.getShardId().id()); + + final CompletableFuture permit = new CompletableFuture<>(); + final ActionListener onAcquired = new ActionListener() { + + @Override + public void onResponse(Releasable releasable) { + if (permit.complete(releasable) == false) { + releasable.close(); + } + } + + @Override + public void onFailure(Exception e) { + permit.completeExceptionally(e); + } + + }; + + indexShard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, request); + + // block until we have the permit + try (Releasable ignore = FutureUtils.get(permit)) { + doRetentionLeaseAction(indexShard, request); + } finally { + // just in case we got an exception (likely interrupted) while waiting for the get + permit.whenComplete((r, e) -> { + if (r != null) { + r.close(); + } + if (e != null) { + logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); + } + }); + } + + return new Response(); + } + + abstract void doRetentionLeaseAction(IndexShard indexShard, Request request); + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected boolean resolveIndex(final Request request) { + return false; + } + + } + + public static class Add extends Action { + + public static final Add INSTANCE = new Add(); + public static final String NAME = "indices:admin/seq_no/add_retention_lease"; + + private Add() { + super(NAME); + } + + public static class TransportAction extends TransportRetentionLeaseAction { + + @Inject + public TransportAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, indicesService); + } + + @Override + void doRetentionLeaseAction(final IndexShard indexShard, final Request request) { + indexShard.addRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource(), ActionListener.wrap(() -> {})); + } + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + public static class Renew extends Action { + + public static final Renew INSTANCE = new Renew(); + public static final String NAME = "indices:admin/seq_no/renew_retention_lease"; + + private Renew() { + super(NAME); + } + + public static class TransportAction extends TransportRetentionLeaseAction { + + @Inject + public TransportAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, indicesService); + } + + + @Override + void doRetentionLeaseAction(final IndexShard indexShard, final Request request) { + indexShard.renewRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource()); + } + } + + @Override + public Response newResponse() { + return new Response(); + } + + } + + public static class Request extends SingleShardRequest { + + public static long RETAIN_ALL = -1; + + private ShardId shardId; + + public ShardId getShardId() { + return shardId; + } + + private String id; + + public String getId() { + return id; + } + + private long retainingSequenceNumber; + + public long getRetainingSequenceNumber() { + return retainingSequenceNumber; + } + + private String source; + + public String getSource() { + return source; + } + + public Request() { + } + + public Request(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(Objects.requireNonNull(shardId).getIndexName()); + this.shardId = shardId; + this.id = Objects.requireNonNull(id); + if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL) { + throw new IllegalArgumentException( + "retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range"); + } + this.retainingSequenceNumber = retainingSequenceNumber; + this.source = Objects.requireNonNull(source); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + id = in.readString(); + retainingSequenceNumber = in.readZLong(); + source = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(id); + out.writeZLong(retainingSequenceNumber); + out.writeString(source); + } + + } + + public static class Response extends ActionResponse { + + } + +} \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1ea894e7aed74..be110fd685322 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -167,6 +167,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.Request.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -1739,8 +1740,8 @@ public void onSettingsChanged() { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public Closeable acquireRetentionLockForPeerRecovery() { - return getEngine().acquireRetentionLockForPeerRecovery(); + public Closeable acquireRetentionLock() { + return getEngine().acquireRetentionLock(); } /** @@ -1760,12 +1761,21 @@ public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). - * This method should be called after acquiring the retention lock; See {@link #acquireRetentionLockForPeerRecovery()} + * This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()} */ public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException { return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo); } + /** + * Gets the minimum retained sequence number for this engine. + * + * @return the minimum retained sequence number + */ + public long getMinRetainedSeqNo() { + return getEngine().getMinRetainedSeqNo(); + } + /** * Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive) * and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading. @@ -1938,7 +1948,13 @@ public RetentionLease addRetentionLease( Objects.requireNonNull(listener); assert assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, listener); + try (Closeable ignore = acquireRetentionLock()) { + final long actualRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; + return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener); + } catch (final IOException e) { + throw new AssertionError(e); + } } /** @@ -1953,7 +1969,13 @@ public RetentionLease addRetentionLease( public RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source); + try (Closeable ignore = acquireRetentionLock()) { + final long actualRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; + return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source); + } catch (final IOException e) { + throw new AssertionError(e); + } } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a6973a46926a7..40e1a88a349c9 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -155,7 +155,7 @@ public void recoverToTarget(ActionListener listener) { assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - final Closeable retentionLock = shard.acquireRetentionLockForPeerRecovery(); + final Closeable retentionLock = shard.acquireRetentionLock(); resources.add(retentionLock); final long startingSeqNo; final long requiredSeqNoRangeStart; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d492d07775941..f23665d201206 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -21,7 +21,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -5376,7 +5375,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (rarely()) { engine.forceMerge(randomBoolean()); } - try (Closeable ignored = engine.acquireRetentionLockForPeerRecovery()) { + try (Closeable ignored = engine.acquireRetentionLock()) { long minRetainSeqNos = engine.getMinRetainedSeqNo(); assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1)); Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java new file mode 100644 index 0000000000000..2f5960a615358 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -0,0 +1,228 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.node.Node; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.index.seqno.RetentionLeaseActions.Request.RETAIN_ALL; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; + +public class RetentionLeaseActionsTests extends ESSingleNodeTestCase { + + public void testAddAction() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final String source = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(id)); + assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber)); + assertThat(retentionLease.source(), equalTo(source)); + } + + public void testAddAlreadyExists() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final String source = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + final long nextRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL + : randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE); + + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.Request( + indexService.getShard(0).shardId(), + id, + nextRetainingSequenceNumber, + source)) + .actionGet()); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] already exists"))); + } + + + public void testRenewAction() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final String source = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + + /* + * When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how + * often the thread pool clock advances based on the following setting. After we add the initial lease we sample the relative time. + * Immediately before the renewal of the lease, we sleep long enough to ensure that an estimated time interval has elapsed, and + * sample the thread pool to ensure the clock has in fact advanced. + */ + final TimeValue estimatedTimeInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(getInstanceFromNode(Node.class).settings()); + + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + /* + * Sample these after adding the retention lease so that advancement here guarantees we have advanced past the timestamp on the + * lease. + */ + final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); + final long timestampUpperBound = threadPool.absoluteTimeInMillis(); + final long start = System.nanoTime(); + + final IndicesStatsResponse initialStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(initialStats.getShards()); + assertThat(initialStats.getShards(), arrayWithSize(1)); + assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease initialRetentionLease = + initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + + final long nextRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL + : randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE); + + /* + * Wait until the thread pool clock advances. Note that this will fail on a system when the system clock goes backwards during + * execution of the test. The specific circumstances under which this can fail is if the system clock goes backwards more than the + * suite timeout. It seems there is nothing simple that we can do to avoid this? + */ + do { + final long end = System.nanoTime(); + if (end - start < estimatedTimeInterval.nanos()) { + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(estimatedTimeInterval.nanos() - (end - start))); + } + } while (threadPool.absoluteTimeInMillis() <= timestampUpperBound); + + client() + .execute( + RetentionLeaseActions.Renew.INSTANCE, + new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, nextRetainingSequenceNumber, source)) + .actionGet(); + + final IndicesStatsResponse renewedStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(renewedStats.getShards()); + assertThat(renewedStats.getShards(), arrayWithSize(1)); + assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease renewedRetentionLease = + renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(renewedRetentionLease.id(), equalTo(id)); + assertThat( + renewedRetentionLease.retainingSequenceNumber(), + equalTo(nextRetainingSequenceNumber == RETAIN_ALL ? 0L : nextRetainingSequenceNumber)); + assertThat(renewedRetentionLease.timestamp(), greaterThan(initialRetentionLease.timestamp())); + assertThat(renewedRetentionLease.source(), equalTo(source)); + } + + public void testRenewNotFound() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final String source = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client() + .execute( + RetentionLeaseActions.Renew.INSTANCE, + new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet()); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); + } + +} \ No newline at end of file diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index e7561d9f37049..5a2bfc5a793e2 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -98,7 +98,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); From 888552fd770eac60eed20ca945f77e97ae30ff91 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 19:17:17 -0500 Subject: [PATCH 2/9] Fix missing newline --- .../elasticsearch/index/seqno/RetentionLeaseActionsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index 2f5960a615358..d538283d240cc 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -225,4 +225,4 @@ public void testRenewNotFound() { assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); } -} \ No newline at end of file +} From a1b9f1b1331d8eb1f7865999fe0ded0da59f2723 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 22:03:16 -0500 Subject: [PATCH 3/9] Add remove action --- .../index/seqno/RetentionLeaseActions.java | 202 ++++++++++++++---- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../seqno/RetentionLeaseActionsTests.java | 18 +- 3 files changed, 178 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 51375db73cbf9..cbbf902a9386c 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -47,10 +47,13 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; public class RetentionLeaseActions { - static abstract class TransportRetentionLeaseAction extends TransportSingleShardAction { + public static final long RETAIN_ALL = -1; + + static abstract class TransportRetentionLeaseAction> extends TransportSingleShardAction { private Logger logger = LogManager.getLogger(getClass()); @@ -64,13 +67,22 @@ public TransportRetentionLeaseAction( final TransportService transportService, final ActionFilters actionFilters, final IndexNameExpressionResolver indexNameExpressionResolver, - final IndicesService indicesService) { - super(name, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT); + final IndicesService indicesService, + final Supplier requestSupplier) { + super( + name, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + requestSupplier, + ThreadPool.Names.MANAGEMENT); this.indicesService = Objects.requireNonNull(indicesService); } @Override - protected ShardsIterator shards(ClusterState state, InternalRequest request) { + protected ShardsIterator shards(final ClusterState state, final InternalRequest request) { return state .routingTable() .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id()) @@ -78,9 +90,9 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { } @Override - protected Response shardOperation(final Request request, final ShardId shardId) { - final IndexService indexService = indicesService.indexServiceSafe(request.getShardId().getIndex()); - final IndexShard indexShard = indexService.getShard(request.getShardId().id()); + protected Response shardOperation(final T request, final ShardId shardId) { + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); final CompletableFuture permit = new CompletableFuture<>(); final ActionListener onAcquired = new ActionListener() { @@ -119,7 +131,7 @@ public void onFailure(Exception e) { return new Response(); } - abstract void doRetentionLeaseAction(IndexShard indexShard, Request request); + abstract void doRetentionLeaseAction(IndexShard indexShard, T request); @Override protected Response newResponse() { @@ -127,7 +139,7 @@ protected Response newResponse() { } @Override - protected boolean resolveIndex(final Request request) { + protected boolean resolveIndex(final T request) { return false; } @@ -142,7 +154,7 @@ private Add() { super(NAME); } - public static class TransportAction extends TransportRetentionLeaseAction { + public static class TransportAction extends TransportRetentionLeaseAction { @Inject public TransportAction( @@ -152,13 +164,26 @@ public TransportAction( final ActionFilters actionFilters, final IndexNameExpressionResolver indexNameExpressionResolver, final IndicesService indicesService) { - super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, indicesService); + super( + NAME, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + indicesService, + AddRequest::new); } @Override - void doRetentionLeaseAction(final IndexShard indexShard, final Request request) { - indexShard.addRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource(), ActionListener.wrap(() -> {})); + void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest request) { + indexShard.addRetentionLease( + request.getId(), + request.getRetainingSequenceNumber(), + request.getSource(), + ActionListener.wrap(() -> {})); } + } @Override @@ -177,7 +202,7 @@ private Renew() { super(NAME); } - public static class TransportAction extends TransportRetentionLeaseAction { + public static class TransportAction extends TransportRetentionLeaseAction { @Inject public TransportAction( @@ -187,14 +212,23 @@ public TransportAction( final ActionFilters actionFilters, final IndexNameExpressionResolver indexNameExpressionResolver, final IndicesService indicesService) { - super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, indicesService); + super( + NAME, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + indicesService, + RenewRequest::new); } @Override - void doRetentionLeaseAction(final IndexShard indexShard, final Request request) { + void doRetentionLeaseAction(final IndexShard indexShard, final RenewRequest request) { indexShard.renewRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource()); } + } @Override @@ -204,9 +238,52 @@ public Response newResponse() { } - public static class Request extends SingleShardRequest { + public static class Remove extends Action { + + public static final Renew INSTANCE = new Renew(); + public static final String NAME = "indices:admin/seq_no/renew_retention_lease"; + + private Remove() { + super(NAME); + } + + public static class TransportAction extends TransportRetentionLeaseAction { + + @Inject + public TransportAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super( + NAME, + threadPool, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + indicesService, + RemoveRequest::new); + } + + + @Override + void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request) { + indexShard.removeRetentionLease(request.getId(), ActionListener.wrap(() -> {})); + } + + } + + @Override + public Response newResponse() { + return new Response(); + } + + } - public static long RETAIN_ALL = -1; + private abstract static class Request> extends SingleShardRequest { private ShardId shardId; @@ -220,6 +297,38 @@ public String getId() { return id; } + public Request() { + } + + public Request(final ShardId shardId, final String id) { + super(Objects.requireNonNull(shardId).getIndexName()); + this.shardId = shardId; + this.id = Objects.requireNonNull(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + id = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(id); + } + + } + + private abstract static class AddOrRenewRequest> extends Request { + private long retainingSequenceNumber; public long getRetainingSequenceNumber() { @@ -232,46 +341,67 @@ public String getSource() { return source; } - public Request() { + public AddOrRenewRequest() { } - public Request(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { - super(Objects.requireNonNull(shardId).getIndexName()); - this.shardId = shardId; - this.id = Objects.requireNonNull(id); + public AddOrRenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(shardId, id); if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL) { - throw new IllegalArgumentException( - "retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range"); + throw new IllegalArgumentException("retaining sequence number [" + retainingSequenceNumber + "] out of range"); } this.retainingSequenceNumber = retainingSequenceNumber; this.source = Objects.requireNonNull(source); } @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { + public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - shardId = ShardId.readShardId(in); - id = in.readString(); retainingSequenceNumber = in.readZLong(); source = in.readString(); } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - shardId.writeTo(out); - out.writeString(id); out.writeZLong(retainingSequenceNumber); out.writeString(source); } } + public static class AddRequest extends AddOrRenewRequest { + + public AddRequest() { + } + + public AddRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(shardId, id, retainingSequenceNumber, source); + } + + } + + public static class RenewRequest extends AddOrRenewRequest { + + public RenewRequest() { + } + + public RenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + super(shardId, id, retainingSequenceNumber, source); + } + + } + + public static class RemoveRequest extends Request { + + public RemoveRequest() { + } + + public RemoveRequest(final ShardId shardId, final String id) { + super(shardId, id); + } + + } + public static class Response extends ActionResponse { } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e6d69e404a0ce..4b2d134f5385c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -167,7 +167,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.index.seqno.RetentionLeaseActions.Request.RETAIN_ALL; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index d538283d240cc..7bb67831f885b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit; -import static org.elasticsearch.index.seqno.RetentionLeaseActions.Request.RETAIN_ALL; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -57,7 +57,7 @@ public void testAddAction() { client() .execute( RetentionLeaseActions.Add.INSTANCE, - new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) .actionGet(); final IndicesStatsResponse stats = client() @@ -90,7 +90,7 @@ public void testAddAlreadyExists() { client() .execute( RetentionLeaseActions.Add.INSTANCE, - new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) .actionGet(); final long nextRetainingSequenceNumber = @@ -102,7 +102,7 @@ public void testAddAlreadyExists() { () -> client() .execute( RetentionLeaseActions.Add.INSTANCE, - new RetentionLeaseActions.Request( + new RetentionLeaseActions.AddRequest( indexService.getShard(0).shardId(), id, nextRetainingSequenceNumber, @@ -136,7 +136,7 @@ public void testRenewAction() throws InterruptedException { client() .execute( RetentionLeaseActions.Add.INSTANCE, - new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) .actionGet(); /* @@ -179,7 +179,7 @@ public void testRenewAction() throws InterruptedException { client() .execute( RetentionLeaseActions.Renew.INSTANCE, - new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, nextRetainingSequenceNumber, source)) + new RetentionLeaseActions.RenewRequest(indexService.getShard(0).shardId(), id, nextRetainingSequenceNumber, source)) .actionGet(); final IndicesStatsResponse renewedStats = client() @@ -220,7 +220,11 @@ public void testRenewNotFound() { () -> client() .execute( RetentionLeaseActions.Renew.INSTANCE, - new RetentionLeaseActions.Request(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + new RetentionLeaseActions.RenewRequest( + indexService.getShard(0).shardId(), + id, + retainingSequenceNumber, + source)) .actionGet()); assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); } From 5b38c5354d6d6e04b5b8e8da01ef152d096a8c4d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 22:03:37 -0500 Subject: [PATCH 4/9] Make logger final --- .../org/elasticsearch/index/seqno/RetentionLeaseActions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index cbbf902a9386c..f897d66456fcd 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -55,7 +55,7 @@ public class RetentionLeaseActions { static abstract class TransportRetentionLeaseAction> extends TransportSingleShardAction { - private Logger logger = LogManager.getLogger(getClass()); + private final Logger logger = LogManager.getLogger(getClass()); private final IndicesService indicesService; From 9996a46aac47a9bb11d30683a45401dc0b575894 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 23:41:09 -0500 Subject: [PATCH 5/9] Iteration --- .../elasticsearch/action/ActionModule.java | 1 + .../index/seqno/RetentionLeaseActions.java | 86 ++++++++----------- .../seqno/RetentionLeaseActionsTests.java | 56 ++++++++++++ .../index/seqno/RetentionLeaseIT.java | 4 +- 4 files changed, 96 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index e0e3c206a63e8..83e1e01614435 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -533,6 +533,7 @@ public void reg // retention leases actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class); actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class); + actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class); return unmodifiableMap(actions.getRegistry()); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index f897d66456fcd..829acf54dd161 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -19,8 +19,6 @@ package org.elasticsearch.index.seqno; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -36,7 +34,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -46,7 +43,6 @@ import java.io.IOException; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class RetentionLeaseActions { @@ -55,8 +51,6 @@ public class RetentionLeaseActions { static abstract class TransportRetentionLeaseAction> extends TransportSingleShardAction { - private final Logger logger = LogManager.getLogger(getClass()); - private final IndicesService indicesService; @Inject @@ -90,48 +84,35 @@ protected ShardsIterator shards(final ClusterState state, final InternalRequest } @Override - protected Response shardOperation(final T request, final ShardId shardId) { + protected void asyncShardOperation(T request, ShardId shardId, final ActionListener listener) throws IOException { final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { - final CompletableFuture permit = new CompletableFuture<>(); - final ActionListener onAcquired = new ActionListener() { - - @Override - public void onResponse(Releasable releasable) { - if (permit.complete(releasable) == false) { - releasable.close(); - } - } - - @Override - public void onFailure(Exception e) { - permit.completeExceptionally(e); - } - - }; - - indexShard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, request); - - // block until we have the permit - try (Releasable ignore = FutureUtils.get(permit)) { - doRetentionLeaseAction(indexShard, request); - } finally { - // just in case we got an exception (likely interrupted) while waiting for the get - permit.whenComplete((r, e) -> { - if (r != null) { - r.close(); - } - if (e != null) { - logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); - } - }); - } + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignore = releasable) { + doRetentionLeaseAction(indexShard, request, listener); + } + } - return new Response(); + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + + }, + ThreadPool.Names.SAME, + request); + } + + @Override + protected Response shardOperation(final T request, final ShardId shardId) throws IOException { + throw new UnsupportedOperationException(); } - abstract void doRetentionLeaseAction(IndexShard indexShard, T request); + abstract void doRetentionLeaseAction(IndexShard indexShard, T request, ActionListener listener); @Override protected Response newResponse() { @@ -176,12 +157,14 @@ public TransportAction( } @Override - void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest request) { + void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest request, final ActionListener listener) { indexShard.addRetentionLease( request.getId(), request.getRetainingSequenceNumber(), request.getSource(), - ActionListener.wrap(() -> {})); + ActionListener.wrap( + r -> listener.onResponse(new Response()), + listener::onFailure)); } } @@ -225,8 +208,9 @@ public TransportAction( @Override - void doRetentionLeaseAction(final IndexShard indexShard, final RenewRequest request) { + void doRetentionLeaseAction(final IndexShard indexShard, final RenewRequest request, final ActionListener listener) { indexShard.renewRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource()); + listener.onResponse(new Response()); } } @@ -240,8 +224,8 @@ public Response newResponse() { public static class Remove extends Action { - public static final Renew INSTANCE = new Renew(); - public static final String NAME = "indices:admin/seq_no/renew_retention_lease"; + public static final Remove INSTANCE = new Remove(); + public static final String NAME = "indices:admin/seq_no/remove_retention_lease"; private Remove() { super(NAME); @@ -270,8 +254,12 @@ public TransportAction( @Override - void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request) { - indexShard.removeRetentionLease(request.getId(), ActionListener.wrap(() -> {})); + void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request, final ActionListener listener) { + indexShard.removeRetentionLease( + request.getId(), + ActionListener.wrap( + r -> listener.onResponse(new Response()), + listener::onFailure)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index 7bb67831f885b..e03d1fc448db5 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -229,4 +229,60 @@ public void testRenewNotFound() { assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); } + public void testRemoveAction() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + final String source = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + client() + .execute( + RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id)) + .actionGet(); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + } + + public void testRemoveNotFound() { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + + final String id = randomAlphaOfLength(8); + + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client() + .execute( + RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id)) + .actionGet()); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index ae005c9d5bcec..d92db46701df8 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -149,7 +149,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -160,7 +160,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; currentRetentionLeases.remove(id); latch.await(); retentionLock.close(); From 57069b2cf562095e342f0121a0219d7993f8a48f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Feb 2019 23:42:19 -0500 Subject: [PATCH 6/9] Fix extra newline --- .../elasticsearch/index/seqno/RetentionLeaseActionsTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index e03d1fc448db5..3b7ab2fda176c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -111,7 +111,6 @@ public void testAddAlreadyExists() { assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] already exists"))); } - public void testRenewAction() throws InterruptedException { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) From 6cfca14e7a6bb01b66fce6ce2cfbf18c3f028571 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 12 Feb 2019 00:11:12 -0500 Subject: [PATCH 7/9] Strong tests --- .../index/seqno/RetentionLeaseActions.java | 2 +- .../seqno/RetentionLeaseActionsTests.java | 256 +++++++++++++++++- 2 files changed, 252 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 829acf54dd161..62cb09998ad9c 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -84,7 +84,7 @@ protected ShardsIterator shards(final ClusterState state, final InternalRequest } @Override - protected void asyncShardOperation(T request, ShardId shardId, final ActionListener listener) throws IOException { + protected void asyncShardOperation(T request, ShardId shardId, final ActionListener listener) { final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); indexShard.acquirePrimaryOperationPermit( diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index 3b7ab2fda176c..01ed025aeb385 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -19,18 +19,23 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.hamcrest.Matchers.arrayWithSize; @@ -52,8 +57,8 @@ public void testAddAction() { ensureGreen("index"); final String id = randomAlphaOfLength(8); - final String source = randomAlphaOfLength(8); final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); client() .execute( RetentionLeaseActions.Add.INSTANCE, @@ -85,8 +90,8 @@ public void testAddAlreadyExists() { ensureGreen("index"); final String id = randomAlphaOfLength(8); - final String source = randomAlphaOfLength(8); final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); client() .execute( RetentionLeaseActions.Add.INSTANCE, @@ -121,8 +126,8 @@ public void testRenewAction() throws InterruptedException { ensureGreen("index"); final String id = randomAlphaOfLength(8); - final String source = randomAlphaOfLength(8); final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); /* * When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how @@ -211,8 +216,8 @@ public void testRenewNotFound() { ensureGreen("index"); final String id = randomAlphaOfLength(8); - final String source = randomAlphaOfLength(8); final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -238,8 +243,8 @@ public void testRemoveAction() { ensureGreen("index"); final String id = randomAlphaOfLength(8); - final String source = randomAlphaOfLength(8); final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); client() .execute( RetentionLeaseActions.Add.INSTANCE, @@ -284,4 +289,245 @@ public void testRemoveNotFound() { assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); } + public void testAddUnderBlock() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + runActionUnderBlockTest( + indexService, + (shardId, actionLatch) -> + client().execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(shardId, id, retainingSequenceNumber, source), + new ActionListener() { + + @Override + public void onResponse(final RetentionLeaseActions.Response response) { + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + })); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(id)); + assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber)); + assertThat(retentionLease.source(), equalTo(source)); + } + + public void testRenewUnderBlock() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + + /* + * When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how + * often the thread pool clock advances based on the following setting. After we add the initial lease we sample the relative time. + * Immediately before the renewal of the lease, we sleep long enough to ensure that an estimated time interval has elapsed, and + * sample the thread pool to ensure the clock has in fact advanced. + */ + final TimeValue estimatedTimeInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(getInstanceFromNode(Node.class).settings()); + + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + /* + * Sample these after adding the retention lease so that advancement here guarantees we have advanced past the timestamp on the + * lease. + */ + final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class); + final long timestampUpperBound = threadPool.absoluteTimeInMillis(); + final long start = System.nanoTime(); + + final IndicesStatsResponse initialStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(initialStats.getShards()); + assertThat(initialStats.getShards(), arrayWithSize(1)); + assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats()); + assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease initialRetentionLease = + initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + + final long nextRetainingSequenceNumber = + retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL + : randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE); + + /* + * Wait until the thread pool clock advances. Note that this will fail on a system when the system clock goes backwards during + * execution of the test. The specific circumstances under which this can fail is if the system clock goes backwards more than the + * suite timeout. It seems there is nothing simple that we can do to avoid this? + */ + do { + final long end = System.nanoTime(); + if (end - start < estimatedTimeInterval.nanos()) { + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(estimatedTimeInterval.nanos() - (end - start))); + } + } while (threadPool.absoluteTimeInMillis() <= timestampUpperBound); + + runActionUnderBlockTest( + indexService, + (shardId, actionLatch) -> + client().execute( + RetentionLeaseActions.Renew.INSTANCE, + new RetentionLeaseActions.RenewRequest(shardId, id, nextRetainingSequenceNumber, source), + new ActionListener() { + + @Override + public void onResponse(final RetentionLeaseActions.Response response) { + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + })); + + final IndicesStatsResponse renewedStats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + + assertNotNull(renewedStats.getShards()); + assertThat(renewedStats.getShards(), arrayWithSize(1)); + assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats()); + assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + final RetentionLease renewedRetentionLease = + renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next(); + assertThat(renewedRetentionLease.id(), equalTo(id)); + assertThat( + renewedRetentionLease.retainingSequenceNumber(), + equalTo(nextRetainingSequenceNumber == RETAIN_ALL ? 0L : nextRetainingSequenceNumber)); + assertThat(renewedRetentionLease.timestamp(), greaterThan(initialRetentionLease.timestamp())); + assertThat(renewedRetentionLease.source(), equalTo(source)); + } + + public void testRemoveUnderBlock() throws InterruptedException { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexService indexService = createIndex("index", settings); + ensureGreen("index"); + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + + client() + .execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source)) + .actionGet(); + + runActionUnderBlockTest( + indexService, + (shardId, actionLatch) -> + client().execute( + RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(shardId, id), + new ActionListener() { + + @Override + public void onResponse(final RetentionLeaseActions.Response response) { + actionLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + })); + + final IndicesStatsResponse stats = client() + .execute( + IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices("index")) + .actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(1)); + assertNotNull(stats.getShards()[0].getRetentionLeaseStats()); + assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0)); + } + + /* + * Tests that use this method are ensuring that the asynchronous usage of the permits API when permit acquisition is blocked is + * correctly handler. In these scenarios, we first acquire all permits. Then we submit a request to one of the retention lease actions + * (via the consumer callback). That invocation will go asynchronous and be queued, since all permits are blocked. Then we release the + * permit block and except that the callbacks occur correctly. These assertions happen after returning from this method. + */ + private void runActionUnderBlockTest( + final IndexService indexService, + final BiConsumer consumer) throws InterruptedException { + + final CountDownLatch blockedLatch = new CountDownLatch(1); + final CountDownLatch unblockLatch = new CountDownLatch(1); + indexService.getShard(0).acquireAllPrimaryOperationsPermits( + new ActionListener() { + + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignore = releasable) { + blockedLatch.countDown(); + unblockLatch.await(); + } catch (final InterruptedException e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + fail(e.toString()); + } + + }, + TimeValue.timeValueHours(1)); + + blockedLatch.await(); + + final CountDownLatch actionLatch = new CountDownLatch(1); + + consumer.accept(indexService.getShard(0).shardId(), actionLatch); + + unblockLatch.countDown(); + actionLatch.await(); + } + } From 7fb5b1f4e9021268abb4b02b61266d093ae2a743 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 12 Feb 2019 00:15:43 -0500 Subject: [PATCH 8/9] Add Javadocs --- .../elasticsearch/index/seqno/RetentionLeaseActions.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 62cb09998ad9c..6bb65e34e6adc 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -45,6 +45,13 @@ import java.util.Objects; import java.util.function.Supplier; +/** + * This class holds all actions related to retention leases. Note carefully that these actions are executed under a primary permit. Care is + * taken to thread the listener through the invocations so that for the sync APIs we do not notify the listener until these APIs have + * responded with success. Additionally, note the use of + * {@link TransportSingleShardAction#asyncShardOperation(SingleShardRequest, ShardId, ActionListener)} to handle the case when acquiring + * permits goes asynchronous because acquiring permits is blocked + */ public class RetentionLeaseActions { public static final long RETAIN_ALL = -1; From 7e9543c82d0e5b503006dafe80ec485df1852b43 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 12 Feb 2019 00:55:37 -0500 Subject: [PATCH 9/9] Fix checkstyle and tests --- .../index/seqno/RetentionLeaseActions.java | 12 ++++++------ .../index/seqno/RetentionLeaseActionsTests.java | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 6bb65e34e6adc..f35e4906131e1 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -56,12 +56,12 @@ public class RetentionLeaseActions { public static final long RETAIN_ALL = -1; - static abstract class TransportRetentionLeaseAction> extends TransportSingleShardAction { + abstract static class TransportRetentionLeaseAction> extends TransportSingleShardAction { private final IndicesService indicesService; @Inject - public TransportRetentionLeaseAction( + TransportRetentionLeaseAction( final String name, final ThreadPool threadPool, final ClusterService clusterService, @@ -292,10 +292,10 @@ public String getId() { return id; } - public Request() { + Request() { } - public Request(final ShardId shardId, final String id) { + Request(final ShardId shardId, final String id) { super(Objects.requireNonNull(shardId).getIndexName()); this.shardId = shardId; this.id = Objects.requireNonNull(id); @@ -336,10 +336,10 @@ public String getSource() { return source; } - public AddOrRenewRequest() { + AddOrRenewRequest() { } - public AddOrRenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { + AddOrRenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) { super(shardId, id); if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL) { throw new IllegalArgumentException("retaining sequence number [" + retainingSequenceNumber + "] out of range"); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java index b2680404bc871..bff4493321289 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseActionsTests.java @@ -230,7 +230,7 @@ public void testRenewNotFound() { retainingSequenceNumber, source)) .actionGet()); - assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] not found"))); } public void testRemoveAction() { @@ -286,7 +286,7 @@ public void testRemoveNotFound() { RetentionLeaseActions.Remove.INSTANCE, new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id)) .actionGet()); - assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] does not exist"))); + assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] not found"))); } public void testAddUnderBlock() throws InterruptedException {