From 4e24cf9514cac7f164665127d727d192b7ce38e1 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Tue, 14 May 2024 17:01:11 +0200 Subject: [PATCH] Refactor KibanaThreadPoolIT/SystemIndexThreadPoolTestCase for resiliency (#108463) The test SystemIndexThreadPoolTestCase#testUserThreadPoolsAreBlocked was sometimes blocked instead of throwing the expected exception (due to a queue on a threadpool being full). It happens that submitting a busyTask does not guarantee that it will be executed immediately by a threadpool; some other task might have been executing at the time. This commit refactors the way the threadpools are populated and makes sure that, before the queues are filled, all the busyTasks are executing on the threadpools. Based on the test failure -> #107625, the thread pool's threads were busy, but I cannot tell if a queue was full before the search request was submitted. --- .../kibana/KibanaThreadPoolIT.java | 146 +++++++++++++++++- .../SystemIndexThreadPoolTestCase.java | 120 -------------- 2 files changed, 143 insertions(+), 123 deletions(-) delete mode 100644 test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTestCase.java diff --git a/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java b/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java index 317bfa9edd1c9..275666eec5c42 100644 --- a/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java +++ b/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java @@ -9,30 +9,66 @@ package org.elasticsearch.kibana; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; +import 
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.indices.SystemIndexThreadPoolTestCase; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.Phaser; import java.util.stream.Stream; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.startsWith; -public class KibanaThreadPoolIT extends SystemIndexThreadPoolTestCase { +/** + * Tests to verify that system indices are bypassing user-space thread pools + * + *

We can block thread pools by setting them to one thread and a one-element queue, then submitting + * tasks that wait on a phaser. This lets us verify that operations on system indices + * are being directed to other thread pools.

+ */ +public class KibanaThreadPoolIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB") + .put("thread_pool.search.size", 1) + .put("thread_pool.search.queue_size", 1) + .put("thread_pool.write.size", 1) + .put("thread_pool.write.queue_size", 1) + .put("thread_pool.get.size", 1) + .put("thread_pool.get.queue_size", 1) + .build(); + } + + private static final String USER_INDEX = "user_index"; + // For system indices that use ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS, we'll want to + // block normal system index thread pools as well. + private static final Set THREAD_POOLS_TO_BLOCK = Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH); @Override protected Collection> nodePlugins() { return Set.of(KibanaPlugin.class); } - public void testKibanaThreadPool() { + public void testKibanaThreadPoolByPassesBlockedThreadPools() throws Exception { List kibanaSystemIndices = Stream.of( KibanaPlugin.KIBANA_INDEX_DESCRIPTOR.getIndexPattern(), KibanaPlugin.REPORTING_INDEX_DESCRIPTOR.getIndexPattern(), @@ -61,4 +97,108 @@ public void testKibanaThreadPool() { } }); } + + public void testBlockedThreadPoolsRejectUserRequests() throws Exception { + assertAcked(client().admin().indices().prepareCreate(USER_INDEX)); + + runWithBlockedThreadPools(this::assertThreadPoolsBlocked); + + assertAcked(client().admin().indices().prepareDelete(USER_INDEX)); + } + + private void assertThreadPoolsBlocked() { + + var e1 = expectThrows( + EsRejectedExecutionException.class, + () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get() + ); + assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable")); + var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get()); + 
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable")); + var e3 = expectThrows( + SearchPhaseExecutionException.class, + () -> client().prepareSearch(USER_INDEX) + .setQuery(QueryBuilders.matchAllQuery()) + // Request times out if max concurrent shard requests is set to 1 + .setMaxConcurrentShardRequests(usually() ? SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : randomIntBetween(2, 10)) + .get() + ); + assertThat(e3.getMessage(), containsString("all shards failed")); + } + + protected void runWithBlockedThreadPools(Runnable runnable) throws Exception { + Phaser phaser = new Phaser(); + + // register this test's thread + phaser.register(); + + blockThreadPool(phaser); + phaser.arriveAndAwaitAdvance();// wait until all waitAction are executing + + fillQueues(); + + logger.debug("number of nodes " + internalCluster().getNodeNames().length); + logger.debug("number of parties arrived " + phaser.getArrivedParties()); + try { + runnable.run(); + } finally { + phaser.arriveAndAwaitAdvance(); // release all waitAction + } + } + + private void blockThreadPool(Phaser phaser) { + for (String nodeName : internalCluster().getNodeNames()) { + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); + for (String threadPoolName : THREAD_POOLS_TO_BLOCK) { + blockThreadPool(threadPoolName, threadPool, phaser); + } + } + } + + private void fillQueues() { + for (String nodeName : internalCluster().getNodeNames()) { + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); + for (String threadPoolName : THREAD_POOLS_TO_BLOCK) { + fillThreadPoolQueues(threadPoolName, threadPool); + } + } + } + + private static void blockThreadPool(String threadPoolName, ThreadPool threadPool, Phaser phaser) { + ThreadPool.Info info = threadPool.info(threadPoolName); + + Runnable waitAction = () -> { + phaser.arriveAndAwaitAdvance();// block until all are executed on a threadpool + phaser.arriveAndAwaitAdvance();// 
block until main thread has not finished + }; + + phaser.bulkRegister(info.getMax()); + + for (int i = 0; i < info.getMax(); i++) { + // we need to make sure that there is a task blocking a thread pool + // otherwise a queue might end up having a spot + do { + try { + threadPool.executor(threadPoolName).execute(waitAction); + break; + } catch (EsRejectedExecutionException e) { + // if exception was thrown when submitting, retry. + } + } while (true); + } + } + + private static void fillThreadPoolQueues(String threadPoolName, ThreadPool threadPool) { + ThreadPool.Info info = threadPool.info(threadPoolName); + + for (int i = 0; i < info.getQueueSize().singles(); i++) { + try { + threadPool.executor(threadPoolName).execute(() -> {}); + } catch (EsRejectedExecutionException e) { + // we can't be sure that some other task won't get queued in a test cluster + // but the threadpool's thread is already blocked + } + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTestCase.java b/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTestCase.java deleted file mode 100644 index e105d61f7ee0a..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTestCase.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. 
- */ - -package org.elasticsearch.indices; - -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Phaser; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.startsWith; - -/** - * Tests to verify that system indices are bypassing user-space thread pools - * - *

We can block thread pools by setting them to one thread and no queue, then submitting - * threads that wait on a countdown latch. This lets us verify that operations on system indices - * are being directed to other thread pools.

- * - *

When implementing this class, don't forget to override {@link ESIntegTestCase#nodePlugins()} if - * the relevant system index is defined in a plugin.

- */ -public abstract class SystemIndexThreadPoolTestCase extends ESIntegTestCase { - - private static final String USER_INDEX = "user_index"; - - // For system indices that use ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS, we'll want to - // block normal system index thread pools as well. - protected Set threadPoolsToBlock() { - return Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH); - } - - protected void runWithBlockedThreadPools(Runnable runnable) { - Phaser phaser = new Phaser(); - Runnable waitAction = () -> { - phaser.arriveAndAwaitAdvance(); - phaser.arriveAndAwaitAdvance(); - }; - phaser.register(); // register this test's thread - - for (String nodeName : internalCluster().getNodeNames()) { - ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); - for (String threadPoolName : threadPoolsToBlock()) { - ThreadPool.Info info = threadPool.info(threadPoolName); - phaser.bulkRegister(info.getMax()); - for (int i = 0; i < info.getMax(); i++) { - threadPool.executor(threadPoolName).submit(waitAction); - } - } - } - phaser.arriveAndAwaitAdvance(); - try { - runnable.run(); - } finally { - phaser.arriveAndAwaitAdvance(); - } - } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/107625") - public void testUserThreadPoolsAreBlocked() { - assertAcked(client().admin().indices().prepareCreate(USER_INDEX)); - - runWithBlockedThreadPools(this::assertThreadPoolsBlocked); - - assertAcked(client().admin().indices().prepareDelete(USER_INDEX)); - } - - private void assertThreadPoolsBlocked() { - fillThreadPoolQueues(); // rejections are easier to check than timeouts - - var e1 = expectThrows( - EsRejectedExecutionException.class, - () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get() - ); - assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable")); - var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, 
"id").get()); - assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable")); - var e3 = expectThrows( - SearchPhaseExecutionException.class, - () -> client().prepareSearch(USER_INDEX) - .setQuery(QueryBuilders.matchAllQuery()) - // Request times out if max concurrent shard requests is set to 1 - .setMaxConcurrentShardRequests(usually() ? SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : randomIntBetween(2, 10)) - .get() - ); - assertThat(e3.getMessage(), containsString("all shards failed")); - } - - private void fillThreadPoolQueues() { - for (String nodeName : internalCluster().getNodeNames()) { - ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName); - for (String threadPoolName : threadPoolsToBlock()) { - ThreadPool.Info info = threadPool.info(threadPoolName); - - // fill up the queue - for (int i = 0; i < info.getQueueSize().singles(); i++) { - try { - threadPool.executor(threadPoolName).submit(() -> {}); - } catch (EsRejectedExecutionException e) { - // we can't be sure that some other task won't get queued in a test cluster - // but we should put all the tasks in there anyway - } - } - } - } - } -}