diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java similarity index 75% rename from server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java rename to server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java index 95c1b0e149118..2244fc4bcd679 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java +++ b/server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java @@ -17,9 +17,8 @@ * under the License. */ -package org.elasticsearch.transport; +package org.elasticsearch.action; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.util.ArrayList; @@ -28,9 +27,11 @@ import java.util.function.BiConsumer; /** - * Deduplicator that keeps track of requests that should not be sent/executed in parallel. + * Deduplicator for arbitrary keys and results that can be used to ensure a given action is only executed once at a time for a given + * request. + * @param Request type */ -public final class TransportRequestDeduplicator { +public final class ResultDeduplicator { private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); @@ -44,8 +45,8 @@ public final class TransportRequestDeduplicator { * @param listener Listener to invoke on request completion * @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator */ - public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { - ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { + ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); if (completionListener != null) { callback.accept(request, completionListener); } @@ -63,20 +64,21 @@ public int size() { return requests.size(); } - private final class CompositeListener implements ActionListener { + private final class CompositeListener implements ActionListener { - private final List> listeners = new ArrayList<>(); + private final List> listeners = new ArrayList<>(); private final T request; private boolean isNotified; private Exception failure; + private R response; CompositeListener(T request) { this.request = request; } - CompositeListener addListener(ActionListener listener) { + CompositeListener addListener(ActionListener listener) { synchronized (this) { if (this.isNotified == false) { listeners.add(listener); @@ -86,35 +88,35 @@ CompositeListener addListener(ActionListener listener) { if (failure != null) { listener.onFailure(failure); } else { - listener.onResponse(null); + listener.onResponse(response); } return null; } - private void onCompleted(Exception failure) { + @Override + public void onResponse(R response) { synchronized (this) { - this.failure = failure; + this.response = response; this.isNotified = true; } try { - if (failure == null) { - ActionListener.onResponse(listeners, null); - } else { - ActionListener.onFailure(listeners, failure); - } + ActionListener.onResponse(listeners, response); } finally { requests.remove(request); } } - @Override - public void onResponse(final Void aVoid) { - onCompleted(null); - } - @Override public void onFailure(Exception failure) { - onCompleted(failure); + synchronized (this) { + this.failure = failure; + this.isNotified = true; + } + try { + ActionListener.onFailure(listeners, failure); + } finally { + requests.remove(request); + } } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 4a44f620d7029..8a18e9a1899f3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -64,7 +64,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -93,7 +93,7 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final TransportRequestDeduplicator remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>(); + private final ResultDeduplicator remoteFailedShardsDeduplicator = new ResultDeduplicator<>(); @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 322f94c725f49..4fc3e298ee12b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -110,6 +110,8 @@ default Repository create(RepositoryMetadata metadata, Function listener); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2d8f9dc88dc7d..d6a76ef3ae438 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -121,6 +121,7 @@ import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.ResultDeduplicator; import java.io.FilterInputStream; import java.io.IOException; @@ -1361,11 +1362,7 @@ public void getRepositoryData(ActionListener listener) { // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with // the latest known repository generation if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) { - try { - listener.onResponse(cached.repositoryData()); - } catch (Exception e) { - listener.onFailure(e); - } + repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached.repositoryData())); return; } if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) { @@ -1375,7 +1372,12 @@ public void getRepositoryData(ActionListener listener) { } else { logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(), latestKnownRepoGen); - threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + if (bestEffortConsistency) { + threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + } else { + repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> + threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData))); + } } } @@ -1461,10 +1463,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } + /** + * {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning + * {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is + * unique for a given value of {@link #metadata} at any point in time. + */ + private final ResultDeduplicator repoDataDeduplicator = new ResultDeduplicator<>(); + private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. // Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same // generation repeatedly. + long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN; while (true) { final long genToLoad; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index b2f010774dff8..c9bb9a598c285 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -57,7 +57,7 @@ import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final Map> shardSnapshots = new HashMap<>(); // A map of snapshots to the shardIds that we already reported to the master as failed - private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = - new TransportRequestDeduplicator<>(); + private final ResultDeduplicator remoteFailedRequestDeduplicator = + new ResultDeduplicator<>(); public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, IndicesService indicesService) { diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java index 0df0d8c6af8e6..b80b746da3f90 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java @@ -36,7 +36,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -52,7 +52,7 @@ public class TaskCancellationService { private static final Logger logger = LogManager.getLogger(TaskCancellationService.class); private final TransportService transportService; private final TaskManager taskManager; - private final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); + private final ResultDeduplicator deduplicator = new ResultDeduplicator<>(); public TaskCancellationService(TransportService transportService) { this.transportService = transportService; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/ResultDeduplicatorTests.java similarity index 94% rename from server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java rename to server/src/test/java/org/elasticsearch/transport/ResultDeduplicatorTests.java index ab178134995a7..5d41459dd309f 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ResultDeduplicatorTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -29,7 +30,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.sameInstance; -public class TransportRequestDeduplicatorTests extends ESTestCase { +public class ResultDeduplicatorTests extends ESTestCase { public void testRequestDeduplication() throws Exception { AtomicInteger successCount = new AtomicInteger(); @@ -40,7 +41,7 @@ public void testRequestDeduplication() throws Exception { public void setParentTask(final TaskId taskId) { } }; - final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); + final ResultDeduplicator deduplicator = new ResultDeduplicator<>(); final SetOnce> listenerHolder = new SetOnce<>(); int iterationsPerThread = scaledRandomIntBetween(100, 1000); Thread[] threads = new Thread[between(1, 4)];