From 9dc8f8c9b06c0e0211cbafa3ef8aa8976105b785 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 25 Jan 2021 17:21:06 +0100 Subject: [PATCH 1/3] Deduplicate RepositoryData on a Best Effort Basis Enhance transport request deduplicator to allow for more general deduplication. Make use of that to enable deduplicate RepositoryData under concurrent request load (which so far has been the only situation where RepositoryData has created unmanagable memory pressure). --- .../action/shard/ShardStateAction.java | 5 ++- .../blobstore/BlobStoreRepository.java | 18 +++++--- .../snapshots/SnapshotShardsService.java | 6 +-- .../tasks/TaskCancellationService.java | 4 +- ...r.java => AbstractResultDeduplicator.java} | 45 ++++++++++--------- ...a => AbstractResultDeduplicatorTests.java} | 4 +- 6 files changed, 46 insertions(+), 36 deletions(-) rename server/src/main/java/org/elasticsearch/transport/{TransportRequestDeduplicator.java => AbstractResultDeduplicator.java} (76%) rename server/src/test/java/org/elasticsearch/transport/{TransportRequestDeduplicatorTests.java => AbstractResultDeduplicatorTests.java} (95%) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 4a44f620d7029..a335d44179cbc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -64,7 +64,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.transport.AbstractResultDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -93,7 +93,8 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final TransportRequestDeduplicator remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>(); + private final AbstractResultDeduplicator remoteFailedShardsDeduplicator = + new AbstractResultDeduplicator<>(); @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2d8f9dc88dc7d..f3d70c1d84eb8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -121,6 +121,7 @@ import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.AbstractResultDeduplicator; import java.io.FilterInputStream; import java.io.IOException; @@ -1361,11 +1362,7 @@ public void getRepositoryData(ActionListener listener) { // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with // the latest known repository generation if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) { - try { - listener.onResponse(cached.repositoryData()); - } catch (Exception e) { - listener.onFailure(e); - } + repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached.repositoryData())); return; } if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) { @@ -1375,7 +1372,12 @@ public void getRepositoryData(ActionListener listener) { } else { logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(), latestKnownRepoGen); - threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + if (bestEffortConsistency) { + threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + } else { + repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> + threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData))); + } } } @@ -1461,10 +1463,14 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } + private final AbstractResultDeduplicator repoDataDeduplicator = + new AbstractResultDeduplicator<>(); + private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. // Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same // generation repeatedly. + long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN; while (true) { final long genToLoad; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index b2f010774dff8..06db2f833803e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -57,7 +57,7 @@ import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.transport.AbstractResultDeduplicator; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final Map> shardSnapshots = new HashMap<>(); // A map of snapshots to the shardIds that we already reported to the master as failed - private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = - new TransportRequestDeduplicator<>(); + private final AbstractResultDeduplicator remoteFailedRequestDeduplicator = + new AbstractResultDeduplicator<>(); public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, IndicesService indicesService) { diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java index 0df0d8c6af8e6..56f8b1146ee92 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java @@ -36,7 +36,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestDeduplicator; +import org.elasticsearch.transport.AbstractResultDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -52,7 +52,7 @@ public class TaskCancellationService { private static final Logger logger = LogManager.getLogger(TaskCancellationService.class); private final TransportService transportService; private final TaskManager taskManager; - private final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); + private final AbstractResultDeduplicator deduplicator = new AbstractResultDeduplicator<>(); public TaskCancellationService(TransportService transportService) { this.transportService = transportService; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/server/src/main/java/org/elasticsearch/transport/AbstractResultDeduplicator.java similarity index 76% rename from server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java rename to server/src/main/java/org/elasticsearch/transport/AbstractResultDeduplicator.java index 95c1b0e149118..5e63b874a3638 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java +++ b/server/src/main/java/org/elasticsearch/transport/AbstractResultDeduplicator.java @@ -28,9 +28,11 @@ import java.util.function.BiConsumer; /** - * Deduplicator that keeps track of requests that should not be sent/executed in parallel. + * Deduplicator for arbitrary keys and results that can be used to ensure a given action is only executed once at a time for a given + * request. + * @param Request type */ -public final class TransportRequestDeduplicator { +public final class AbstractResultDeduplicator { private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); @@ -44,8 +46,8 @@ public final class TransportRequestDeduplicator { * @param listener Listener to invoke on request completion * @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator */ - public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { - ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { + ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); if (completionListener != null) { callback.accept(request, completionListener); } @@ -63,20 +65,21 @@ public int size() { return requests.size(); } - private final class CompositeListener implements ActionListener { + private final class CompositeListener implements ActionListener { - private final List> listeners = new ArrayList<>(); + private final List> listeners = new ArrayList<>(); private final T request; private boolean isNotified; private Exception failure; + private R response; CompositeListener(T request) { this.request = request; } - CompositeListener addListener(ActionListener listener) { + CompositeListener addListener(ActionListener listener) { synchronized (this) { if (this.isNotified == false) { listeners.add(listener); @@ -86,35 +89,35 @@ CompositeListener addListener(ActionListener listener) { if (failure != null) { listener.onFailure(failure); } else { - listener.onResponse(null); + listener.onResponse(response); } return null; } - private void onCompleted(Exception failure) { + @Override + public void onResponse(R response) { synchronized (this) { - this.failure = failure; + this.response = response; this.isNotified = true; } try { - if (failure == null) { - ActionListener.onResponse(listeners, null); - } else { - ActionListener.onFailure(listeners, failure); - } + ActionListener.onResponse(listeners, response); } finally { requests.remove(request); } } - @Override - public void onResponse(final Void aVoid) { - onCompleted(null); - } - @Override public void onFailure(Exception failure) { - onCompleted(failure); + synchronized (this) { + this.failure = failure; + this.isNotified = true; + } + try { + ActionListener.onFailure(listeners, failure); + } finally { + requests.remove(request); + } } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/AbstractResultDeduplicatorTests.java similarity index 95% rename from server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java rename to server/src/test/java/org/elasticsearch/transport/AbstractResultDeduplicatorTests.java index ab178134995a7..f0c34bf9479e2 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/AbstractResultDeduplicatorTests.java @@ -29,7 +29,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.sameInstance; -public class TransportRequestDeduplicatorTests extends ESTestCase { +public class AbstractResultDeduplicatorTests extends ESTestCase { public void testRequestDeduplication() throws Exception { AtomicInteger successCount = new AtomicInteger(); @@ -40,7 +40,7 @@ public void testRequestDeduplication() throws Exception { public void setParentTask(final TaskId taskId) { } }; - final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); + final AbstractResultDeduplicator deduplicator = new AbstractResultDeduplicator<>(); final SetOnce> listenerHolder = new SetOnce<>(); int iterationsPerThread = scaledRandomIntBetween(100, 1000); Thread[] threads = new Thread[between(1, 4)]; From 130216c17ceb9acc73f90c6104e5279992a96574 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 26 Jan 2021 08:57:07 +0100 Subject: [PATCH 2/3] javadoc --- .../main/java/org/elasticsearch/repositories/Repository.java | 2 ++ .../repositories/blobstore/BlobStoreRepository.java | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 322f94c725f49..4fc3e298ee12b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -110,6 +110,8 @@ default Repository create(RepositoryMetadata metadata, Function listener); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f3d70c1d84eb8..27514dba4728b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1463,6 +1463,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } + /** + * {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning + * {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is + * unique for a given value of {@link #metadata} at any point in time. + */ private final AbstractResultDeduplicator repoDataDeduplicator = new AbstractResultDeduplicator<>(); From 8e0c76f8770b83ba155c6542bbcc0caa638a31ad Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 26 Jan 2021 16:53:51 +0100 Subject: [PATCH 3/3] rename and move packages --- .../ResultDeduplicator.java} | 5 ++--- .../cluster/action/shard/ShardStateAction.java | 5 ++--- .../repositories/blobstore/BlobStoreRepository.java | 5 ++--- .../org/elasticsearch/snapshots/SnapshotShardsService.java | 6 +++--- .../org/elasticsearch/tasks/TaskCancellationService.java | 4 ++-- ...tDeduplicatorTests.java => ResultDeduplicatorTests.java} | 5 +++-- 6 files changed, 14 insertions(+), 16 deletions(-) rename server/src/main/java/org/elasticsearch/{transport/AbstractResultDeduplicator.java => action/ResultDeduplicator.java} (97%) rename server/src/test/java/org/elasticsearch/transport/{AbstractResultDeduplicatorTests.java => ResultDeduplicatorTests.java} (94%) diff --git a/server/src/main/java/org/elasticsearch/transport/AbstractResultDeduplicator.java b/server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java similarity index 97% rename from server/src/main/java/org/elasticsearch/transport/AbstractResultDeduplicator.java rename to server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java index 5e63b874a3638..2244fc4bcd679 100644 --- a/server/src/main/java/org/elasticsearch/transport/AbstractResultDeduplicator.java +++ b/server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java @@ -17,9 +17,8 @@ * under the License. */ -package org.elasticsearch.transport; +package org.elasticsearch.action; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.util.ArrayList; @@ -32,7 +31,7 @@ * request. * @param Request type */ -public final class AbstractResultDeduplicator { +public final class ResultDeduplicator { private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index a335d44179cbc..8a18e9a1899f3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -64,7 +64,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.AbstractResultDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -93,8 +93,7 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final AbstractResultDeduplicator remoteFailedShardsDeduplicator = - new AbstractResultDeduplicator<>(); + private final ResultDeduplicator remoteFailedShardsDeduplicator = new ResultDeduplicator<>(); @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 27514dba4728b..d6a76ef3ae438 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -121,7 +121,7 @@ import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.AbstractResultDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import java.io.FilterInputStream; import java.io.IOException; @@ -1468,8 +1468,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is * unique for a given value of {@link #metadata} at any point in time. */ - private final AbstractResultDeduplicator repoDataDeduplicator = - new AbstractResultDeduplicator<>(); + private final ResultDeduplicator repoDataDeduplicator = new ResultDeduplicator<>(); private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 06db2f833803e..c9bb9a598c285 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -57,7 +57,7 @@ import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.AbstractResultDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final Map> shardSnapshots = new HashMap<>(); // A map of snapshots to the shardIds that we already reported to the master as failed - private final AbstractResultDeduplicator remoteFailedRequestDeduplicator = - new AbstractResultDeduplicator<>(); + private final ResultDeduplicator remoteFailedRequestDeduplicator = + new ResultDeduplicator<>(); public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, IndicesService indicesService) { diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java index 56f8b1146ee92..b80b746da3f90 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java @@ -36,7 +36,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.AbstractResultDeduplicator; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -52,7 +52,7 @@ public class TaskCancellationService { private static final Logger logger = LogManager.getLogger(TaskCancellationService.class); private final TransportService transportService; private final TaskManager taskManager; - private final AbstractResultDeduplicator deduplicator = new AbstractResultDeduplicator<>(); + private final ResultDeduplicator deduplicator = new ResultDeduplicator<>(); public TaskCancellationService(TransportService transportService) { this.transportService = transportService; diff --git a/server/src/test/java/org/elasticsearch/transport/AbstractResultDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/ResultDeduplicatorTests.java similarity index 94% rename from server/src/test/java/org/elasticsearch/transport/AbstractResultDeduplicatorTests.java rename to server/src/test/java/org/elasticsearch/transport/ResultDeduplicatorTests.java index f0c34bf9479e2..5d41459dd309f 100644 --- a/server/src/test/java/org/elasticsearch/transport/AbstractResultDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ResultDeduplicatorTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -29,7 +30,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.sameInstance; -public class AbstractResultDeduplicatorTests extends ESTestCase { +public class ResultDeduplicatorTests extends ESTestCase { public void testRequestDeduplication() throws Exception { AtomicInteger successCount = new AtomicInteger(); @@ -40,7 +41,7 @@ public void testRequestDeduplication() throws Exception { public void setParentTask(final TaskId taskId) { } }; - final AbstractResultDeduplicator deduplicator = new AbstractResultDeduplicator<>(); + final ResultDeduplicator deduplicator = new ResultDeduplicator<>(); final SetOnce> listenerHolder = new SetOnce<>(); int iterationsPerThread = scaledRandomIntBetween(100, 1000); Thread[] threads = new Thread[between(1, 4)];