Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move monitoring collection timeouts to coordinator #67084

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

package org.elasticsearch.action.support.broadcast;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;

public abstract class BroadcastOperationRequestBuilder<
Request extends BroadcastRequest<Request>,
Expand All @@ -45,4 +46,10 @@ public final RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return (RequestBuilder) this;
}

@SuppressWarnings("unchecked")
public RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final RequestBuilder setNodesIds(String... nodesIds) {
}

@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(TimeValue timeout) {
public RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(allowNoMatch, other.allowNoMatch);
return Objects.equals(jobId, other.jobId)
&& Objects.equals(allowNoMatch, other.allowNoMatch)
&& Objects.equals(getTimeout(), other.getTimeout());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Collection<MonitoringDoc> collect(final long timestamp, final long interv
return doCollect(convertNode(timestamp, clusterService.localNode()), interval, clusterState);
}
} catch (ElasticsearchTimeoutException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] timed out when collecting data", name()));
logger.error("collector [{}] timed out when collecting data: {}", name(), e.getMessage());
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("collector [{}] failed to collect data", name()), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.monitoring.collector;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.HashSet;
import java.util.concurrent.TimeoutException;

/**
* Utilities for identifying timeouts in responses to collection requests, since we prefer to fail the whole collection attempt if any of
* the involved nodes times out.
*/
public final class TimeoutUtils {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add javadocs for this class as well as its public static methods please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, see 6b845ba.

private TimeoutUtils() {
}

/**
* @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
* nodes that timed out and mentions {@code collectionTimeout}.
*/
public static <T extends BaseNodeResponse> void ensureNoTimeouts(TimeValue collectionTimeout, BaseNodesResponse<T> response) {
HashSet<String> timedOutNodeIds = null;
for (FailedNodeException failedNodeException : response.failures()) {
if (isTimeoutFailure(failedNodeException)) {
if (timedOutNodeIds == null) {
timedOutNodeIds = new HashSet<>();
}
timedOutNodeIds.add(failedNodeException.nodeId());
}
}
ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
}

/**
* @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
* nodes that timed out and mentions {@code collectionTimeout}.
*/
public static void ensureNoTimeouts(TimeValue collectionTimeout, BaseTasksResponse response) {
HashSet<String> timedOutNodeIds = null;
for (ElasticsearchException nodeFailure : response.getNodeFailures()) {
if (nodeFailure instanceof FailedNodeException) {
FailedNodeException failedNodeException = (FailedNodeException) nodeFailure;
if (isTimeoutFailure(failedNodeException)) {
if (timedOutNodeIds == null) {
timedOutNodeIds = new HashSet<>();
}
timedOutNodeIds.add(failedNodeException.nodeId());
}
}
}
ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
}

/**
* @throws ElasticsearchTimeoutException iff the {@code response} contains any node-level timeout. The exception message identifies the
* nodes that timed out and mentions {@code collectionTimeout}.
*/
public static void ensureNoTimeouts(TimeValue collectionTimeout, BroadcastResponse response) {
HashSet<String> timedOutNodeIds = null;
for (DefaultShardOperationFailedException shardFailure : response.getShardFailures()) {
final Throwable shardFailureCause = shardFailure.getCause();
if (shardFailureCause instanceof FailedNodeException) {
FailedNodeException failedNodeException = (FailedNodeException) shardFailureCause;
if (isTimeoutFailure(failedNodeException)) {
if (timedOutNodeIds == null) {
timedOutNodeIds = new HashSet<>();
}
timedOutNodeIds.add(failedNodeException.nodeId());
}
}
}
ensureNoTimeouts(collectionTimeout, timedOutNodeIds);
}

private static boolean isTimeoutFailure(FailedNodeException failedNodeException) {
final Throwable cause = failedNodeException.getCause();
return cause instanceof ElasticsearchTimeoutException
|| cause instanceof TimeoutException
|| cause instanceof ReceiveTimeoutTransportException;
}

private static void ensureNoTimeouts(TimeValue collectionTimeout, HashSet<String> timedOutNodeIds) {
if (timedOutNodeIds != null) {
throw new ElasticsearchTimeoutException((timedOutNodeIds.size() == 1 ? "node " : "nodes ") + timedOutNodeIds +
" did not respond within [" + collectionTimeout + "]");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static org.elasticsearch.xpack.core.XPackSettings.SECURITY_ENABLED;
import static org.elasticsearch.xpack.core.XPackSettings.TRANSPORT_SSL_ENABLED;
import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for cluster stats.
Expand Down Expand Up @@ -82,13 +83,12 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final Supplier<ClusterStatsResponse> clusterStatsSupplier =
() -> client.admin().cluster().prepareClusterStats().get(getCollectionTimeout());
final ClusterState clusterState) {
final Supplier<List<XPackFeatureSet.Usage>> usageSupplier =
() -> new XPackUsageRequestBuilder(client).get().getUsages();

final ClusterStatsResponse clusterStats = clusterStatsSupplier.get();
final ClusterStatsResponse clusterStats = client.admin().cluster().prepareClusterStats().setTimeout(getCollectionTimeout()).get();
ensureNoTimeouts(getCollectionTimeout(), clusterStats);

final String clusterName = clusterService.getClusterName().value();
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for the Recovery API.
Expand Down Expand Up @@ -64,13 +65,16 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final ClusterState clusterState) {
List<MonitoringDoc> results = new ArrayList<>(1);
RecoveryResponse recoveryResponse = client.admin().indices().prepareRecoveries()
.setIndices(getCollectionIndices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setActiveOnly(getActiveRecoveriesOnly())
.get(getCollectionTimeout());
.setTimeout(getCollectionTimeout())
.get();

ensureNoTimeouts(getCollectionTimeout(), recoveryResponse);

if (recoveryResponse.hasRecoveries()) {
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for indices and singular index statistics.
* <p>
Expand Down Expand Up @@ -54,7 +56,7 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final ClusterState clusterState) {
final List<MonitoringDoc> results = new ArrayList<>();
final IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats()
.setIndices(getCollectionIndices())
Expand All @@ -71,7 +73,10 @@ protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
.setQueryCache(true)
.setRequestCache(true)
.setBulk(true)
.get(getCollectionTimeout());
.setTimeout(getCollectionTimeout())
.get();

ensureNoTimeouts(getCollectionTimeout(), indicesStatsResponse);

final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for Machine Learning Job Stats.
Expand Down Expand Up @@ -71,9 +72,10 @@ protected List<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final ClusterState clusterState) throws Exception {
// fetch details about all jobs
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(MONITORING_ORIGIN)) {
final GetJobsStatsAction.Response jobs =
client.execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(Metadata.ALL))
.actionGet(getCollectionTimeout());
final GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(Metadata.ALL).setTimeout(getCollectionTimeout());
final GetJobsStatsAction.Response jobs = client.execute(GetJobsStatsAction.INSTANCE, request).actionGet();

ensureNoTimeouts(getCollectionTimeout(), jobs);

final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.Objects;

import static org.elasticsearch.xpack.monitoring.collector.TimeoutUtils.ensureNoTimeouts;

/**
* Collector for nodes statistics.
* <p>
Expand Down Expand Up @@ -65,7 +67,7 @@ protected boolean shouldCollect(final boolean isElectedMaster) {
@Override
protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
final long interval,
final ClusterState clusterState) throws Exception {
final ClusterState clusterState) {
NodesStatsRequest request = new NodesStatsRequest("_local");
request.indices(FLAGS);
request.addMetrics(
Expand All @@ -74,8 +76,10 @@ protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node,
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.THREAD_POOL.metricName(),
NodesStatsRequest.Metric.FS.metricName());
request.timeout(getCollectionTimeout());

final NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet(getCollectionTimeout());
final NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need an ensureNoTimeouts(getCollectionTimeout(), response); after this line as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I was thinking no since we throw response.failures().get(0) anyway, but on reflection that'll be a FailedNodeException not the inner timeout. I'll address that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok done, see 56ce978.

ensureNoTimeouts(getCollectionTimeout(), response);

// if there's a failure, then we failed to work with the
// _local node (guaranteed a single exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.monitoring.collector.cluster;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequestBuilder;
Expand Down Expand Up @@ -37,6 +39,7 @@
import org.junit.Assert;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.UUID;

Expand Down Expand Up @@ -189,7 +192,8 @@ public void testDoCollect() throws Exception {
when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);

final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
when(clusterStatsRequestBuilder.get(eq(timeout))).thenReturn(mockClusterStatsResponse);
when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);

final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
Expand Down Expand Up @@ -280,7 +284,7 @@ public void testDoCollectNoLicense() throws Exception {
{
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.lenientExpandOpen(), "apm-*"))
.thenReturn(new Index[0]);
.thenReturn(Index.EMPTY_ARRAY);
}

final Client client = mock(Client.class);
Expand All @@ -296,7 +300,8 @@ public void testDoCollectNoLicense() throws Exception {
when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);

final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
when(clusterStatsRequestBuilder.get(eq(timeout))).thenReturn(mockClusterStatsResponse);
when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);

final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);
Expand Down Expand Up @@ -325,4 +330,58 @@ public void testDoCollectNoLicense() throws Exception {
final ClusterStatsMonitoringDoc doc = (ClusterStatsMonitoringDoc) results.iterator().next();
assertThat(doc.getLicense(), nullValue());
}

public void testDoCollectThrowsTimeoutException() throws Exception {
final TimeValue timeout;
{
final String clusterName = randomAlphaOfLength(10);
whenClusterStateWithName(clusterName);
final String clusterUUID = UUID.randomUUID().toString();
whenClusterStateWithUUID(clusterUUID);
timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
withCollectionTimeout(ClusterStatsCollector.CLUSTER_STATS_TIMEOUT, timeout);
}
final IndexNameExpressionResolver indexNameExpressionResolver;
{
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.lenientExpandOpen(), "apm-*"))
.thenReturn(Index.EMPTY_ARRAY);
}

final Client client = mock(Client.class);
{
final ClusterStatsResponse mockClusterStatsResponse = mock(ClusterStatsResponse.class);
final ClusterHealthStatus clusterStatus = randomFrom(ClusterHealthStatus.values());
when(mockClusterStatsResponse.getStatus()).thenReturn(clusterStatus);
when(mockClusterStatsResponse.getNodesStats()).thenReturn(mock(ClusterStatsNodes.class));
when(mockClusterStatsResponse.failures()).thenReturn(List.of(new FailedNodeException("node", "msg",
new ElasticsearchTimeoutException("timed out"))));

final ClusterStatsIndices mockClusterStatsIndices = mock(ClusterStatsIndices.class);

when(mockClusterStatsIndices.getIndexCount()).thenReturn(0);
when(mockClusterStatsResponse.getIndicesStats()).thenReturn(mockClusterStatsIndices);

final ClusterStatsRequestBuilder clusterStatsRequestBuilder = mock(ClusterStatsRequestBuilder.class);
when(clusterStatsRequestBuilder.setTimeout(eq(timeout))).thenReturn(clusterStatsRequestBuilder);
when(clusterStatsRequestBuilder.get()).thenReturn(mockClusterStatsResponse);

final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
when(clusterAdminClient.prepareClusterStats()).thenReturn(clusterStatsRequestBuilder);

final AdminClient adminClient = mock(AdminClient.class);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
when(client.admin()).thenReturn(adminClient);
}

final long interval = randomNonNegativeLong();
final Settings.Builder settings = Settings.builder();
final MonitoringDoc.Node node = MonitoringTestUtils.randomMonitoringNode(random());

final ClusterStatsCollector collector =
new ClusterStatsCollector(settings.build(), clusterService, licenseState,
client, licenseService, indexNameExpressionResolver);
expectThrows(ElasticsearchTimeoutException.class, () -> collector.doCollect(node, interval, clusterState));
}

}
Loading