Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DownsampleMetrics #106632

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
import org.elasticsearch.xpack.core.downsample.DownsampleShardTask;

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -133,4 +134,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, DownsampleShardTaskParams.NAME, DownsampleShardTaskParams::new)
);
}

@Override
public Collection<?> createComponents(PluginServices services) {
return List.of(new DownsampleMetrics(services.telemetryProvider().getMeterRegistry()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.io.IOException;
import java.util.Map;

/**
* Contains metrics related to downsampling actions.
* It gets initialized as a component by the {@link Downsample} plugin, can be injected to its actions.
*
* In tests, use TestTelemetryPlugin to inject a MeterRegistry for testing purposes
* and check that metrics get recorded as expected.
*
* To add a new metric, you need to:
* - Add a constant for its name, following the naming conventions for metrics.
* - Register it in method {@link #doStart}.
* - Add a function for recording its value.
* - If needed, inject {@link DownsampleMetrics} to the action containing the logic
* that records the metric value. For reference, see {@link TransportDownsampleIndexerAction}.
*/
public class DownsampleMetrics extends AbstractLifecycleComponent {

public static final String LATENCY_SHARD = "es.tsdb.downsample.latency.shard.histogram";

private final MeterRegistry meterRegistry;

public DownsampleMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

@Override
protected void doStart() {
// Register all metrics to track.
meterRegistry.registerLongHistogram(LATENCY_SHARD, "Downsampling action latency per shard", "ms");
}

@Override
protected void doStop() {}

@Override
protected void doClose() throws IOException {}

enum ShardActionStatus {

SUCCESS("success"),
MISSING_DOCS("missing_docs"),
FAILED("failed");

public static final String NAME = "status";

private final String message;

ShardActionStatus(String message) {
this.message = message;
}

String getMessage() {
return message;
}
}

void recordLatencyShard(long durationInMilliSeconds, ShardActionStatus status) {
meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ShardActionStatus.NAME, status.getMessage()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class DownsampleShardIndexer {
public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = new ByteSizeValue(50, ByteSizeUnit.MB);
private final IndexShard indexShard;
private final Client client;
private final DownsampleMetrics downsampleMetrics;
private final String downsampleIndex;
private final Engine.Searcher searcher;
private final SearchExecutionContext searchExecutionContext;
Expand All @@ -103,6 +104,7 @@ class DownsampleShardIndexer {
final DownsampleShardTask task,
final Client client,
final IndexService indexService,
final DownsampleMetrics downsampleMetrics,
final ShardId shardId,
final String downsampleIndex,
final DownsampleConfig config,
Expand All @@ -113,6 +115,7 @@ class DownsampleShardIndexer {
) {
this.task = task;
this.client = client;
this.downsampleMetrics = downsampleMetrics;
this.indexShard = indexService.getShard(shardId.id());
this.downsampleIndex = downsampleIndex;
this.searcher = indexShard.acquireSearcher("downsampling");
Expand Down Expand Up @@ -164,14 +167,15 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
timeSeriesSearcher.search(initialStateQuery, bucketCollector);
}

TimeValue duration = TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime);
logger.info(
"Shard [{}] successfully sent [{}], received source doc [{}], indexed downsampled doc [{}], failed [{}], took [{}]",
indexShard.shardId(),
task.getNumReceived(),
task.getNumSent(),
task.getNumIndexed(),
task.getNumFailed(),
TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime)
duration
);

if (task.getNumIndexed() != task.getNumSent()) {
Expand All @@ -187,6 +191,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
+ task.getNumSent()
+ "]";
logger.info(error);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.MISSING_DOCS);
throw new DownsampleShardIndexerException(error, false);
}

Expand All @@ -199,6 +204,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
+ task.getNumFailed()
+ "]";
logger.info(error);
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.FAILED);
throw new DownsampleShardIndexerException(error, false);
}

Expand All @@ -208,6 +214,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
ActionListener.noop()
);
logger.info("Downsampling task [" + task.getPersistentTaskId() + " on shard " + indexShard.shardId() + " completed");
downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.SUCCESS);
return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, Clu
static void realNodeOperation(
Client client,
IndicesService indicesService,
DownsampleMetrics downsampleMetrics,
DownsampleShardTask task,
DownsampleShardTaskParams params,
BytesRef lastDownsampledTsid
Expand All @@ -209,6 +210,7 @@ protected void doRun() throws Exception {
task,
client,
indicesService.indexServiceSafe(params.shardId().getIndex()),
downsampleMetrics,
params.shardId(),
params.downsampleIndex(),
params.downsampleConfig(),
Expand Down Expand Up @@ -303,17 +305,25 @@ public static class TA extends TransportAction<Request, ActionResponse.Empty> {

private final Client client;
private final IndicesService indicesService;
private final DownsampleMetrics downsampleMetrics;

@Inject
public TA(TransportService transportService, ActionFilters actionFilters, Client client, IndicesService indicesService) {
public TA(
TransportService transportService,
ActionFilters actionFilters,
Client client,
IndicesService indicesService,
DownsampleMetrics downsampleMetrics
) {
super(NAME, actionFilters, transportService.getTaskManager());
this.client = client;
this.indicesService = indicesService;
this.downsampleMetrics = downsampleMetrics;
}

@Override
protected void doExecute(Task t, Request request, ActionListener<ActionResponse.Empty> listener) {
realNodeOperation(client, indicesService, request.task, request.params, request.lastDownsampleTsid);
realNodeOperation(client, indicesService, downsampleMetrics, request.task, request.params, request.lastDownsampleTsid);
listener.onResponse(ActionResponse.Empty.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ public class TransportDownsampleIndexerAction extends TransportBroadcastAction<
private final ClusterService clusterService;
private final IndicesService indicesService;

private final DownsampleMetrics downsampleMetrics;

@Inject
public TransportDownsampleIndexerAction(
Client client,
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
DownsampleMetrics downsampleMetrics
) {
super(
DownsampleIndexerAction.NAME,
Expand All @@ -74,6 +77,7 @@ public TransportDownsampleIndexerAction(
this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.downsampleMetrics = downsampleMetrics;
}

@Override
Expand Down Expand Up @@ -139,6 +143,7 @@ protected DownsampleIndexerAction.ShardDownsampleResponse shardOperation(
(DownsampleShardTask) task,
client,
indexService,
downsampleMetrics,
request.shardId(),
request.getDownsampleIndex(),
request.getRollupConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchResponseUtils;
Expand All @@ -80,6 +81,8 @@
import org.elasticsearch.tasks.TaskCancelHelper;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand Down Expand Up @@ -162,7 +165,8 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
Downsample.class,
AggregateMetricMapperPlugin.class,
DataStreamsPlugin.class,
IndexLifecycle.class
IndexLifecycle.class,
TestTelemetryPlugin.class
);
}

Expand Down Expand Up @@ -623,6 +627,7 @@ public void testCancelDownsampleIndexer() throws IOException {
task,
client(),
indexService,
getInstanceFromNode(DownsampleMetrics.class),
shard.shardId(),
downsampleIndex,
config,
Expand Down Expand Up @@ -672,6 +677,7 @@ public void testDownsampleBulkFailed() throws IOException {
task,
client(),
indexService,
getInstanceFromNode(DownsampleMetrics.class),
shard.shardId(),
downsampleIndex,
config,
Expand Down Expand Up @@ -739,6 +745,7 @@ public void testTooManyBytesInFlight() throws IOException {
task,
client(),
indexService,
getInstanceFromNode(DownsampleMetrics.class),
shard.shardId(),
downsampleIndex,
config,
Expand Down Expand Up @@ -791,6 +798,7 @@ public void testDownsampleStats() throws IOException {
task,
client(),
indexService,
getInstanceFromNode(DownsampleMetrics.class),
shard.shardId(),
downsampleIndex,
config,
Expand All @@ -810,6 +818,17 @@ public void testDownsampleStats() throws IOException {

assertDownsampleIndexer(indexService, shardNum, task, executeResponse, task.getTotalShardDocCount());
}

// Check that metrics get collected as expected.
final TestTelemetryPlugin plugin = getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();
List<Measurement> measurements = plugin.getLongHistogramMeasurement(DownsampleMetrics.LATENCY_SHARD);
assertFalse(measurements.isEmpty());
Measurement measurement = measurements.get(0);
assertTrue(measurement.value().toString(), measurement.value().longValue() >= 0 && measurement.value().longValue() < 1000_000);
assertEquals(1, measurement.attributes().size());
assertEquals("success", measurement.attributes().get("status"));
}

public void testResumeDownsample() throws IOException {
Expand Down Expand Up @@ -848,6 +867,7 @@ public void testResumeDownsample() throws IOException {
task,
client(),
indexService,
getInstanceFromNode(DownsampleMetrics.class),
shard.shardId(),
downsampleIndex,
config,
Expand Down Expand Up @@ -923,6 +943,7 @@ public void testResumeDownsamplePartial() throws IOException {
task,
client(),
indexService,
getInstanceFromNode(DownsampleMetrics.class),
shard.shardId(),
downsampleIndex,
config,
Expand Down