From a2af99cb0041c429390f9f8ad61b18db07716617 Mon Sep 17 00:00:00 2001 From: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> Date: Wed, 27 Mar 2024 15:07:56 +0200 Subject: [PATCH] Add metric for total downsampling latency (#106747) * Add DownsampleMetrics * replace singleton with injection * add comment * add comment * fix test * Metric for total downsampling latency * small fixes * make startTime a local variable --- .../xpack/downsample/DownsampleMetrics.java | 16 ++- .../downsample/DownsampleShardIndexer.java | 6 +- .../downsample/TransportDownsampleAction.java | 112 +++++++++++++----- .../DownsampleActionSingleNodeTests.java | 9 ++ 4 files changed, 108 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java index 576f40a8190f3..797b89ecf11a0 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleMetrics.java @@ -30,6 +30,7 @@ public class DownsampleMetrics extends AbstractLifecycleComponent { public static final String LATENCY_SHARD = "es.tsdb.downsample.latency.shard.histogram"; + public static final String LATENCY_TOTAL = "es.tsdb.downsample.latency.total.histogram"; private final MeterRegistry meterRegistry; @@ -41,6 +42,7 @@ public DownsampleMetrics(MeterRegistry meterRegistry) { protected void doStart() { // Register all metrics to track. meterRegistry.registerLongHistogram(LATENCY_SHARD, "Downsampling action latency per shard", "ms"); + meterRegistry.registerLongHistogram(LATENCY_TOTAL, "Downsampling latency end-to-end", "ms"); } @Override @@ -49,17 +51,17 @@ protected void doStop() {} @Override protected void doClose() throws IOException {} - enum ShardActionStatus { + enum ActionStatus { SUCCESS("success"), MISSING_DOCS("missing_docs"), FAILED("failed"); - public static final String NAME = "status"; + static final String NAME = "status"; private final String message; - ShardActionStatus(String message) { + ActionStatus(String message) { this.message = message; } @@ -68,7 +70,11 @@ String getMessage() { } } - void recordLatencyShard(long durationInMilliSeconds, ShardActionStatus status) { - meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ShardActionStatus.NAME, status.getMessage())); + void recordLatencyShard(long durationInMilliSeconds, ActionStatus status) { + meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ActionStatus.NAME, status.getMessage())); + } + + void recordLatencyTotal(long durationInMilliSeconds, ActionStatus status) { + meterRegistry.getLongHistogram(LATENCY_TOTAL).record(durationInMilliSeconds, Map.of(ActionStatus.NAME, status.getMessage())); } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 773dfbe897b50..59c1c9c38efae 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -191,7 +191,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept + task.getNumSent() + "]"; logger.info(error); - downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.MISSING_DOCS); + downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.MISSING_DOCS); throw new DownsampleShardIndexerException(error, false); } @@ -204,7 +204,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept + task.getNumFailed() + "]"; logger.info(error); - downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.FAILED); + downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.FAILED); throw new DownsampleShardIndexerException(error, false); } @@ -214,7 +214,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept ActionListener.noop() ); logger.info("Downsampling task [" + task.getPersistentTaskId() + " on shard " + indexShard.shardId() + " completed"); - downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ShardActionStatus.SUCCESS); + downsampleMetrics.recordLatencyShard(duration.millis(), DownsampleMetrics.ActionStatus.SUCCESS); return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed()); } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 58401451fa86b..c526561999497 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -91,6 +91,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -115,6 +116,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc private final IndexScopedSettings indexScopedSettings; private final ThreadContext threadContext; private final PersistentTasksService persistentTasksService; + private final DownsampleMetrics downsampleMetrics; private static final Set FORBIDDEN_SETTINGS = Set.of( IndexSettings.DEFAULT_PIPELINE.getKey(), @@ -153,7 +155,8 @@ public TransportDownsampleAction( ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, IndexScopedSettings indexScopedSettings, - PersistentTasksService persistentTasksService + PersistentTasksService persistentTasksService, + DownsampleMetrics downsampleMetrics ) { super( DownsampleAction.NAME, @@ -173,6 +176,21 @@ public TransportDownsampleAction( this.threadContext = threadPool.getThreadContext(); this.taskQueue = clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR); this.persistentTasksService = persistentTasksService; + this.downsampleMetrics = downsampleMetrics; + } + + private void recordLatencyOnSuccess(long startTime) { + downsampleMetrics.recordLatencyTotal( + TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis(), + DownsampleMetrics.ActionStatus.SUCCESS + ); + } + + private void recordLatencyOnFailure(long startTime) { + downsampleMetrics.recordLatencyTotal( + TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis(), + DownsampleMetrics.ActionStatus.FAILED + ); } @Override @@ -182,6 +200,7 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { + long startTime = client.threadPool().relativeTimeInMillis(); String sourceIndexName = request.getSourceIndex(); final IndicesAccessControl indicesAccessControl = threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY); @@ -236,7 +255,7 @@ protected void masterOperation( final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId()); // Short circuit if target index has been downsampled: final String downsampleIndexName = request.getTargetIndex(); - if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), state.metadata(), listener)) { + if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), startTime, state.metadata(), listener)) { logger.info("Skipping downsampling, because a previous execution already completed downsampling"); return; } @@ -325,6 +344,7 @@ protected void masterOperation( sourceIndexMetadata, downsampleIndexName, parentTask, + startTime, metricFields, labelFields, dimensionFields @@ -335,7 +355,14 @@ protected void masterOperation( }, e -> { if (e instanceof ResourceAlreadyExistsException) { var metadata = clusterService.state().metadata(); - if (canShortCircuit(request.getTargetIndex(), parentTask, request.getWaitTimeout(), metadata, listener)) { + if (canShortCircuit( + request.getTargetIndex(), + parentTask, + request.getWaitTimeout(), + startTime, + metadata, + listener + )) { logger.info("Downsample tasks are not created, because a previous execution already completed downsampling"); return; } @@ -345,6 +372,7 @@ protected void masterOperation( sourceIndexMetadata, downsampleIndexName, parentTask, + startTime, metricFields, labelFields, dimensionFields @@ -364,6 +392,7 @@ private boolean canShortCircuit( String targetIndexName, TaskId parentTask, TimeValue waitTimeout, + long startTime, Metadata metadata, ActionListener listener ) { @@ -391,7 +420,13 @@ private boolean canShortCircuit( .indices() .refresh( refreshRequest, - new RefreshDownsampleIndexActionListener(listener, parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout) + new RefreshDownsampleIndexActionListener( + listener, + parentTask, + targetIndexMetadata.getIndex().getName(), + waitTimeout, + startTime + ) ); return true; } @@ -405,6 +440,7 @@ private void performShardDownsampling( IndexMetadata sourceIndexMetadata, String downsampleIndexName, TaskId parentTask, + long startTime, List metricFields, List labelFields, List dimensionFields @@ -414,6 +450,7 @@ private void performShardDownsampling( // NOTE: before we set the number of replicas to 0, as a result here we are // only dealing with primary shards. final AtomicInteger countDown = new AtomicInteger(numberOfShards); + final AtomicBoolean errorReported = new AtomicBoolean(false); for (int shardNum = 0; shardNum < numberOfShards; shardNum++) { final ShardId shardId = new ShardId(sourceIndex, shardNum); final String persistentTaskId = createPersistentTaskId( @@ -458,13 +495,16 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask listener, final IndexMetadata sourceIndexMetadata, final String downsampleIndexName, - final TaskId parentTask + final TaskId parentTask, + final long startTime ) { // 4. Make downsample index read-only and set the correct number of replicas final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true); @@ -527,7 +568,13 @@ private void updateTargetIndexSettingStep( .indices() .updateSettings( updateSettingsReq, - new UpdateDownsampleIndexSettingsActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout()) + new UpdateDownsampleIndexSettingsActionListener( + listener, + parentTask, + downsampleIndexName, + request.getWaitTimeout(), + startTime + ) ); } @@ -871,17 +918,20 @@ class UpdateDownsampleIndexSettingsActionListener implements ActionListener listener, final TaskId parentTask, final String downsampleIndexName, - final TimeValue timeout + final TimeValue timeout, + final long startTime ) { this.listener = listener; this.parentTask = parentTask; this.downsampleIndexName = downsampleIndexName; this.timeout = timeout; + this.startTime = startTime; } @Override @@ -890,11 +940,12 @@ public void onResponse(final AcknowledgedResponse response) { request.setParentTask(parentTask); client.admin() .indices() - .refresh(request, new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, timeout)); + .refresh(request, new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, timeout, startTime)); } @Override public void onFailure(Exception e) { + recordLatencyOnSuccess(startTime); // Downsampling has already completed in all shards. listener.onFailure(e); } @@ -909,17 +960,20 @@ class RefreshDownsampleIndexActionListener implements ActionListener actionListener, TaskId parentTask, final String downsampleIndexName, - final TimeValue timeout + final TimeValue timeout, + final long startTime ) { this.actionListener = actionListener; this.parentTask = parentTask; this.downsampleIndexName = downsampleIndexName; this.timeout = timeout; + this.startTime = startTime; } @Override @@ -930,7 +984,9 @@ public void onResponse(final BroadcastResponse response) { // Mark downsample index as "completed successfully" ("index.downsample.status": "success") taskQueue.submitTask( "update-downsample-metadata [" + downsampleIndexName + "]", - new DownsampleClusterStateUpdateTask(new ForceMergeActionListener(parentTask, downsampleIndexName, actionListener)) { + new DownsampleClusterStateUpdateTask( + new ForceMergeActionListener(parentTask, downsampleIndexName, startTime, actionListener) + ) { @Override public ClusterState execute(ClusterState currentState) { @@ -957,6 +1013,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { + recordLatencyOnSuccess(startTime); // Downsampling has already completed in all shards. actionListener.onFailure(e); } @@ -970,42 +1027,43 @@ class ForceMergeActionListener implements ActionListener { final ActionListener actionListener; private final TaskId parentTask; private final String downsampleIndexName; + private final long startTime; ForceMergeActionListener( final TaskId parentTask, final String downsampleIndexName, + final long startTime, final ActionListener onFailure ) { this.parentTask = parentTask; this.downsampleIndexName = downsampleIndexName; + this.startTime = startTime; this.actionListener = onFailure; } @Override public void onResponse(final AcknowledgedResponse response) { - /* - * At this point downsample index has been created - * successfully even force merge fails. - * So, we should not fail the downsmaple operation - */ ForceMergeRequest request = new ForceMergeRequest(downsampleIndexName); request.maxNumSegments(1); request.setParentTask(parentTask); - client.admin() - .indices() - .forceMerge(request, ActionListener.wrap(mergeIndexResp -> actionListener.onResponse(AcknowledgedResponse.TRUE), t -> { - /* - * At this point downsample index has been created - * successfully even if force merge failed. - * So, we should not fail the downsample operation. - */ - logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t); - actionListener.onResponse(AcknowledgedResponse.TRUE); - })); + client.admin().indices().forceMerge(request, ActionListener.wrap(mergeIndexResp -> { + actionListener.onResponse(AcknowledgedResponse.TRUE); + recordLatencyOnSuccess(startTime); + }, t -> { + /* + * At this point downsample index has been created + * successfully even if force merge failed. + * So, we should not fail the downsample operation. + */ + logger.error("Failed to force-merge downsample index [" + downsampleIndexName + "]", t); + actionListener.onResponse(AcknowledgedResponse.TRUE); + recordLatencyOnSuccess(startTime); + })); } @Override public void onFailure(Exception e) { + recordLatencyOnSuccess(startTime); this.actionListener.onFailure(e); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index 4c5fdc23e04f9..5e0e6be61b9fc 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -824,6 +824,7 @@ public void testDownsampleStats() throws IOException { final TestTelemetryPlugin plugin = getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class) .findFirst() .orElseThrow(); + List measurements = plugin.getLongHistogramMeasurement(DownsampleMetrics.LATENCY_SHARD); assertFalse(measurements.isEmpty()); for (Measurement measurement : measurements) { @@ -831,6 +832,14 @@ public void testDownsampleStats() throws IOException { assertEquals(1, measurement.attributes().size()); assertThat(measurement.attributes().get("status"), Matchers.in(List.of("success", "failed", "missing_docs"))); } + + measurements = plugin.getLongHistogramMeasurement(DownsampleMetrics.LATENCY_TOTAL); + assertFalse(measurements.isEmpty()); + for (Measurement measurement : measurements) { + assertTrue(measurement.value().toString(), measurement.value().longValue() >= 0 && measurement.value().longValue() < 1000_000); + assertEquals(1, measurement.attributes().size()); + assertThat(measurement.attributes().get("status"), Matchers.in(List.of("success", "failed"))); + } } public void testResumeDownsample() throws IOException {