From ccf283e2fbdffedfc337d5793ac530c7098ba68a Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Feb 2021 15:34:40 +0000 Subject: [PATCH] Tidy up PlainListenableActionFuture (#68735) As of #53261 today `PlainListenableActionFuture` is the sole implementation of `ListenableActionFuture`, the reason for the `Plain...` prefix is now lost. It also heavily uses variable shadowing in its implementation which makes it quite hard to read, uses a mix of `volatile` fields and mutexes, and keeps hold of the deferred listeners even after completion. Finally it does not have many tests. This commit drops the unnecessary interface and renames the class to simply `ListenableActionFuture`. It reworks the implementation to avoid shadowing variables, drops the deferred listeners on completion, and moves to an entirely mutex-based implementation. Finally, it adds another test that it works as advertised. --- .../node/tasks/TaskStorageRetryIT.java | 5 +- .../action/ListenableActionFuture.java | 22 ---- .../support/ListenableActionFuture.java | 95 ++++++++++++++++ .../support/PlainListenableActionFuture.java | 99 ----------------- .../cluster/NodeConnectionsService.java | 14 +-- .../blobstore/BlobStoreRepository.java | 6 +- .../support/ListenableActionFutureTests.java | 101 +++++++++++++++++- .../RestCancellableNodeClientTests.java | 4 +- .../test/rest/FakeRestRequest.java | 4 +- 9 files changed, 208 insertions(+), 142 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java create mode 100644 server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java delete mode 100644 server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java index 06e5c3259b39a..5928e4f1e16be 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java @@ -9,7 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -63,8 +63,7 @@ public void testRetry() throws Exception { }); barrier.await(); Task task; - PlainListenableActionFuture future = - PlainListenableActionFuture.newListenableFuture(); + ListenableActionFuture future = new ListenableActionFuture<>(); try { logger.info("start a task that will store its results"); TestTaskPlugin.NodesRequest req = new TestTaskPlugin.NodesRequest("foo"); diff --git a/server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java deleted file mode 100644 index d2374515425c2..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action; - -/** - * An {@link ActionFuture} that listeners can be added to. - * - * - */ -public interface ListenableActionFuture extends ActionFuture { - - /** - * Add an action listener to be invoked when a response has received. - */ - void addListener(ActionListener listener); -} diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java new file mode 100644 index 0000000000000..36b56fd7cb1a1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; + +import java.util.ArrayList; +import java.util.List; + +/** + * A {@code Future} and {@link ActionListener} against which which other {@link ActionListener}s can be registered later, to support + * fanning-out a result to a dynamic collection of listeners. + */ +public class ListenableActionFuture extends AdapterActionFuture { + + private Object listeners; + private boolean executedListeners = false; + + /** + * Registers an {@link ActionListener} to be notified when this future is completed. If the future is already completed then the + * listener is notified immediately, on the calling thread. If not, the listener is notified on the thread that completes the listener. + */ + @SuppressWarnings("unchecked,rawtypes") + public void addListener(final ActionListener listener) { + final boolean executeImmediate; + synchronized (this) { + executeImmediate = executedListeners; + if (executeImmediate == false) { + final Object oldListeners = listeners; + final Object newListeners; + if (oldListeners == null) { + // adding the first listener + newListeners = listener; + } else if (oldListeners instanceof List) { + // adding a listener after the second + ((List) oldListeners).add(listener); + newListeners = oldListeners; + } else { + // adding the second listener + newListeners = new ArrayList<>(2); + ((List) newListeners).add(oldListeners); + ((List) newListeners).add(listener); + } + this.listeners = newListeners; + } + } + if (executeImmediate) { + executeListener(listener); + } + } + + @Override + @SuppressWarnings("unchecked,rawtypes") + protected void done(boolean success) { + super.done(success); + final Object listenersToExecute; + synchronized (this) { + executedListeners = true; + listenersToExecute = listeners; + listeners = null; + } + + if (listenersToExecute != null) { + if (listenersToExecute instanceof List) { + for (final Object listener : (List) listenersToExecute) { + executeListener((ActionListener) listener); + } + } else { + executeListener((ActionListener) listenersToExecute); + } + } + } + + @Override + protected T convert(T listenerResponse) { + return listenerResponse; + } + + private void executeListener(final ActionListener listener) { + try { + // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. + // here we know we will never block + listener.onResponse(actionGet(0)); + } catch (Exception e) { + listener.onFailure(e); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java deleted file mode 100644 index 27ad883e240f7..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.support; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ListenableActionFuture; - -import java.util.ArrayList; -import java.util.List; - -public class PlainListenableActionFuture extends AdapterActionFuture implements ListenableActionFuture { - - volatile Object listeners; - boolean executedListeners = false; - - protected PlainListenableActionFuture() {} - - /** - * This method returns a listenable future. The listeners will be called on completion of the future. - * The listeners will be executed by the same thread that completes the future. - * - * @param the result of the future - * @return a listenable future - */ - public static PlainListenableActionFuture newListenableFuture() { - return new PlainListenableActionFuture<>(); - } - - @Override - public void addListener(final ActionListener listener) { - internalAddListener(listener); - } - - @Override - protected void done(boolean success) { - super.done(success); - synchronized (this) { - executedListeners = true; - } - Object listeners = this.listeners; - if (listeners != null) { - if (listeners instanceof List) { - List list = (List) listeners; - for (Object listener : list) { - executeListener((ActionListener) listener); - } - } else { - executeListener((ActionListener) listeners); - } - } - } - - @Override - protected T convert(T listenerResponse) { - return listenerResponse; - } - - private void internalAddListener(ActionListener listener) { - boolean executeImmediate = false; - synchronized (this) { - if (executedListeners) { - executeImmediate = true; - } else { - Object listeners = this.listeners; - if (listeners == null) { - listeners = listener; - } else if (listeners instanceof List) { - ((List) this.listeners).add(listener); - } else { - Object orig = listeners; - listeners = new ArrayList<>(2); - ((List) listeners).add(orig); - ((List) listeners).add(listener); - } - this.listeners = listeners; - } - } - if (executeImmediate) { - executeListener(listener); - } - } - - private void executeListener(final ActionListener listener) { - try { - // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. - // here we know we will never block - listener.onResponse(actionGet(0)); - } catch (Exception e) { - listener.onFailure(e); - } - } - -} diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index edf4525123e9c..08f8c40e60374 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -14,7 +14,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -281,7 +281,7 @@ private enum ActivityType { private class ConnectionTarget { private final DiscoveryNode discoveryNode; - private PlainListenableActionFuture future = PlainListenableActionFuture.newListenableFuture(); + private ListenableActionFuture future = new ListenableActionFuture<>(); private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting private final AtomicInteger consecutiveFailureCount = new AtomicInteger(); @@ -412,10 +412,10 @@ private void addListener(@Nullable ActionListener listener) { } } - private PlainListenableActionFuture getAndClearFuture() { + private ListenableActionFuture getAndClearFuture() { assert Thread.holdsLock(mutex) : "mutex not held"; - final PlainListenableActionFuture drainedFuture = future; - future = PlainListenableActionFuture.newListenableFuture(); + final ListenableActionFuture drainedFuture = future; + future = new ListenableActionFuture<>(); return drainedFuture; } @@ -437,7 +437,7 @@ private Runnable addListenerAndStartActivity(@Nullable ActionListener list } activityType = newActivityType; - final PlainListenableActionFuture oldFuture = getAndClearFuture(); + final ListenableActionFuture oldFuture = getAndClearFuture(); addListener(listener); return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage)); } @@ -449,7 +449,7 @@ private void onCompletion(ActivityType completedActivityType, @Nullable Exceptio synchronized (mutex) { assert activityType != ActivityType.IDLE; if (activityType == completedActivityType) { - final PlainListenableActionFuture oldFuture = getAndClearFuture(); + final ListenableActionFuture oldFuture = getAndClearFuture(); activityType = ActivityType.IDLE; cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 1d14a95c315a4..a615172338dad 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -28,7 +28,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -1379,7 +1379,7 @@ public void getRepositoryData(ActionListener listener) { } // Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking - private PlainListenableActionFuture repoDataInitialized; + private ListenableActionFuture repoDataInitialized; /** * Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that @@ -1400,7 +1400,7 @@ private void initializeRepoGenerationTracking(ActionListener lis return; } logger.trace("[{}] initializing repository generation in cluster state", metadata.name()); - repoDataInitialized = PlainListenableActionFuture.newListenableFuture(); + repoDataInitialized = new ListenableActionFuture<>(); repoDataInitialized.addListener(listener); final Consumer onFailure = e -> { logger.warn(new ParameterizedMessage("[{}] Exception when initializing repository generation in cluster state", diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java index ee3df7572a093..a2004bb740d99 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java @@ -15,17 +15,23 @@ import org.elasticsearch.transport.Transports; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + public class ListenableActionFutureTests extends ESTestCase { - public void testListenerIsCallableFromNetworkThreads() throws Throwable { + public void testListenerIsCallableFromNetworkThreads() throws Exception { ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads"); try { - final PlainListenableActionFuture future = PlainListenableActionFuture.newListenableFuture(); + final ListenableActionFuture future = new ListenableActionFuture<>(); final CountDownLatch listenerCalled = new CountDownLatch(1); - final AtomicReference error = new AtomicReference<>(); + final AtomicReference error = new AtomicReference<>(); final Object response = new Object(); future.addListener(new ActionListener() { @Override @@ -47,7 +53,7 @@ public void onFailure(Exception e) { } @Override - protected void doRun() throws Exception { + protected void doRun() { future.onResponse(response); } }, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX + "_testListenerIsCallableFromNetworkThread"); @@ -62,5 +68,92 @@ protected void doRun() throws Exception { } } + public void testListenersNotifiedOnCorrectThreads() throws InterruptedException { + + final int adderThreads = between(1, 5); + final int completerThreads = between(1, 3); + + final ListenableActionFuture future = new ListenableActionFuture<>(); + + final AtomicBoolean preComplete = new AtomicBoolean(); + final AtomicBoolean postComplete = new AtomicBoolean(); + final String ADDER_THREAD_NAME_PREFIX = "adder-"; + final String COMPLETER_THREAD_NAME_PREFIX = "completer-"; + + final CyclicBarrier barrier = new CyclicBarrier(adderThreads + completerThreads + 1); + final Thread[] threads = new Thread[adderThreads + completerThreads]; + for (int i = 0; i < adderThreads + completerThreads; i++) { + if (i < adderThreads) { + final String threadName = ADDER_THREAD_NAME_PREFIX + i; + threads[i] = new Thread(() -> { + awaitSafe(barrier); + + final AtomicBoolean isComplete = new AtomicBoolean(); + if (postComplete.get()) { + future.addListener(new ActionListener<>() { + @Override + public void onResponse(Void response) { + assertTrue(isComplete.compareAndSet(false, true)); + assertThat(Thread.currentThread().getName(), equalTo(threadName)); + } + @Override + public void onFailure(Exception e) { + throw new AssertionError("unexpected", e); + } + }); + assertTrue(isComplete.get()); + } else { + final PlainActionFuture completingThreadNameFuture = new PlainActionFuture<>(); + future.addListener(new ActionListener<>() { + @Override + public void onResponse(Void response) { + assertTrue(isComplete.compareAndSet(false, true)); + completingThreadNameFuture.onResponse(Thread.currentThread().getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("unexpected", e); + } + }); + + final boolean incompleteAfterAdd = preComplete.get() == false; + final String completingThreadName = completingThreadNameFuture.actionGet(10L, TimeUnit.SECONDS); + if (incompleteAfterAdd) { + assertThat(completingThreadName, startsWith(COMPLETER_THREAD_NAME_PREFIX)); + } else { + assertThat(completingThreadName, anyOf(equalTo(threadName), startsWith(COMPLETER_THREAD_NAME_PREFIX))); + } + } + }, threadName); + } else { + final String threadName = COMPLETER_THREAD_NAME_PREFIX + i; + threads[i] = new Thread(() -> { + awaitSafe(barrier); + + preComplete.set(true); + future.onResponse(null); + postComplete.set(true); + + }, threadName); + } + + threads[i].start(); + } + + awaitSafe(barrier); + for (final Thread thread : threads) { + thread.join(); + } + + } + + private static void awaitSafe(CyclicBarrier barrier) { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError("unexpected", e); + } + } } diff --git a/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java b/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java index 9ca708bcfb664..83864566389ab 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java @@ -17,7 +17,7 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.http.HttpChannel; @@ -74,7 +74,7 @@ public void testCompletedTasks() throws Exception { TestHttpChannel channel = new TestHttpChannel(); totalSearches += numTasks; for (int j = 0; j < numTasks; j++) { - PlainListenableActionFuture actionFuture = PlainListenableActionFuture.newListenableFuture(); + ListenableActionFuture actionFuture = new ListenableActionFuture<>(); RestCancellableNodeClient client = new RestCancellableNodeClient(testClient, channel); threadPool.generic().submit(() -> client.execute(SearchAction.INSTANCE, new SearchRequest(), actionFuture)); futures.add(actionFuture); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index ad98a73b3f26b..67331431e0c04 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -9,7 +9,7 @@ package org.elasticsearch.test.rest; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -129,7 +129,7 @@ public Exception getInboundException() { private static class FakeHttpChannel implements HttpChannel { private final InetSocketAddress remoteAddress; - private final PlainListenableActionFuture closeFuture = PlainListenableActionFuture.newListenableFuture(); + private final ListenableActionFuture closeFuture = new ListenableActionFuture<>(); private FakeHttpChannel(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress;