Skip to content

Commit

Permalink
Fork calculating the health indicators to Management pool (#91628)
Browse files Browse the repository at this point in the history
When a request for the health API comes in, we first reach out to the
health node (transport request) to fetch the cluster health state. When
we get a response, we calculate all the health indicators (except
`master_is_stable`, which executes before we do anything else).

Calculating the health indicators can get expensive when a large number
of shards are present in the cluster and we shouldn't keep the transport
worker thread busy with this.

This forks the computation of the health indicators to the Management
pool.
  • Loading branch information
andreidan authored Nov 18, 2022
1 parent 45dba98 commit ec4e9cc
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.http;

import org.apache.http.client.methods.HttpGet;
import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
Expand Down Expand Up @@ -40,7 +39,6 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91597")
public class HealthRestCancellationIT extends HttpSmokeTestCase {

@Override
Expand Down
56 changes: 44 additions & 12 deletions server/src/main/java/org/elasticsearch/health/HealthService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@

package org.elasticsearch.health;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -44,10 +45,10 @@ public class HealthService {
* Detail map key that contains the reasons a result was marked as UNKNOWN
*/
private static final String REASON = "reasons";
private static final Logger logger = LogManager.getLogger(HealthService.class);

private final List<HealthIndicatorService> preflightHealthIndicatorServices;
private final List<HealthIndicatorService> healthIndicatorServices;
private final ThreadPool threadPool;

/**
* Creates a new HealthService.
Expand All @@ -62,10 +63,12 @@ public class HealthService {
*/
public HealthService(
List<HealthIndicatorService> preflightHealthIndicatorServices,
List<HealthIndicatorService> healthIndicatorServices
List<HealthIndicatorService> healthIndicatorServices,
ThreadPool threadPool
) {
this.preflightHealthIndicatorServices = preflightHealthIndicatorServices;
this.healthIndicatorServices = healthIndicatorServices;
this.threadPool = threadPool;
}

/**
Expand Down Expand Up @@ -105,24 +108,53 @@ public void getHealth(
@Override
public void onResponse(FetchHealthInfoCacheAction.Response response) {
HealthInfo healthInfo = response.getHealthInfo();
validateResultsAndNotifyListener(
// fork off to the management pool as calculating the indicators can run for longer than is acceptable
// on a transport thread in case of large numbers of indices
ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable = calculateFilteredIndicatorsRunnable(
indicatorName,
Stream.concat(filteredPreflightResults, filteredIndicators.map(service -> service.calculate(explain, healthInfo)))
.toList(),
healthInfo,
explain,
listener
);

try {
threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(calculateFilteredIndicatorsRunnable);
} catch (EsRejectedExecutionException e) {
calculateFilteredIndicatorsRunnable.onRejection(e);
}
}

@Override
public void onFailure(Exception e) {
validateResultsAndNotifyListener(
// fork off to the management pool as calculating the indicators can run for longer than is acceptable
// on a transport thread in case of large numbers of indices
ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable = calculateFilteredIndicatorsRunnable(
indicatorName,
Stream.concat(
filteredPreflightResults,
filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
).toList(),
HealthInfo.EMPTY_HEALTH_INFO,
explain,
listener
);
try {
threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(calculateFilteredIndicatorsRunnable);
} catch (EsRejectedExecutionException esRejectedExecutionException) {
calculateFilteredIndicatorsRunnable.onRejection(esRejectedExecutionException);
}
}

/**
 * Builds the runnable that calculates the health indicator results and hands them to the listener.
 * The calculation is deferred into the returned runnable so that the caller can decide which
 * executor runs it.
 *
 * @param indicatorName forwarded unchanged to {@code validateResultsAndNotifyListener}
 * @param healthInfo    the health info passed to every indicator service's {@code calculate} call
 * @param explain       the explain flag passed to every indicator service's {@code calculate} call
 * @param listener      the listener wrapped by the returned runnable; it receives the outcome through
 *                      {@code validateResultsAndNotifyListener}
 * @return a runnable that, when executed, calculates the indicators and notifies the listener
 */
private ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable(
String indicatorName,
HealthInfo healthInfo,
boolean explain,
ActionListener<List<HealthIndicatorResult>> listener
) {
// NOTE(review): ActionRunnable.wrap presumably routes exceptions thrown by the body to the
// listener's failure path — confirm against ActionRunnable.
return ActionRunnable.wrap(listener, l -> {
// filteredPreflightResults and filteredIndicators are not parameters; they are streams defined
// outside this method (presumably in the enclosing getHealth scope — verify). They are consumed
// here, so this body can only be executed once per pair of streams.
List<HealthIndicatorResult> results = Stream.concat(
filteredPreflightResults,
filteredIndicators.map(service -> service.calculate(explain, healthInfo))
).toList();

validateResultsAndNotifyListener(indicatorName, results, l);
});
}
});

Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ protected Node(
discoveryModule.getCoordinator(),
masterHistoryService
);
HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService);
HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService, threadPool);
HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
Expand Down Expand Up @@ -1196,7 +1196,8 @@ private Tracer getTracer(PluginsService pluginsService, Settings settings) {
private HealthService createHealthService(
ClusterService clusterService,
ClusterModule clusterModule,
CoordinationDiagnosticsService coordinationDiagnosticsService
CoordinationDiagnosticsService coordinationDiagnosticsService,
ThreadPool threadPool
) {
List<HealthIndicatorService> preflightHealthIndicatorServices = Collections.singletonList(
new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService)
Expand All @@ -1214,7 +1215,8 @@ private HealthService createHealthService(
.toList();
return new HealthService(
preflightHealthIndicatorServices,
concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices)
concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices),
threadPool
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -38,6 +42,19 @@

public class HealthServiceTests extends ESTestCase {

private ThreadPool threadPool;

/**
 * Creates a fresh {@link TestThreadPool}, named after this test class, before each test so that
 * the {@code HealthService} instances built by the tests have a thread pool to hand work to.
 */
@Before
public void setupThreadpool() {
threadPool = new TestThreadPool(HealthServiceTests.class.getSimpleName());
}

/**
 * Runs the superclass tear-down and then shuts down the thread pool created in
 * {@code setupThreadpool()}.
 *
 * @throws Exception if the superclass tear-down fails
 */
@After
public void tearDown() throws Exception {
super.tearDown();
// NOTE(review): shutdownNow() is not followed by a wait for termination in this method, so
// pool threads may still be alive when it returns — confirm this does not trip any
// thread-leak checks in the test framework.
threadPool.shutdownNow();
}

public void testShouldReturnGroupedIndicators() throws Exception {

var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null);
Expand All @@ -50,7 +67,8 @@ public void testShouldReturnGroupedIndicators() throws Exception {
createMockHealthIndicatorService(networkLatency),
createMockHealthIndicatorService(slowTasks),
createMockHealthIndicatorService(shardsAvailable)
)
),
threadPool
);

NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
Expand Down Expand Up @@ -94,30 +112,6 @@ public void onFailure(Exception e) {
};
}

/**
 * Verifies that registering several indicator services whose results share the same indicator
 * name ("network_latency") makes a health request fail with an {@link AssertionError}.
 */
public void testDuplicateIndicatorNames() throws Exception {
// Same indicator name, should throw exception:
var networkLatency = new HealthIndicatorResult(
"network_latency",
GREEN,
null,
null,
Collections.emptyList(),
Collections.emptyList()
);
// Deliberately reuses the "network_latency" name even though the variable is called slowTasks.
var slowTasks = new HealthIndicatorResult("network_latency", YELLOW, null, null, Collections.emptyList(), Collections.emptyList());
var service = new HealthService(
Collections.emptyList(),
List.of(
createMockHealthIndicatorService(networkLatency),
createMockHealthIndicatorService(slowTasks),
createMockHealthIndicatorService(networkLatency)
)
);
NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
// This is testing an assertion, so we expect it to blow up in place rather than calling onFailure:
// NOTE(review): the meaning of the null/true/null/false arguments depends on
// assertGetHealthThrowsException, which is not visible here — confirm against the helper.
assertGetHealthThrowsException(service, client, null, true, AssertionError.class, null, false);
}

public void testMissingIndicator() throws Exception {
var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null);
var slowTasks = new HealthIndicatorResult("slow_task_assignment", YELLOW, null, null, null, null);
Expand All @@ -129,7 +123,8 @@ public void testMissingIndicator() throws Exception {
createMockHealthIndicatorService(networkLatency),
createMockHealthIndicatorService(slowTasks),
createMockHealthIndicatorService(shardsAvailable)
)
),
threadPool
);
NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
assertGetHealthThrowsException(
Expand Down Expand Up @@ -216,7 +211,8 @@ public void testPreflightIndicatorResultsPresent() throws Exception {
createMockHealthIndicatorService(networkLatency),
createMockHealthIndicatorService(slowTasks),
createMockHealthIndicatorService(shardsAvailable)
)
),
threadPool
);
NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);

Expand Down Expand Up @@ -255,7 +251,8 @@ public void testThatIndicatorsGetHealthInfoData() throws Exception {
createMockHealthIndicatorService(networkLatency, healthInfo),
createMockHealthIndicatorService(slowTasks, healthInfo),
createMockHealthIndicatorService(shardsAvailable, healthInfo)
)
),
threadPool
);
NodeClient client = getTestClient(healthInfo);

Expand Down Expand Up @@ -283,7 +280,8 @@ public void testPreflightIndicatorFailureTriggersUnknownResults() throws Excepti
createMockHealthIndicatorService(networkLatency),
createMockHealthIndicatorService(slowTasks),
createMockHealthIndicatorService(shardsAvailable)
)
),
threadPool
);
NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
{
Expand Down

0 comments on commit ec4e9cc

Please sign in to comment.