
Deduplicate RepositoryData on a Best Effort Basis #67947

Merged
org/elasticsearch/action/ResultDeduplicator.java (moved from org/elasticsearch/transport/TransportRequestDeduplicator.java)
@@ -17,9 +17,8 @@
* under the License.
*/

package org.elasticsearch.transport;
package org.elasticsearch.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.util.ArrayList;
@@ -28,9 +27,11 @@
import java.util.function.BiConsumer;

/**
* Deduplicator that keeps track of requests that should not be sent/executed in parallel.
* Deduplicator for arbitrary keys and results that can be used to ensure a given action is only executed once at a time for a given
* request.
* @param <T> Request type
* @param <R> Result type
*/
public final class TransportRequestDeduplicator<T> {
public final class ResultDeduplicator<T, R> {

private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();

@@ -44,8 +45,8 @@ public final class TransportRequestDeduplicator<T> {
* @param listener Listener to invoke on request completion
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
*/
public void executeOnce(T request, ActionListener<Void> listener, BiConsumer<T, ActionListener<Void>> callback) {
ActionListener<Void> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
public void executeOnce(T request, ActionListener<R> listener, BiConsumer<T, ActionListener<R>> callback) {
ActionListener<R> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
if (completionListener != null) {
callback.accept(request, completionListener);
}
@@ -63,20 +64,21 @@ public int size() {
return requests.size();
}

private final class CompositeListener implements ActionListener<Void> {
private final class CompositeListener implements ActionListener<R> {

private final List<ActionListener<Void>> listeners = new ArrayList<>();
private final List<ActionListener<R>> listeners = new ArrayList<>();

private final T request;

private boolean isNotified;
private Exception failure;
private R response;

CompositeListener(T request) {
this.request = request;
}

CompositeListener addListener(ActionListener<Void> listener) {
CompositeListener addListener(ActionListener<R> listener) {
synchronized (this) {
if (this.isNotified == false) {
listeners.add(listener);
@@ -86,35 +88,35 @@ CompositeListener addListener(ActionListener<Void> listener) {
if (failure != null) {
listener.onFailure(failure);
} else {
listener.onResponse(null);
listener.onResponse(response);
}
return null;
}

private void onCompleted(Exception failure) {
@Override
public void onResponse(R response) {
synchronized (this) {
this.failure = failure;
this.response = response;
this.isNotified = true;
}
try {
if (failure == null) {
ActionListener.onResponse(listeners, null);
} else {
ActionListener.onFailure(listeners, failure);
}
ActionListener.onResponse(listeners, response);
} finally {
requests.remove(request);
}
}

@Override
public void onResponse(final Void aVoid) {
onCompleted(null);
}

@Override
public void onFailure(Exception failure) {
onCompleted(failure);
synchronized (this) {
this.failure = failure;
this.isNotified = true;
}
try {
ActionListener.onFailure(listeners, failure);
} finally {
requests.remove(request);
}
}
}
}
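The renamed class generalizes the old request-only deduplicator to arbitrary result types. A minimal usage sketch, assuming only the API visible in the diff above (`RepoSizeService`, `fetchRepoSize`, and `expensiveSizeLookup` are hypothetical names; keys must implement sensible `equals`/`hashCode` since they index the concurrent map):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;

class RepoSizeService {
    private final ResultDeduplicator<String, Long> deduplicator = new ResultDeduplicator<>();

    void fetchRepoSize(String repoName, ActionListener<Long> listener) {
        // The callback runs only for the first caller of a given key; listeners from
        // callers that arrive while that execution is in flight are queued on the same
        // CompositeListener and all complete with the shared result (or failure).
        deduplicator.executeOnce(repoName, listener,
            (name, completionListener) -> completionListener.onResponse(expensiveSizeLookup(name)));
    }

    private long expensiveSizeLookup(String name) {
        return name.length(); // stand-in for real work
    }
}
```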
org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -64,7 +64,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestDeduplicator;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@@ -93,7 +93,7 @@ public class ShardStateAction {

// a list of shards that failed during replication
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();

@Inject
public ShardStateAction(ClusterService clusterService, TransportService transportService,
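For context, a hedged sketch of the call pattern behind this field. `sendShardAction` and `SHARD_FAILED_ACTION_NAME` do exist in ShardStateAction, but the exact signatures and the wrapping method here are assumptions, not the literal code:

```java
// Key the deduplicator by the FailedShardEntry itself: while one shard-failed request
// for an entry is in flight to the master, callers with an equal entry only enqueue
// their listeners instead of sending duplicate transport requests.
void remoteShardFailed(FailedShardEntry entry, ActionListener<Void> listener) {
    remoteFailedShardsDeduplicator.executeOnce(entry, listener,
        (req, l) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, l));
}
```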
org/elasticsearch/repositories/Repository.java
@@ -110,6 +110,8 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
* if there was an error in reading the data.
* @param listener listener that may be resolved on different kinds of threads including transport and cluster state applier threads
* and therefore must fork to a new thread for executing any long running actions
*/
void getRepositoryData(ActionListener<RepositoryData> listener);

Member Author: I audited all usages of this method and couldn't find any remaining spot where this is a problem (at least in master; it might need some adjustments in 7.x).
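The new contract means callers must not do heavy work directly in the listener. A minimal sketch of a compliant caller (hypothetical code; `repository`, `threadPool`, and `logger` are assumed fields of the surrounding class, and `analyze` is a made-up stand-in for expensive work):

```java
repository.getRepositoryData(ActionListener.wrap(
    // This may be invoked on a transport or cluster state applier thread, so any
    // long-running work is forked to the GENERIC pool rather than run inline.
    repositoryData -> threadPool.generic().execute(() -> analyze(repositoryData)),
    e -> logger.warn("failed to load repository data", e)));
```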
org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -121,6 +121,7 @@
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.action.ResultDeduplicator;

import java.io.FilterInputStream;
import java.io.IOException;
@@ -1361,11 +1362,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached.generation() == latestKnownRepoGen.get() && cached.hasData()) {
try {
listener.onResponse(cached.repositoryData());
} catch (Exception e) {
listener.onFailure(e);
}
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> l.onResponse(cached.repositoryData()));
Member Author: This is a little less significant in savings relative to the other spot, but I think it's worth it.

1. Deserializing the cached RepositoryData is still expensive, and it's nice to save the work.
2. For status APIs this gives a nice natural rate limit by only fanning out after fetching RepositoryData in case of massive concurrency of requests (e.g. a snapshotter fetching all snapshots in a 1k-snapshot repo).
3. For other operations like create, delete, and clone, we want those to run serially anyway (which we generally do via the master service), so it's fine if we just run all the listeners in series here to begin with.

Contributor:
> For status APIs this gives a nice natural rate limit by only fanning out after fetching RepositoryData in case of massive concurrency of requests

I fail to see how this change could act as a rate limit for that scenario. It's a good improvement, but the requests would accumulate anyway, right?

Member Author:
> but the requests would accumulate anyway, right?

Yeah, "rate-limiter" was a poor choice of wording. We limit the rate at which we dispatch the actions after loading repo data to a single thread at a time, but requests still pile up. In practice things probably run much quicker than before, but with requests queuing and waiting for the first one to finish instead of executing in parallel, it's more of a "thread-limiter" I suppose :)

Contributor: That makes sense, thanks for the clarification!

return;
}
if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) {
@@ -1375,7 +1372,12 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
} else {
logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(),
latestKnownRepoGen);
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
if (bestEffortConsistency) {
threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) ->
threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData)));
}
}
}

Member Author: This would completely resolve the broken situations we observed, where 50+ concurrent GENERIC threads were fetching RepositoryData and deserializing it over and over, eventually running the system OOM.

@@ -1461,10 +1463,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator = new ResultDeduplicator<>();

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
// Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
// generation repeatedly.

long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN;
while (true) {
final long genToLoad;
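To make the property discussed in the review thread concrete, here is a minimal sketch (hypothetical, modeled on the ResultDeduplicatorTests at the end of this diff): many concurrent callers, exactly one underlying load, and all queued listeners released when it completes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;

class DedupPropertySketch {
    static void demo() {
        AtomicInteger loads = new AtomicInteger();
        ResultDeduplicator<String, String> deduplicator = new ResultDeduplicator<>();
        SetOnce<ActionListener<String>> inFlight = new SetOnce<>();
        for (int i = 0; i < 100; i++) {
            // Same key for every caller: only the first invocation runs the callback;
            // the other 99 listeners queue up behind the in-flight "load".
            deduplicator.executeOnce("repo-metadata-key", ActionListener.wrap(r -> {}, e -> {}),
                (key, l) -> {
                    loads.incrementAndGet();
                    inFlight.set(l);
                });
        }
        assert loads.get() == 1;
        // Completing the single in-flight listener releases every queued caller at once.
        inFlight.get().onResponse("repository-data");
    }
}
```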
org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -57,7 +57,7 @@
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestDeduplicator;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

@@ -91,8 +91,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();

// A map of snapshots to the shardIds that we already reported to the master as failed
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
new TransportRequestDeduplicator<>();
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator =
new ResultDeduplicator<>();

public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
TransportService transportService, IndicesService indicesService) {
org/elasticsearch/tasks/TaskCancellationService.java
@@ -36,7 +36,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestDeduplicator;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@@ -52,7 +52,7 @@ public class TaskCancellationService {
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
private final TransportService transportService;
private final TaskManager taskManager;
private final TransportRequestDeduplicator<CancelRequest> deduplicator = new TransportRequestDeduplicator<>();
private final ResultDeduplicator<CancelRequest, Void> deduplicator = new ResultDeduplicator<>();

public TaskCancellationService(TransportService transportService) {
this.transportService = transportService;
org/elasticsearch/action/ResultDeduplicatorTests.java (moved from org/elasticsearch/transport/TransportRequestDeduplicatorTests.java)
@@ -20,6 +20,7 @@

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;

@@ -29,7 +30,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;

public class TransportRequestDeduplicatorTests extends ESTestCase {
public class ResultDeduplicatorTests extends ESTestCase {

public void testRequestDeduplication() throws Exception {
AtomicInteger successCount = new AtomicInteger();
@@ -40,7 +41,7 @@ public void testRequestDeduplication() throws Exception {
public void setParentTask(final TaskId taskId) {
}
};
final TransportRequestDeduplicator<TransportRequest> deduplicator = new TransportRequestDeduplicator<>();
final ResultDeduplicator<TransportRequest, Void> deduplicator = new ResultDeduplicator<>();
final SetOnce<ActionListener<Void>> listenerHolder = new SetOnce<>();
int iterationsPerThread = scaledRandomIntBetween(100, 1000);
Thread[] threads = new Thread[between(1, 4)];