From 7722a9eee4cca472a26ab9078b82bc24110b095f Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Fri, 24 May 2024 07:45:47 +0200 Subject: [PATCH] Add and use UnsafePlainActionFuture Too many issues to fix in one PR, add a class that is used where we rely on notifying on same thread to at least have visibility. --- .../action/support/PlainActionFuture.java | 38 ++++++++++++------- .../support/UnsafePlainActionFuture.java | 35 +++++++++++++++++ .../internal/support/AbstractClient.java | 9 ++++- .../common/util/concurrent/EsExecutors.java | 13 +------ .../elasticsearch/index/engine/Engine.java | 9 ++--- .../index/shard/IndexShardTestCase.java | 3 +- .../AbstractSimpleTransportTestCase.java | 3 +- .../ShardFollowTaskReplicationTests.java | 3 +- 8 files changed, 78 insertions(+), 35 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/UnsafePlainActionFuture.java diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java index ca5ebf5405693..c52c9ba1264db 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java @@ -39,6 +39,7 @@ public void onResponse(@Nullable T result) { @Override public void onFailure(Exception e) { + assert assertCompleteAllowed(); if (sync.setException(Objects.requireNonNull(e))) { done(false); } @@ -115,6 +116,7 @@ public boolean isCancelled() { @Override public boolean cancel(boolean mayInterruptIfRunning) { + assert assertCompleteAllowed(); if (sync.cancel() == false) { return false; } @@ -132,6 +134,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { * @return true if the state was successfully changed. */ protected final boolean set(@Nullable T value) { + assert assertCompleteAllowed(); boolean result = sync.set(value); if (result) { done(true); @@ -366,7 +369,6 @@ boolean cancel() { * @param finalState the state to transition to. */ private boolean complete(@Nullable V v, @Nullable Exception e, int finalState) { - assert assertCompleteAllowed(); boolean doCompletion = compareAndSetState(RUNNING, COMPLETING); if (doCompletion) { // If this thread successfully transitioned to COMPLETING, set the value @@ -381,18 +383,6 @@ private boolean complete(@Nullable V v, @Nullable Exception e, int finalState) { } return doCompletion; } - - private boolean assertCompleteAllowed() { - Thread waiter = getFirstQueuedThread(); - assert waiter == null || EsExecutors.differentExecutors(waiter, Thread.currentThread()) - : "cannot complete future on thread " - + Thread.currentThread() - + " with waiter on thread " - + waiter - + ", could deadlock if pool was full\n" - + ExceptionsHelper.formatStackTrace(waiter.getStackTrace()); - return true; - } } private static RuntimeException unwrapEsException(ElasticsearchException esEx) { @@ -414,4 +404,26 @@ public static T get(CheckedConsumer extends PlainActionFuture { + + private final String unsafeExecutor; + + public UnsafePlainActionFuture(String unsafeExecutor) { + Objects.requireNonNull(unsafeExecutor); + this.unsafeExecutor = unsafeExecutor; + } + + @Override + boolean allowedExecutors(Thread thread1, Thread thread2) { + return super.allowedExecutors(thread1, thread2) || unsafeExecutor.equals(EsExecutors.executorName(thread1)); + } +} diff --git a/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java index 966299408a678..f4e86c8a4eca6 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/support/AbstractClient.java @@ -59,6 +59,7 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.action.termvectors.MultiTermVectorsRequestBuilder; @@ -410,7 +411,13 @@ protected void * on the result before it goes out of scope. * @param reference counted result type */ - private static class RefCountedFuture extends PlainActionFuture { + // todo: the use of UnsafePlainActionFuture here is quite broad, we should find a better way to be more specific + // (unless making all usages safe is easy). + private static class RefCountedFuture extends UnsafePlainActionFuture { + + private RefCountedFuture() { + super(ThreadPool.Names.GENERIC); + } @Override public final void onResponse(R result) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index a7fd4f93e168f..b69fc928d69af 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -266,18 +266,7 @@ public static String threadName(final String nodeName, final String namePrefix) return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]"; } - // to be used in assertions only. - public static boolean differentExecutors(Thread thread1, Thread thread2) { - // this should only be used to validate thread interactions, like not waiting for a future completed on the same - // executor, hence calling it with the same thread indicates a bug in the assertion using this. - assert thread1 != thread2 : "only call this for different threads"; - String thread1Name = executorName(thread1); - String thread2Name = executorName(thread2); - return thread1Name == null || thread2Name == null || thread1Name.equals(thread2Name) == false; - } - - // visible for tests - static String executorName(Thread thread) { + public static String executorName(Thread thread) { String name = thread.getName(); // subtract 2 to avoid the `]` of the thread number part. int executorNameEnd = name.lastIndexOf(']', name.length() - 2); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e3779b678d346..c219e16659c99 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; @@ -75,6 +76,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; import java.io.Closeable; @@ -1956,7 +1958,7 @@ private boolean drainForClose() { logger.debug("drainForClose(): draining ops"); releaseEnsureOpenRef.close(); - final var future = new PlainActionFuture() { + final var future = new UnsafePlainActionFuture(ThreadPool.Names.GENERIC) { @Override protected boolean blockingAllowed() { // TODO remove this blocking, or at least do it elsewhere, see https://github.com/elastic/elasticsearch/issues/89821 @@ -1964,13 +1966,8 @@ protected boolean blockingAllowed() { || super.blockingAllowed(); } }; - CountDownLatch latch = new CountDownLatch(1); drainOnCloseListener.addListener(future); - drainOnCloseListener.addListener(ActionListener.releasing(latch::countDown)); try { - // todo: hack to circumvent same executor check here to see if more failures appear. - // need to make this method async, which includes making engine close async, which we should be able to do now. - latch.await(); future.get(); return true; } catch (ExecutionException e) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index a389020cdcde8..442a8c3b82dc6 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -869,7 +870,7 @@ protected final void recoverUnstartedReplica( routingTable ); try { - PlainActionFuture future = new PlainActionFuture<>(); + PlainActionFuture future = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); recovery.recoverToTarget(future); future.actionGet(); recoveryTarget.markAsDone(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3dc7201535e0a..7cce02f18811f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.VersionInformation; @@ -996,7 +997,7 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { go.await(); for (int iter = 0; iter < 10; iter++) { - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); final String info = sender + "_" + iter; final DiscoveryNode node = nodeB; // capture now try { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 3a16f368d322a..04a97ad9e7f95 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.UnsafePlainActionFuture; import org.elasticsearch.action.support.replication.PostWriteRefresh; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportWriteAction; @@ -802,7 +803,7 @@ class CcrAction extends ReplicationAction listener) { - final PlainActionFuture permitFuture = new PlainActionFuture<>(); + final PlainActionFuture permitFuture = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC); primary.acquirePrimaryOperationPermit(permitFuture, EsExecutors.DIRECT_EXECUTOR_SERVICE); final TransportWriteAction.WritePrimaryResult ccrResult; final var threadpool = mock(ThreadPool.class);