Skip to content

Commit

Permalink
Refactor KibanaThreadPoolIT/SystemIndexThreadPoolTestCase for resilie…
Browse files Browse the repository at this point in the history
…ncy (elastic#108463)

the in SystemIndexThreadPoolTestCase#testUserThreadPoolsAreBlocked sometimes was blocked instead of throwing an expected exception (due to a queue on a threadpool being full).
IT happens that submitting a busyTask does not guarantee that it will be executed immediately by a threadpool. It might be that some other task was executing at the time
This commit refactors the way threadpool is populated and makes sure that before the queue are filled, all the busyTasks are executed on threadpools

based on the test failure -> elastic#107625 thread pool's threads were busy, but I cannot tell if a queue was full before the search request was submitted.
  • Loading branch information
pgomulka authored May 14, 2024
1 parent a3b25a9 commit 4e24cf9
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,66 @@
package org.elasticsearch.kibana;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexThreadPoolTestCase;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;

public class KibanaThreadPoolIT extends SystemIndexThreadPoolTestCase {
/**
* Tests to verify that system indices are bypassing user-space thread pools
*
* <p>We can block thread pools by setting them to one thread and 1 element queue, then submitting
* threads that wait on a phaser. This lets us verify that operations on system indices
* are being directed to other thread pools.</p>
*/
public class KibanaThreadPoolIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.write.size", 1)
.put("thread_pool.write.queue_size", 1)
.put("thread_pool.get.size", 1)
.put("thread_pool.get.queue_size", 1)
.build();
}

private static final String USER_INDEX = "user_index";
// For system indices that use ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS, we'll want to
// block normal system index thread pools as well.
private static final Set<String> THREAD_POOLS_TO_BLOCK = Set.of(ThreadPool.Names.GET, ThreadPool.Names.WRITE, ThreadPool.Names.SEARCH);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Set.of(KibanaPlugin.class);
}

public void testKibanaThreadPool() {
public void testKibanaThreadPoolByPassesBlockedThreadPools() throws Exception {
List<String> kibanaSystemIndices = Stream.of(
KibanaPlugin.KIBANA_INDEX_DESCRIPTOR.getIndexPattern(),
KibanaPlugin.REPORTING_INDEX_DESCRIPTOR.getIndexPattern(),
Expand Down Expand Up @@ -61,4 +97,108 @@ public void testKibanaThreadPool() {
}
});
}

public void testBlockedThreadPoolsRejectUserRequests() throws Exception {
assertAcked(client().admin().indices().prepareCreate(USER_INDEX));

runWithBlockedThreadPools(this::assertThreadPoolsBlocked);

assertAcked(client().admin().indices().prepareDelete(USER_INDEX));
}

private void assertThreadPoolsBlocked() {

var e1 = expectThrows(
EsRejectedExecutionException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
var e3 = expectThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(USER_INDEX)
.setQuery(QueryBuilders.matchAllQuery())
// Request times out if max concurrent shard requests is set to 1
.setMaxConcurrentShardRequests(usually() ? SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : randomIntBetween(2, 10))
.get()
);
assertThat(e3.getMessage(), containsString("all shards failed"));
}

protected void runWithBlockedThreadPools(Runnable runnable) throws Exception {
Phaser phaser = new Phaser();

// register this test's thread
phaser.register();

blockThreadPool(phaser);
phaser.arriveAndAwaitAdvance();// wait until all waitAction are executing

fillQueues();

logger.debug("number of nodes " + internalCluster().getNodeNames().length);
logger.debug("number of parties arrived " + phaser.getArrivedParties());
try {
runnable.run();
} finally {
phaser.arriveAndAwaitAdvance(); // release all waitAction
}
}

private void blockThreadPool(Phaser phaser) {
for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : THREAD_POOLS_TO_BLOCK) {
blockThreadPool(threadPoolName, threadPool, phaser);
}
}
}

private void fillQueues() {
for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : THREAD_POOLS_TO_BLOCK) {
fillThreadPoolQueues(threadPoolName, threadPool);
}
}
}

private static void blockThreadPool(String threadPoolName, ThreadPool threadPool, Phaser phaser) {
ThreadPool.Info info = threadPool.info(threadPoolName);

Runnable waitAction = () -> {
phaser.arriveAndAwaitAdvance();// block until all are executed on a threadpool
phaser.arriveAndAwaitAdvance();// block until main thread has not finished
};

phaser.bulkRegister(info.getMax());

for (int i = 0; i < info.getMax(); i++) {
// we need to make sure that there is a task blocking a thread pool
// otherwise a queue might end up having a spot
do {
try {
threadPool.executor(threadPoolName).execute(waitAction);
break;
} catch (EsRejectedExecutionException e) {
// if exception was thrown when submitting, retry.
}
} while (true);
}
}

private static void fillThreadPoolQueues(String threadPoolName, ThreadPool threadPool) {
ThreadPool.Info info = threadPool.info(threadPoolName);

for (int i = 0; i < info.getQueueSize().singles(); i++) {
try {
threadPool.executor(threadPoolName).execute(() -> {});
} catch (EsRejectedExecutionException e) {
// we can't be sure that some other task won't get queued in a test cluster
// but the threadpool's thread is already blocked
}
}
}

}

This file was deleted.

0 comments on commit 4e24cf9

Please sign in to comment.