diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HealthRestCancellationIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HealthRestCancellationIT.java index 99b4c01695483..e0d6a75495ded 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HealthRestCancellationIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HealthRestCancellationIT.java @@ -9,7 +9,6 @@ package org.elasticsearch.http; import org.apache.http.client.methods.HttpGet; -import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Cancellable; import org.elasticsearch.client.Request; @@ -40,7 +39,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; -@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91597") public class HealthRestCancellationIT extends HttpSmokeTestCase { @Override diff --git a/server/src/main/java/org/elasticsearch/health/HealthService.java b/server/src/main/java/org/elasticsearch/health/HealthService.java index 22969234e6c65..c3079c6a71701 100644 --- a/server/src/main/java/org/elasticsearch/health/HealthService.java +++ b/server/src/main/java/org/elasticsearch/health/HealthService.java @@ -8,14 +8,15 @@ package org.elasticsearch.health; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; import org.elasticsearch.health.node.FetchHealthInfoCacheAction; import org.elasticsearch.health.node.HealthInfo; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import 
java.util.HashSet; @@ -44,10 +45,10 @@ public class HealthService { * Detail map key that contains the reasons a result was marked as UNKNOWN */ private static final String REASON = "reasons"; - private static final Logger logger = LogManager.getLogger(HealthService.class); private final List<HealthIndicatorService> preflightHealthIndicatorServices; private final List<HealthIndicatorService> healthIndicatorServices; + private final ThreadPool threadPool; /** * Creates a new HealthService. @@ -62,10 +63,12 @@ public class HealthService { */ public HealthService( List<HealthIndicatorService> preflightHealthIndicatorServices, - List<HealthIndicatorService> healthIndicatorServices + List<HealthIndicatorService> healthIndicatorServices, + ThreadPool threadPool ) { this.preflightHealthIndicatorServices = preflightHealthIndicatorServices; this.healthIndicatorServices = healthIndicatorServices; + this.threadPool = threadPool; } /** @@ -105,24 +108,53 @@ public void getHealth( @Override public void onResponse(FetchHealthInfoCacheAction.Response response) { HealthInfo healthInfo = response.getHealthInfo(); - validateResultsAndNotifyListener( + // fork off to the management pool as calculating the indicators can run for longer than is acceptable + // on a transport thread in case of large numbers of indices + ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable = calculateFilteredIndicatorsRunnable( indicatorName, - Stream.concat(filteredPreflightResults, filteredIndicators.map(service -> service.calculate(explain, healthInfo))) - .toList(), + healthInfo, + explain, listener ); + + try { + threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(calculateFilteredIndicatorsRunnable); + } catch (EsRejectedExecutionException e) { + calculateFilteredIndicatorsRunnable.onRejection(e); + } } @Override public void onFailure(Exception e) { - validateResultsAndNotifyListener( + // fork off to the management pool as calculating the indicators can run for longer than is acceptable + // on a transport thread in case of large numbers of indices + ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable =
calculateFilteredIndicatorsRunnable( indicatorName, - Stream.concat( - filteredPreflightResults, - filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO)) - ).toList(), + HealthInfo.EMPTY_HEALTH_INFO, + explain, listener ); + try { + threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(calculateFilteredIndicatorsRunnable); + } catch (EsRejectedExecutionException esRejectedExecutionException) { + calculateFilteredIndicatorsRunnable.onRejection(esRejectedExecutionException); + } + } + + private ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable( + String indicatorName, + HealthInfo healthInfo, + boolean explain, + ActionListener<List<HealthIndicatorResult>> listener + ) { + return ActionRunnable.wrap(listener, l -> { + List<HealthIndicatorResult> results = Stream.concat( + filteredPreflightResults, + filteredIndicators.map(service -> service.calculate(explain, healthInfo)) + ).toList(); + + validateResultsAndNotifyListener(indicatorName, results, l); + }); } }); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f8cabf6ccaf72..d591f4babb1dc 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -997,7 +997,7 @@ protected Node( discoveryModule.getCoordinator(), masterHistoryService ); - HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService); + HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService, threadPool); HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings); LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client); HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService); @@ -1196,7 +1196,8 @@ private Tracer getTracer(PluginsService pluginsService, Settings settings) { private
HealthService createHealthService( ClusterService clusterService, ClusterModule clusterModule, - CoordinationDiagnosticsService coordinationDiagnosticsService + CoordinationDiagnosticsService coordinationDiagnosticsService, + ThreadPool threadPool ) { List<HealthIndicatorService> preflightHealthIndicatorServices = Collections.singletonList( new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService) @@ -1214,7 +1215,8 @@ private HealthService createHealthService( .toList(); return new HealthService( preflightHealthIndicatorServices, - concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices) + concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices), + threadPool ); } diff --git a/server/src/test/java/org/elasticsearch/health/HealthServiceTests.java b/server/src/test/java/org/elasticsearch/health/HealthServiceTests.java index 4b3de52018a38..14c69ed90d374 100644 --- a/server/src/test/java/org/elasticsearch/health/HealthServiceTests.java +++ b/server/src/test/java/org/elasticsearch/health/HealthServiceTests.java @@ -16,6 +16,10 @@ import org.elasticsearch.health.node.FetchHealthInfoCacheAction; import org.elasticsearch.health.node.HealthInfo; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.util.Collections; import java.util.HashMap; @@ -38,6 +42,19 @@ public class HealthServiceTests extends ESTestCase { + private ThreadPool threadPool; + + @Before + public void setupThreadpool() { + threadPool = new TestThreadPool(HealthServiceTests.class.getSimpleName()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + public void testShouldReturnGroupedIndicators() throws Exception { var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null); @@ -50,7 +67,8 @@ public void
testShouldReturnGroupedIndicators() throws Exception { createMockHealthIndicatorService(networkLatency), createMockHealthIndicatorService(slowTasks), createMockHealthIndicatorService(shardsAvailable) - ) + ), + threadPool ); NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO); @@ -94,30 +112,6 @@ public void onFailure(Exception e) { }; } - public void testDuplicateIndicatorNames() throws Exception { - // Same indicator name, should throw exception: - var networkLatency = new HealthIndicatorResult( - "network_latency", - GREEN, - null, - null, - Collections.emptyList(), - Collections.emptyList() - ); - var slowTasks = new HealthIndicatorResult("network_latency", YELLOW, null, null, Collections.emptyList(), Collections.emptyList()); - var service = new HealthService( - Collections.emptyList(), - List.of( - createMockHealthIndicatorService(networkLatency), - createMockHealthIndicatorService(slowTasks), - createMockHealthIndicatorService(networkLatency) - ) - ); - NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO); - // This is testing an assertion, so we expect it to blow up in place rather than calling onFailure: - assertGetHealthThrowsException(service, client, null, true, AssertionError.class, null, false); - } - public void testMissingIndicator() throws Exception { var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null); var slowTasks = new HealthIndicatorResult("slow_task_assignment", YELLOW, null, null, null, null); @@ -129,7 +123,8 @@ public void testMissingIndicator() throws Exception { createMockHealthIndicatorService(networkLatency), createMockHealthIndicatorService(slowTasks), createMockHealthIndicatorService(shardsAvailable) - ) + ), + threadPool ); NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO); assertGetHealthThrowsException( @@ -216,7 +211,8 @@ public void testPreflightIndicatorResultsPresent() throws Exception { createMockHealthIndicatorService(networkLatency), 
createMockHealthIndicatorService(slowTasks), createMockHealthIndicatorService(shardsAvailable) - ) + ), + threadPool ); NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO); @@ -255,7 +251,8 @@ public void testThatIndicatorsGetHealthInfoData() throws Exception { createMockHealthIndicatorService(networkLatency, healthInfo), createMockHealthIndicatorService(slowTasks, healthInfo), createMockHealthIndicatorService(shardsAvailable, healthInfo) - ) + ), + threadPool ); NodeClient client = getTestClient(healthInfo); @@ -283,7 +280,8 @@ public void testPreflightIndicatorFailureTriggersUnknownResults() throws Excepti createMockHealthIndicatorService(networkLatency), createMockHealthIndicatorService(slowTasks), createMockHealthIndicatorService(shardsAvailable) - ) + ), + threadPool ); NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO); {