Skip to content

Commit

Permalink
Tidy up PlainListenableActionFuture (elastic#68735)
Browse files Browse the repository at this point in the history
As of elastic#53261 today `PlainListenableActionFuture` is the sole
implementation of `ListenableActionFuture`, the reason for the
`Plain...` prefix is now lost. It also heavily uses variable shadowing
in its implementation which makes it quite hard to read, uses a mix of
`volatile` fields and mutexes, and keeps hold of the deferred listeners
even after completion. Finally it does not have many tests.

This commit drops the unnecessary interface and renames the class to
simply `ListenableActionFuture`. It reworks the implementation to avoid
shadowing variables, drops the deferred listeners on completion, and
moves to an entirely mutex-based implementation. Finally, it adds
another test that it works as advertised.
  • Loading branch information
DaveCTurner authored Feb 9, 2021
1 parent 71763ca commit ccf283e
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.elasticsearch.action.admin.cluster.node.tasks;

import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -63,8 +63,7 @@ public void testRetry() throws Exception {
});
barrier.await();
Task task;
PlainListenableActionFuture<TestTaskPlugin.NodesResponse> future =
PlainListenableActionFuture.newListenableFuture();
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = new ListenableActionFuture<>();
try {
logger.info("start a task that will store its results");
TestTaskPlugin.NodesRequest req = new TestTaskPlugin.NodesRequest("foo");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support;

import org.elasticsearch.action.ActionListener;

import java.util.ArrayList;
import java.util.List;

/**
* A {@code Future} and {@link ActionListener} against which which other {@link ActionListener}s can be registered later, to support
* fanning-out a result to a dynamic collection of listeners.
*/
public class ListenableActionFuture<T> extends AdapterActionFuture<T, T> {

private Object listeners;
private boolean executedListeners = false;

/**
* Registers an {@link ActionListener<T>} to be notified when this future is completed. If the future is already completed then the
* listener is notified immediately, on the calling thread. If not, the listener is notified on the thread that completes the listener.
*/
@SuppressWarnings("unchecked,rawtypes")
public void addListener(final ActionListener<T> listener) {
final boolean executeImmediate;
synchronized (this) {
executeImmediate = executedListeners;
if (executeImmediate == false) {
final Object oldListeners = listeners;
final Object newListeners;
if (oldListeners == null) {
// adding the first listener
newListeners = listener;
} else if (oldListeners instanceof List) {
// adding a listener after the second
((List) oldListeners).add(listener);
newListeners = oldListeners;
} else {
// adding the second listener
newListeners = new ArrayList<>(2);
((List) newListeners).add(oldListeners);
((List) newListeners).add(listener);
}
this.listeners = newListeners;
}
}
if (executeImmediate) {
executeListener(listener);
}
}

@Override
@SuppressWarnings("unchecked,rawtypes")
protected void done(boolean success) {
super.done(success);
final Object listenersToExecute;
synchronized (this) {
executedListeners = true;
listenersToExecute = listeners;
listeners = null;
}

if (listenersToExecute != null) {
if (listenersToExecute instanceof List) {
for (final Object listener : (List) listenersToExecute) {
executeListener((ActionListener<T>) listener);
}
} else {
executeListener((ActionListener<T>) listenersToExecute);
}
}
}

@Override
protected T convert(T listenerResponse) {
return listenerResponse;
}

private void executeListener(final ActionListener<T> listener) {
try {
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
// here we know we will never block
listener.onResponse(actionGet(0));
} catch (Exception e) {
listener.onFailure(e);
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -281,7 +281,7 @@ private enum ActivityType {
private class ConnectionTarget {
private final DiscoveryNode discoveryNode;

private PlainListenableActionFuture<Void> future = PlainListenableActionFuture.newListenableFuture();
private ListenableActionFuture<Void> future = new ListenableActionFuture<>();
private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting

private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
Expand Down Expand Up @@ -412,10 +412,10 @@ private void addListener(@Nullable ActionListener<Void> listener) {
}
}

private PlainListenableActionFuture<Void> getAndClearFuture() {
private ListenableActionFuture<Void> getAndClearFuture() {
assert Thread.holdsLock(mutex) : "mutex not held";
final PlainListenableActionFuture<Void> drainedFuture = future;
future = PlainListenableActionFuture.newListenableFuture();
final ListenableActionFuture<Void> drainedFuture = future;
future = new ListenableActionFuture<>();
return drainedFuture;
}

Expand All @@ -437,7 +437,7 @@ private Runnable addListenerAndStartActivity(@Nullable ActionListener<Void> list
}

activityType = newActivityType;
final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
final ListenableActionFuture<Void> oldFuture = getAndClearFuture();
addListener(listener);
return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage));
}
Expand All @@ -449,7 +449,7 @@ private void onCompletion(ActivityType completedActivityType, @Nullable Exceptio
synchronized (mutex) {
assert activityType != ActivityType.IDLE;
if (activityType == completedActivityType) {
final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
final ListenableActionFuture<Void> oldFuture = getAndClearFuture();
activityType = ActivityType.IDLE;

cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand Down Expand Up @@ -1379,7 +1379,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
}

// Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking
private PlainListenableActionFuture<RepositoryData> repoDataInitialized;
private ListenableActionFuture<RepositoryData> repoDataInitialized;

/**
* Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that
Expand All @@ -1400,7 +1400,7 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
return;
}
logger.trace("[{}] initializing repository generation in cluster state", metadata.name());
repoDataInitialized = PlainListenableActionFuture.newListenableFuture();
repoDataInitialized = new ListenableActionFuture<>();
repoDataInitialized.addListener(listener);
final Consumer<Exception> onFailure = e -> {
logger.warn(new ParameterizedMessage("[{}] Exception when initializing repository generation in cluster state",
Expand Down
Loading

0 comments on commit ccf283e

Please sign in to comment.