Skip to content

Commit

Permalink
Add and use UnsafePlainActionFuture
Browse files Browse the repository at this point in the history
There are too many issues to fix in one PR, so add a class that is used wherever we rely on
notifying a future on the same thread pool executor that waits on it, to at least have visibility of those usages.
  • Loading branch information
henningandersen committed May 24, 2024
1 parent 41c54e1 commit 7722a9e
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void onResponse(@Nullable T result) {

@Override
public void onFailure(Exception e) {
assert assertCompleteAllowed();
if (sync.setException(Objects.requireNonNull(e))) {
done(false);
}
Expand Down Expand Up @@ -115,6 +116,7 @@ public boolean isCancelled() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
assert assertCompleteAllowed();
if (sync.cancel() == false) {
return false;
}
Expand All @@ -132,6 +134,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
* @return true if the state was successfully changed.
*/
protected final boolean set(@Nullable T value) {
assert assertCompleteAllowed();
boolean result = sync.set(value);
if (result) {
done(true);
Expand Down Expand Up @@ -366,7 +369,6 @@ boolean cancel() {
* @param finalState the state to transition to.
*/
private boolean complete(@Nullable V v, @Nullable Exception e, int finalState) {
assert assertCompleteAllowed();
boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
if (doCompletion) {
// If this thread successfully transitioned to COMPLETING, set the value
Expand All @@ -381,18 +383,6 @@ private boolean complete(@Nullable V v, @Nullable Exception e, int finalState) {
}
return doCompletion;
}

/**
 * Assertion helper that verifies the current thread may complete this future.
 * <p>
 * If {@code getFirstQueuedThread()} reports a waiting thread, that waiter must not belong to the same
 * executor as the current thread (as decided by {@link EsExecutors#differentExecutors}); otherwise the
 * assertion trips, reporting both threads and the waiter's stack trace.
 *
 * @return always {@code true}, so the call can be used as the condition of an {@code assert} statement
 */
private boolean assertCompleteAllowed() {
    final Thread queuedWaiter = getFirstQueuedThread();
    if (queuedWaiter == null) {
        // nobody is blocked on this future, so completing it from any thread is fine
        return true;
    }
    // the failure message (including the stack trace capture) is only built when the assertion fails
    assert EsExecutors.differentExecutors(queuedWaiter, Thread.currentThread())
        : "cannot complete future on thread "
            + Thread.currentThread()
            + " with waiter on thread "
            + queuedWaiter
            + ", could deadlock if pool was full\n"
            + ExceptionsHelper.formatStackTrace(queuedWaiter.getStackTrace());
    return true;
}
}

private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
Expand All @@ -414,4 +404,26 @@ public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T
e.accept(fut);
return fut.actionGet(timeout, unit);
}

/**
 * Assertion helper that verifies the current thread may complete this future.
 * <p>
 * If {@code sync.getFirstQueuedThread()} reports a waiting thread, the pair (waiter, current thread) must be
 * accepted by {@link #allowedExecutors(Thread, Thread)}; otherwise the assertion trips, reporting both
 * threads and the waiter's stack trace.
 *
 * @return always {@code true}, so the call can be used as the condition of an {@code assert} statement
 */
private boolean assertCompleteAllowed() {
    final Thread queuedWaiter = sync.getFirstQueuedThread();
    if (queuedWaiter == null) {
        // nobody is blocked on this future, so completing it from any thread is fine
        return true;
    }
    // the failure message (including the stack trace capture) is only built when the assertion fails
    assert allowedExecutors(queuedWaiter, Thread.currentThread())
        : "cannot complete future on thread "
            + Thread.currentThread()
            + " with waiter on thread "
            + queuedWaiter
            + ", could deadlock if pool was full\n"
            + ExceptionsHelper.formatStackTrace(queuedWaiter.getStackTrace());
    return true;
}

/**
 * Decides whether two threads are an acceptable (waiter, completer) pair. Only used in assertions.
 * <p>
 * The pair is accepted when the executor name of either thread cannot be determined
 * ({@link EsExecutors#executorName} returned {@code null}) or when the two executor names differ.
 *
 * @param thread1 the first thread of the pair
 * @param thread2 the second thread of the pair; must not be the same thread as {@code thread1}
 * @return {@code false} only if both threads have a known executor name and the names are equal
 */
boolean allowedExecutors(Thread thread1, Thread thread2) {
    // This exists to validate interactions between threads (e.g. not waiting for a future that is completed
    // on the same executor); being handed the same thread twice points at a bug in the calling assertion.
    assert thread1 != thread2 : "only call this for different threads";
    final String firstExecutor = EsExecutors.executorName(thread1);
    final String secondExecutor = EsExecutors.executorName(thread2);
    if (firstExecutor == null || secondExecutor == null) {
        return true;
    }
    return firstExecutor.equals(secondExecutor) == false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support;

import org.elasticsearch.common.util.concurrent.EsExecutors;

import java.util.Objects;

/**
 * A {@link PlainActionFuture} that additionally accepts being notified from the one named thread pool executor
 * on which a thread is also waiting for it.
 * <p>
 * New code should not need this class: prefer making the code asynchronous, or arrange the thread pool
 * executors around the future in a clear hierarchy.
 * <p>
 * The future is "unsafe" because completing a future from the same executor that is blocked waiting on it is a
 * common deadlock scenario: every thread of the pool may be waiting, leaving no thread free to complete the future.
 */
public class UnsafePlainActionFuture<T> extends PlainActionFuture<T> {

    /** Name of the executor for which the same-executor check is relaxed. */
    private final String unsafeExecutor;

    /**
     * @param unsafeExecutor name of the executor that is tolerated as both waiter and completer; must not be
     *                       {@code null}
     * @throws NullPointerException if {@code unsafeExecutor} is {@code null}
     */
    public UnsafePlainActionFuture(String unsafeExecutor) {
        this.unsafeExecutor = Objects.requireNonNull(unsafeExecutor);
    }

    /**
     * Accepts every pair the superclass accepts, plus any pair whose first thread runs on the configured
     * unsafe executor.
     */
    @Override
    boolean allowedExecutors(Thread thread1, Thread thread2) {
        if (super.allowedExecutors(thread1, thread2)) {
            return true;
        }
        final String firstExecutor = EsExecutors.executorName(thread1);
        return unsafeExecutor.equals(firstExecutor);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.action.termvectors.MultiTermVectorsAction;
import org.elasticsearch.action.termvectors.MultiTermVectorsRequest;
import org.elasticsearch.action.termvectors.MultiTermVectorsRequestBuilder;
Expand Down Expand Up @@ -410,7 +411,13 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
* on the result before it goes out of scope.
* @param <R> reference counted result type
*/
private static class RefCountedFuture<R extends RefCounted> extends PlainActionFuture<R> {
// todo: the use of UnsafePlainActionFuture here is quite broad, we should find a better way to be more specific
// (unless making all usages safe is easy).
private static class RefCountedFuture<R extends RefCounted> extends UnsafePlainActionFuture<R> {

private RefCountedFuture() {
super(ThreadPool.Names.GENERIC);
}

@Override
public final void onResponse(R result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,7 @@ public static String threadName(final String nodeName, final String namePrefix)
return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]";
}

/**
 * Reports whether two threads can be treated as running on different executors. To be used in assertions only.
 * <p>
 * Returns {@code true} when the executor name of either thread cannot be determined
 * ({@code executorName} returned {@code null}) or when the two executor names differ.
 *
 * @param thread1 the first thread to compare
 * @param thread2 the second thread to compare; must not be the same thread as {@code thread1}
 * @return {@code false} only if both threads have a known executor name and the names are equal
 */
public static boolean differentExecutors(Thread thread1, Thread thread2) {
    // This exists to validate interactions between threads (e.g. not waiting for a future that is completed
    // on the same executor); being handed the same thread twice points at a bug in the calling assertion.
    assert thread1 != thread2 : "only call this for different threads";
    final String firstExecutor = executorName(thread1);
    final String secondExecutor = executorName(thread2);
    if (firstExecutor == null || secondExecutor == null) {
        return true;
    }
    return firstExecutor.equals(secondExecutor) == false;
}

// visible for tests
static String executorName(Thread thread) {
public static String executorName(Thread thread) {
String name = thread.getName();
// subtract 2 to avoid the `]` of the thread number part.
int executorNameEnd = name.lastIndexOf(']', name.length() - 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;

import java.io.Closeable;
Expand Down Expand Up @@ -1956,21 +1958,16 @@ private boolean drainForClose() {

logger.debug("drainForClose(): draining ops");
releaseEnsureOpenRef.close();
final var future = new PlainActionFuture<Void>() {
final var future = new UnsafePlainActionFuture<Void>(ThreadPool.Names.GENERIC) {
@Override
protected boolean blockingAllowed() {
// TODO remove this blocking, or at least do it elsewhere, see https://github.com/elastic/elasticsearch/issues/89821
return Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)
|| super.blockingAllowed();
}
};
CountDownLatch latch = new CountDownLatch(1);
drainOnCloseListener.addListener(future);
drainOnCloseListener.addListener(ActionListener.releasing(latch::countDown));
try {
// todo: hack to circumvent same executor check here to see if more failures appear.
// need to make this method async, which includes making engine close async, which we should be able to do now.
latch.await();
future.get();
return true;
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -869,7 +870,7 @@ protected final void recoverUnstartedReplica(
routingTable
);
try {
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
PlainActionFuture<RecoveryResponse> future = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
recovery.recoverToTarget(future);
future.actionGet();
recoveryTarget.markAsDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.VersionInformation;
Expand Down Expand Up @@ -996,7 +997,7 @@ public void onFailure(Exception e) {
protected void doRun() throws Exception {
go.await();
for (int iter = 0; iter < 10; iter++) {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
PlainActionFuture<TestResponse> listener = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
final String info = sender + "_" + iter;
final DiscoveryNode node = nodeB; // capture now
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.UnsafePlainActionFuture;
import org.elasticsearch.action.support.replication.PostWriteRefresh;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
Expand Down Expand Up @@ -802,7 +803,7 @@ class CcrAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardO

@Override
protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener<PrimaryResult> listener) {
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
final PlainActionFuture<Releasable> permitFuture = new UnsafePlainActionFuture<>(ThreadPool.Names.GENERIC);
primary.acquirePrimaryOperationPermit(permitFuture, EsExecutors.DIRECT_EXECUTOR_SERVICE);
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
final var threadpool = mock(ThreadPool.class);
Expand Down

0 comments on commit 7722a9e

Please sign in to comment.