Skip to content

Commit

Permalink
Check hot threads output in ThreadPoolTests
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Dec 6, 2024
1 parent bb04376 commit 28a93af
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.opensearch.test.framework.cluster.TestRestClient.HttpResponse;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.opensearch.security.support.ConfigConstants.SECURITY_RESTAPI_ROLES_ENABLED;
import static org.opensearch.test.framework.TestSecurityConfig.Role.ALL_ACCESS;
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;
Expand All @@ -50,7 +52,7 @@ public class ThreadPoolTests {
.build();

@Test
public void testEnsureNoThreadLeftRunningInGenericThreadPool() throws IOException {
public void testEnsureNoThreadLeftRunningInGenericThreadPool() throws IOException, InterruptedException {
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
client.put("test-index");

Expand All @@ -75,6 +77,12 @@ public void testEnsureNoThreadLeftRunningInGenericThreadPool() throws IOExceptio
assertThat(updateDocResponse.getStatusCode(), equalTo(RestStatus.OK.getStatus()));

client.delete("test-index");

Thread.sleep(2000);

HttpResponse hotThreadsResponse = client.get("_nodes/hot_threads");

assertThat(hotThreadsResponse.getBody(), not(containsString("ClusterStateMetadataDependentPrivileges")));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,9 @@ public LocalCluster build() {
testCertificates = new TestCertificates(clusterManager.getNodes());
}
clusterName += "_" + num.incrementAndGet();
nodeOverrideSettingsBuilder.put("monitor.fs.health.enabled", "true");
Settings settings = nodeOverrideSettingsBuilder.build();
Map<Integer, Settings> nodeSpecificSettings = new HashMap<>();
for (Map.Entry<Integer, Settings.Builder> entry : nodeSpecificOverrideSettingsBuilder.entrySet()) {
entry.getValue().put("monitor.fs.health.enabled", "true");
nodeSpecificSettings.put(entry.getKey(), entry.getValue().build());
}
return new LocalCluster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@
import org.apache.logging.log4j.Logger;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand All @@ -71,7 +67,6 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.test.framework.certificate.TestCertificates;
import org.opensearch.test.framework.cluster.ClusterManager.NodeSettings;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.BindTransportException;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -224,74 +219,17 @@ public boolean isStarted() {
return started;
}

public void stop(boolean checkActiveThreads) throws IOException {
Client client = clientNode().getInternalNodeClient();
AdminClient adminClient = client.admin();

adminClient.indices().prepareFlush().get();
adminClient.indices().prepareRefresh().get();

if (checkActiveThreads) {
int maxRetries = 3;
int retryCount = 0;
boolean threadsActive = true;

while (retryCount < maxRetries && threadsActive) {
try {
threadsActive = false;
final NodesStatsResponse nodesStatsResponse = adminClient.cluster()
.nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()))
.actionGet();

for (NodeStats stats : nodesStatsResponse.getNodes()) {
for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) {
if ("management".equals(stat.getName())) {
continue;
}
if (stat.getActive() > 0) {
log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive());
threadsActive = true;
break;
}
}
if (threadsActive) {
// TODO Hot Threads added for debugging
NodesHotThreadsResponse hotThreadsResponse = adminClient.cluster().prepareNodesHotThreads().get();
if (hotThreadsResponse.getNodes().isEmpty()) {
log.warn("Hot threads response is empty");
break;
}
log.warn("Hot threads stack trace: {}", hotThreadsResponse.getNodes().get(0).getHotThreads());
break;
}
}

if (threadsActive && retryCount < maxRetries - 1) {
// Add a small delay between retries
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting between retries", e);
}
retryCount++;
}

if (threadsActive) {
throw new IOException("Thread pools still have active threads after " + maxRetries + " attempts");
}
}

/**
 * Stops every node in {@code nodes} and waits until all of the resulting stop futures have completed.
 *
 * <p>The {@code Boolean} value produced by each per-node future is not inspected here. If any of the
 * futures completes exceptionally, {@code join()} rethrows the failure as an unchecked
 * {@link java.util.concurrent.CompletionException}.
 */
public void stop() {
// Collect one stop future per node before waiting on any of them.
List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
for (Node node : nodes) {
// NOTE(review): the (2, SECONDS) arguments are presumably a per-node shutdown timeout - confirm against Node.stop.
stopFutures.add(node.stop(2, TimeUnit.SECONDS));
}
// Block the calling thread until every collected future is done.
CompletableFuture.allOf(stopFutures.toArray(CompletableFuture[]::new)).join();
}

public void destroy() throws IOException {
public void destroy() {
try {
stop(true);
stop();
nodes.clear();
} finally {
try {
Expand Down Expand Up @@ -336,7 +274,7 @@ private void retry() throws Exception {
throw new RuntimeException("Detected port collisions for cluster manager node. Giving up.");
}

stop(false);
stop();

this.nodes.clear();
this.seedHosts = null;
Expand Down

0 comments on commit 28a93af

Please sign in to comment.