Skip to content

Commit

Permalink
Deduplicate RepositoryData on a Best Effort Basis (elastic#67947)
Browse files Browse the repository at this point in the history
Enhance transport request deduplicator to allow for more general deduplication.
Make use of that to enable deduplicate RepositoryData under concurrent request load
(which so far has been the only situation where RepositoryData has created unmanageable
memory pressure).
  • Loading branch information
original-brownbear authored Jan 26, 2021
1 parent 03d4ba5 commit 29d1d25
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
* under the License.
*/

package org.elasticsearch.transport;
package org.elasticsearch.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.util.ArrayList;
Expand All @@ -28,9 +27,11 @@
import java.util.function.BiConsumer;

/**
* Deduplicator that keeps track of requests that should not be sent/executed in parallel.
* Deduplicator for arbitrary keys and results that can be used to ensure a given action is only executed once at a time for a given
* request.
* @param <T> Request type
*/
public final class TransportRequestDeduplicator<T> {
public final class ResultDeduplicator<T, R> {

private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();

Expand All @@ -44,8 +45,8 @@ public final class TransportRequestDeduplicator<T> {
* @param listener Listener to invoke on request completion
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
*/
public void executeOnce(T request, ActionListener<Void> listener, BiConsumer<T, ActionListener<Void>> callback) {
ActionListener<Void> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
public void executeOnce(T request, ActionListener<R> listener, BiConsumer<T, ActionListener<R>> callback) {
ActionListener<R> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
if (completionListener != null) {
callback.accept(request, completionListener);
}
Expand All @@ -63,20 +64,21 @@ public int size() {
return requests.size();
}

private final class CompositeListener implements ActionListener<Void> {
private final class CompositeListener implements ActionListener<R> {

private final List<ActionListener<Void>> listeners = new ArrayList<>();
private final List<ActionListener<R>> listeners = new ArrayList<>();

private final T request;

private boolean isNotified;
private Exception failure;
private R response;

CompositeListener(T request) {
this.request = request;
}

CompositeListener addListener(ActionListener<Void> listener) {
CompositeListener addListener(ActionListener<R> listener) {
synchronized (this) {
if (this.isNotified == false) {
listeners.add(listener);
Expand All @@ -86,35 +88,35 @@ CompositeListener addListener(ActionListener<Void> listener) {
if (failure != null) {
listener.onFailure(failure);
} else {
listener.onResponse(null);
listener.onResponse(response);
}
return null;
}

private void onCompleted(Exception failure) {
@Override
public void onResponse(R response) {
synchronized (this) {
this.failure = failure;
this.response = response;
this.isNotified = true;
}
try {
if (failure == null) {
ActionListener.onResponse(listeners, null);
} else {
ActionListener.onFailure(listeners, failure);
}
ActionListener.onResponse(listeners, response);
} finally {
requests.remove(request);
}
}

@Override
public void onResponse(final Void aVoid) {
onCompleted(null);
}

@Override
public void onFailure(Exception failure) {
onCompleted(failure);
synchronized (this) {
this.failure = failure;
this.isNotified = true;
}
try {
ActionListener.onFailure(listeners, failure);
} finally {
requests.remove(request);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestDeduplicator;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -93,7 +93,7 @@ public class ShardStateAction {

// a list of shards that failed during replication
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();

@Inject
public ShardStateAction(ClusterService clusterService, TransportService transportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
* if there was an error in reading the data.
* @param listener listener that may be resolved on different kinds of threads including transport and cluster state applier threads
* and therefore must fork to a new thread for executing any long running actions
*/
void getRepositoryData(ActionListener<RepositoryData> listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.action.ResultDeduplicator;

import java.io.FilterInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -1361,11 +1362,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) {
try {
listener.onResponse(cached.repositoryData());
} catch (Exception e) {
listener.onFailure(e);
}
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached.repositoryData()));
return;
}
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) {
Expand All @@ -1375,7 +1372,12 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
} else {
logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(),
latestKnownRepoGen);
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
if (bestEffortConsistency) {
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) ->
threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
}
}
}

Expand Down Expand Up @@ -1461,10 +1463,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator = new ResultDeduplicator<>();

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
// Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
// generation repeatedly.

long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN;
while (true) {
final long genToLoad;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestDeduplicator;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();

// A map of snapshots to the shardIds that we already reported to the master as failed
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
new TransportRequestDeduplicator<>();
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator =
new ResultDeduplicator<>();

public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
TransportService transportService, IndicesService indicesService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestDeduplicator;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
Expand All @@ -52,7 +52,7 @@ public class TaskCancellationService {
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
private final TransportService transportService;
private final TaskManager taskManager;
private final TransportRequestDeduplicator<CancelRequest> deduplicator = new TransportRequestDeduplicator<>();
private final ResultDeduplicator<CancelRequest, Void> deduplicator = new ResultDeduplicator<>();

public TaskCancellationService(TransportService transportService) {
this.transportService = transportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -29,7 +30,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;

public class TransportRequestDeduplicatorTests extends ESTestCase {
public class ResultDeduplicatorTests extends ESTestCase {

public void testRequestDeduplication() throws Exception {
AtomicInteger successCount = new AtomicInteger();
Expand All @@ -40,7 +41,7 @@ public void testRequestDeduplication() throws Exception {
public void setParentTask(final TaskId taskId) {
}
};
final TransportRequestDeduplicator<TransportRequest> deduplicator = new TransportRequestDeduplicator<>();
final ResultDeduplicator<TransportRequest, Void> deduplicator = new ResultDeduplicator<>();
final SetOnce<ActionListener<Void>> listenerHolder = new SetOnce<>();
int iterationsPerThread = scaledRandomIntBetween(100, 1000);
Thread[] threads = new Thread[between(1, 4)];
Expand Down

0 comments on commit 29d1d25

Please sign in to comment.