diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java index 06e5c3259b39a..5928e4f1e16be 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskStorageRetryIT.java @@ -9,7 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -63,8 +63,7 @@ public void testRetry() throws Exception { }); barrier.await(); Task task; - PlainListenableActionFuture future = - PlainListenableActionFuture.newListenableFuture(); + ListenableActionFuture future = new ListenableActionFuture<>(); try { logger.info("start a task that will store its results"); TestTaskPlugin.NodesRequest req = new TestTaskPlugin.NodesRequest("foo"); diff --git a/server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java deleted file mode 100644 index d2374515425c2..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action; - -/** - * An {@link ActionFuture} that listeners can be added to. - * - * - */ -public interface ListenableActionFuture extends ActionFuture { - - /** - * Add an action listener to be invoked when a response has received. - */ - void addListener(ActionListener listener); -} diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java new file mode 100644 index 0000000000000..36b56fd7cb1a1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; + +import java.util.ArrayList; +import java.util.List; + +/** + * A {@code Future} and {@link ActionListener} against which which other {@link ActionListener}s can be registered later, to support + * fanning-out a result to a dynamic collection of listeners. + */ +public class ListenableActionFuture extends AdapterActionFuture { + + private Object listeners; + private boolean executedListeners = false; + + /** + * Registers an {@link ActionListener} to be notified when this future is completed. If the future is already completed then the + * listener is notified immediately, on the calling thread. If not, the listener is notified on the thread that completes the listener. + */ + @SuppressWarnings("unchecked,rawtypes") + public void addListener(final ActionListener listener) { + final boolean executeImmediate; + synchronized (this) { + executeImmediate = executedListeners; + if (executeImmediate == false) { + final Object oldListeners = listeners; + final Object newListeners; + if (oldListeners == null) { + // adding the first listener + newListeners = listener; + } else if (oldListeners instanceof List) { + // adding a listener after the second + ((List) oldListeners).add(listener); + newListeners = oldListeners; + } else { + // adding the second listener + newListeners = new ArrayList<>(2); + ((List) newListeners).add(oldListeners); + ((List) newListeners).add(listener); + } + this.listeners = newListeners; + } + } + if (executeImmediate) { + executeListener(listener); + } + } + + @Override + @SuppressWarnings("unchecked,rawtypes") + protected void done(boolean success) { + super.done(success); + final Object listenersToExecute; + synchronized (this) { + executedListeners = true; + listenersToExecute = listeners; + listeners = null; + } + + if (listenersToExecute != null) { + if (listenersToExecute instanceof List) { + for (final Object listener : (List) listenersToExecute) { + executeListener((ActionListener) listener); + } + } else { + executeListener((ActionListener) listenersToExecute); + } + } + } + + @Override + protected T convert(T listenerResponse) { + return listenerResponse; + } + + private void executeListener(final ActionListener listener) { + try { + // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. + // here we know we will never block + listener.onResponse(actionGet(0)); + } catch (Exception e) { + listener.onFailure(e); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java deleted file mode 100644 index 27ad883e240f7..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.support; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ListenableActionFuture; - -import java.util.ArrayList; -import java.util.List; - -public class PlainListenableActionFuture extends AdapterActionFuture implements ListenableActionFuture { - - volatile Object listeners; - boolean executedListeners = false; - - protected PlainListenableActionFuture() {} - - /** - * This method returns a listenable future. The listeners will be called on completion of the future. - * The listeners will be executed by the same thread that completes the future. - * - * @param the result of the future - * @return a listenable future - */ - public static PlainListenableActionFuture newListenableFuture() { - return new PlainListenableActionFuture<>(); - } - - @Override - public void addListener(final ActionListener listener) { - internalAddListener(listener); - } - - @Override - protected void done(boolean success) { - super.done(success); - synchronized (this) { - executedListeners = true; - } - Object listeners = this.listeners; - if (listeners != null) { - if (listeners instanceof List) { - List list = (List) listeners; - for (Object listener : list) { - executeListener((ActionListener) listener); - } - } else { - executeListener((ActionListener) listeners); - } - } - } - - @Override - protected T convert(T listenerResponse) { - return listenerResponse; - } - - private void internalAddListener(ActionListener listener) { - boolean executeImmediate = false; - synchronized (this) { - if (executedListeners) { - executeImmediate = true; - } else { - Object listeners = this.listeners; - if (listeners == null) { - listeners = listener; - } else if (listeners instanceof List) { - ((List) this.listeners).add(listener); - } else { - Object orig = listeners; - listeners = new ArrayList<>(2); - ((List) listeners).add(orig); - ((List) listeners).add(listener); - } - this.listeners = listeners; - } - } - if (executeImmediate) { - executeListener(listener); - } - } - - private void executeListener(final ActionListener listener) { - try { - // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. - // here we know we will never block - listener.onResponse(actionGet(0)); - } catch (Exception e) { - listener.onFailure(e); - } - } - -} diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index edf4525123e9c..08f8c40e60374 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -14,7 +14,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -281,7 +281,7 @@ private enum ActivityType { private class ConnectionTarget { private final DiscoveryNode discoveryNode; - private PlainListenableActionFuture future = PlainListenableActionFuture.newListenableFuture(); + private ListenableActionFuture future = new ListenableActionFuture<>(); private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting private final AtomicInteger consecutiveFailureCount = new AtomicInteger(); @@ -412,10 +412,10 @@ private void addListener(@Nullable ActionListener listener) { } } - private PlainListenableActionFuture getAndClearFuture() { + private ListenableActionFuture getAndClearFuture() { assert Thread.holdsLock(mutex) : "mutex not held"; - final PlainListenableActionFuture drainedFuture = future; - future = PlainListenableActionFuture.newListenableFuture(); + final ListenableActionFuture drainedFuture = future; + future = new ListenableActionFuture<>(); return drainedFuture; } @@ -437,7 +437,7 @@ private Runnable addListenerAndStartActivity(@Nullable ActionListener list } activityType = newActivityType; - final PlainListenableActionFuture oldFuture = getAndClearFuture(); + final ListenableActionFuture oldFuture = getAndClearFuture(); addListener(listener); return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage)); } @@ -449,7 +449,7 @@ private void onCompletion(ActivityType completedActivityType, @Nullable Exceptio synchronized (mutex) { assert activityType != ActivityType.IDLE; if (activityType == completedActivityType) { - final PlainListenableActionFuture oldFuture = getAndClearFuture(); + final ListenableActionFuture oldFuture = getAndClearFuture(); activityType = ActivityType.IDLE; cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 1d14a95c315a4..a615172338dad 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -28,7 +28,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -1379,7 +1379,7 @@ public void getRepositoryData(ActionListener listener) { } // Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking - private PlainListenableActionFuture repoDataInitialized; + private ListenableActionFuture repoDataInitialized; /** * Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that @@ -1400,7 +1400,7 @@ private void initializeRepoGenerationTracking(ActionListener lis return; } logger.trace("[{}] initializing repository generation in cluster state", metadata.name()); - repoDataInitialized = PlainListenableActionFuture.newListenableFuture(); + repoDataInitialized = new ListenableActionFuture<>(); repoDataInitialized.addListener(listener); final Consumer onFailure = e -> { logger.warn(new ParameterizedMessage("[{}] Exception when initializing repository generation in cluster state", diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java index ee3df7572a093..a2004bb740d99 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java @@ -15,17 +15,23 @@ import org.elasticsearch.transport.Transports; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + public class ListenableActionFutureTests extends ESTestCase { - public void testListenerIsCallableFromNetworkThreads() throws Throwable { + public void testListenerIsCallableFromNetworkThreads() throws Exception { ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads"); try { - final PlainListenableActionFuture future = PlainListenableActionFuture.newListenableFuture(); + final ListenableActionFuture future = new ListenableActionFuture<>(); final CountDownLatch listenerCalled = new CountDownLatch(1); - final AtomicReference error = new AtomicReference<>(); + final AtomicReference error = new AtomicReference<>(); final Object response = new Object(); future.addListener(new ActionListener() { @Override @@ -47,7 +53,7 @@ public void onFailure(Exception e) { } @Override - protected void doRun() throws Exception { + protected void doRun() { future.onResponse(response); } }, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX + "_testListenerIsCallableFromNetworkThread"); @@ -62,5 +68,92 @@ protected void doRun() throws Exception { } } + public void testListenersNotifiedOnCorrectThreads() throws InterruptedException { + + final int adderThreads = between(1, 5); + final int completerThreads = between(1, 3); + + final ListenableActionFuture future = new ListenableActionFuture<>(); + + final AtomicBoolean preComplete = new AtomicBoolean(); + final AtomicBoolean postComplete = new AtomicBoolean(); + final String ADDER_THREAD_NAME_PREFIX = "adder-"; + final String COMPLETER_THREAD_NAME_PREFIX = "completer-"; + + final CyclicBarrier barrier = new CyclicBarrier(adderThreads + completerThreads + 1); + final Thread[] threads = new Thread[adderThreads + completerThreads]; + for (int i = 0; i < adderThreads + completerThreads; i++) { + if (i < adderThreads) { + final String threadName = ADDER_THREAD_NAME_PREFIX + i; + threads[i] = new Thread(() -> { + awaitSafe(barrier); + + final AtomicBoolean isComplete = new AtomicBoolean(); + if (postComplete.get()) { + future.addListener(new ActionListener<>() { + @Override + public void onResponse(Void response) { + assertTrue(isComplete.compareAndSet(false, true)); + assertThat(Thread.currentThread().getName(), equalTo(threadName)); + } + @Override + public void onFailure(Exception e) { + throw new AssertionError("unexpected", e); + } + }); + assertTrue(isComplete.get()); + } else { + final PlainActionFuture completingThreadNameFuture = new PlainActionFuture<>(); + future.addListener(new ActionListener<>() { + @Override + public void onResponse(Void response) { + assertTrue(isComplete.compareAndSet(false, true)); + completingThreadNameFuture.onResponse(Thread.currentThread().getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("unexpected", e); + } + }); + + final boolean incompleteAfterAdd = preComplete.get() == false; + final String completingThreadName = completingThreadNameFuture.actionGet(10L, TimeUnit.SECONDS); + if (incompleteAfterAdd) { + assertThat(completingThreadName, startsWith(COMPLETER_THREAD_NAME_PREFIX)); + } else { + assertThat(completingThreadName, anyOf(equalTo(threadName), startsWith(COMPLETER_THREAD_NAME_PREFIX))); + } + } + }, threadName); + } else { + final String threadName = COMPLETER_THREAD_NAME_PREFIX + i; + threads[i] = new Thread(() -> { + awaitSafe(barrier); + + preComplete.set(true); + future.onResponse(null); + postComplete.set(true); + + }, threadName); + } + + threads[i].start(); + } + + awaitSafe(barrier); + for (final Thread thread : threads) { + thread.join(); + } + + } + + private static void awaitSafe(CyclicBarrier barrier) { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError("unexpected", e); + } + } } diff --git a/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java b/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java index 9ca708bcfb664..83864566389ab 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/RestCancellableNodeClientTests.java @@ -17,7 +17,7 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.http.HttpChannel; @@ -74,7 +74,7 @@ public void testCompletedTasks() throws Exception { TestHttpChannel channel = new TestHttpChannel(); totalSearches += numTasks; for (int j = 0; j < numTasks; j++) { - PlainListenableActionFuture actionFuture = PlainListenableActionFuture.newListenableFuture(); + ListenableActionFuture actionFuture = new ListenableActionFuture<>(); RestCancellableNodeClient client = new RestCancellableNodeClient(testClient, channel); threadPool.generic().submit(() -> client.execute(SearchAction.INSTANCE, new SearchRequest(), actionFuture)); futures.add(actionFuture); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index ad98a73b3f26b..67331431e0c04 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -9,7 +9,7 @@ package org.elasticsearch.test.rest; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -129,7 +129,7 @@ public Exception getInboundException() { private static class FakeHttpChannel implements HttpChannel { private final InetSocketAddress remoteAddress; - private final PlainListenableActionFuture closeFuture = PlainListenableActionFuture.newListenableFuture(); + private final ListenableActionFuture closeFuture = new ListenableActionFuture<>(); private FakeHttpChannel(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress;