diff --git a/docs/changelog/104581.yaml b/docs/changelog/104581.yaml new file mode 100644 index 0000000000000..5f9b71acbfed7 --- /dev/null +++ b/docs/changelog/104581.yaml @@ -0,0 +1,6 @@ +pr: 104581 +summary: Fix bogus assertion tripped by force-executed tasks +area: Infra/Core +type: bug +issues: + - 104580 diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java index 192769591aded..52bd736f2bcf4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java @@ -8,26 +8,27 @@ package org.elasticsearch.common.util.concurrent; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; public class EsAbortPolicy extends EsRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - if (r instanceof AbstractRunnable) { - if (((AbstractRunnable) r).isForceExecution()) { - BlockingQueue queue = executor.getQueue(); - if ((queue instanceof SizeBlockingQueue) == false) { - throw new IllegalStateException("forced execution, but expected a size queue"); + if (r instanceof AbstractRunnable abstractRunnable) { + if (abstractRunnable.isForceExecution()) { + if (executor.getQueue() instanceof SizeBlockingQueue sizeBlockingQueue) { + try { + sizeBlockingQueue.forcePut(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("forced execution, but got interrupted", e); + } + if ((executor.isShutdown() && sizeBlockingQueue.remove(r)) == false) { + return; + } // else fall through and reject the task since the executor is shut down + } else { + throw new IllegalStateException("expected but did not find SizeBlockingQueue: " + executor); } - try { - ((SizeBlockingQueue) queue).forcePut(r); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("forced execution, but got interrupted", e); - } - return; } } incrementRejections(); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index e6865e5c66e74..5fcb4684d3f8d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -126,11 +126,14 @@ public static EsThreadPoolExecutor newFixed( ThreadContext contextHolder, TaskTrackingConfig config ) { - BlockingQueue queue; + final BlockingQueue queue; + final EsRejectedExecutionHandler rejectedExecutionHandler; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); + rejectedExecutionHandler = new RejectOnShutdownOnlyPolicy(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity); + rejectedExecutionHandler = new EsAbortPolicy(); } if (config.trackExecutionTime()) { return new TaskExecutionTimeTrackingEsThreadPoolExecutor( @@ -142,7 +145,7 @@ public static EsThreadPoolExecutor newFixed( queue, TimedRunnable::new, threadFactory, - new EsAbortPolicy(), + rejectedExecutionHandler, contextHolder, config ); @@ -155,7 +158,7 @@ public static EsThreadPoolExecutor newFixed( TimeUnit.MILLISECONDS, queue, threadFactory, - new EsAbortPolicy(), + rejectedExecutionHandler, contextHolder ); } @@ -411,6 +414,15 @@ private void reject(ThreadPoolExecutor executor, Runnable task) { } } + static class RejectOnShutdownOnlyPolicy extends EsRejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { + assert executor.isShutdown() : executor; + incrementRejections(); + throw newRejectedException(task, executor, true); + } + } + public static class TaskTrackingConfig { // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable public static double DEFAULT_EWMA_ALPHA = 0.3; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index cb1dddd7c51f3..7b4357262edd3 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; @@ -18,6 +20,8 @@ import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -539,4 +543,129 @@ public void onFailure(Exception e) { ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS); } } + + public void testScalingDropOnShutdown() { + final var executor = EsExecutors.newScaling( + getName(), + 0, + between(1, 5), + 60, + TimeUnit.SECONDS, + false, + EsExecutors.daemonThreadFactory(getName()), + new ThreadContext(Settings.EMPTY) + ); + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + executor.execute(() -> fail("should not run")); // no-op + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + fail("should not call onFailure"); + } + + @Override + protected void doRun() { + fail("should not call doRun"); + } + + @Override + public boolean isForceExecution() { + return randomBoolean(); + } + + @Override + public void onRejection(Exception e) { + fail("should not call onRejection"); + } + + @Override + public void onAfter() { + fail("should not call onAfter"); + } + }); + } + + public void testScalingRejectOnShutdown() { + runRejectOnShutdownTest( + EsExecutors.newScaling( + getName(), + 0, + between(1, 5), + 60, + TimeUnit.SECONDS, + true, + EsExecutors.daemonThreadFactory(getName()), + new ThreadContext(Settings.EMPTY) + ) + ); + } + + public void testFixedBoundedRejectOnShutdown() { + runRejectOnShutdownTest( + EsExecutors.newFixed( + getName(), + between(1, 5), + between(1, 5), + EsExecutors.daemonThreadFactory(getName()), + threadContext, + randomFrom(DEFAULT, DO_NOT_TRACK) + ) + ); + } + + public void testFixedUnboundedRejectOnShutdown() { + runRejectOnShutdownTest( + EsExecutors.newFixed( + getName(), + between(1, 5), + -1, + EsExecutors.daemonThreadFactory(getName()), + threadContext, + randomFrom(DEFAULT, DO_NOT_TRACK) + ) + ); + } + + private static void runRejectOnShutdownTest(ExecutorService executor) { + for (int i = between(0, 10); i > 0; i--) { + final var delayMillis = between(0, 100); + executor.execute(ActionRunnable.wrap(ActionListener.noop(), l -> safeSleep(delayMillis))); + } + try { + executor.shutdown(); + assertShutdownAndRejectingTasks(executor); + } finally { + ThreadPool.terminate(executor, 10, TimeUnit.SECONDS); + } + assertShutdownAndRejectingTasks(executor); + } + + private static void assertShutdownAndRejectingTasks(Executor executor) { + final var rejected = new AtomicBoolean(); + final var shouldBeRejected = new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + fail("should not call onFailure"); + } + + @Override + protected void doRun() { + fail("should not call doRun"); + } + + @Override + public boolean isForceExecution() { + return randomBoolean(); + } + + @Override + public void onRejection(Exception e) { + assertTrue(asInstanceOf(EsRejectedExecutionException.class, e).isExecutorShutdown()); + assertTrue(rejected.compareAndSet(false, true)); + } + }; + assertTrue(expectThrows(EsRejectedExecutionException.class, () -> executor.execute(shouldBeRejected::doRun)).isExecutorShutdown()); + executor.execute(shouldBeRejected); + assertTrue(rejected.get()); + } } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java index 5e82070800f1e..62295be7751eb 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.remotecluster; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -37,7 +37,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.RemoteConnectionInfo; -import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; @@ -70,7 +69,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/104567") public class RemoteClusterSecurityFcActionAuthorizationIT extends ESRestTestCase { @ClassRule @@ -117,12 +115,7 @@ private static try { return future.get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { - if (e.getCause() instanceof RemoteTransportException remoteTransportException - && remoteTransportException.getCause() instanceof Exception cause) { - throw cause; - } - - if (e.getCause() instanceof Exception cause) { + if (ExceptionsHelper.unwrapCause(e.getCause()) instanceof Exception cause) { throw cause; } diff --git a/x-pack/plugin/security/qa/security-basic/src/javaRestTest/java/org/elasticsearch/xpack/security/QueryUserIT.java b/x-pack/plugin/security/qa/security-basic/src/javaRestTest/java/org/elasticsearch/xpack/security/QueryUserIT.java index 8e6290163efcd..25feb9d97ef1e 100644 --- a/x-pack/plugin/security/qa/security-basic/src/javaRestTest/java/org/elasticsearch/xpack/security/QueryUserIT.java +++ b/x-pack/plugin/security/qa/security-basic/src/javaRestTest/java/org/elasticsearch/xpack/security/QueryUserIT.java @@ -54,6 +54,7 @@ private Request queryUserRequestWithAuth() { return request; } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/104603") public void testQuery() throws IOException { // No users to match yet assertQuery("", users -> assertThat(users, empty()));