Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric for total downsampling latency #106747

Merged
merged 12 commits into from
Mar 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DownsampleMetrics extends AbstractLifecycleComponent {

public static final String LATENCY_SHARD = "es.tsdb.downsample.latency.shard.histogram";
public static final String LATENCY_MASTER = "es.tsdb.downsample.latency.master.histogram";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The master part is leaking implementation details that may change in the future. I think that maybe api latency or operation latency is a better name here. Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to total.


private final MeterRegistry meterRegistry;

Expand All @@ -41,6 +42,7 @@ public DownsampleMetrics(MeterRegistry meterRegistry) {
protected void doStart() {
// Register all metrics to track.
meterRegistry.registerLongHistogram(LATENCY_SHARD, "Downsampling action latency per shard", "ms");
meterRegistry.registerLongHistogram(LATENCY_MASTER, "Downsampling latency on master side", "ms");
}

@Override
Expand All @@ -55,7 +57,7 @@ enum ShardActionStatus {
MISSING_DOCS("missing_docs"),
FAILED("failed");

public static final String NAME = "status";
static final String NAME = "status";

private final String message;

Expand All @@ -71,4 +73,8 @@ String getMessage() {
void recordLatencyShard(long durationInMilliSeconds, ShardActionStatus status) {
meterRegistry.getLongHistogram(LATENCY_SHARD).record(durationInMilliSeconds, Map.of(ShardActionStatus.NAME, status.getMessage()));
}

void recordLatencyMaster(long durationInMilliSeconds, ShardActionStatus status) {
meterRegistry.getLongHistogram(LATENCY_MASTER).record(durationInMilliSeconds, Map.of(ShardActionStatus.NAME, status.getMessage()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand All @@ -115,6 +116,8 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
private final IndexScopedSettings indexScopedSettings;
private final ThreadContext threadContext;
private final PersistentTasksService persistentTasksService;
private final DownsampleMetrics downsampleMetrics;
private final long startTime;
kkrik-es marked this conversation as resolved.
Show resolved Hide resolved

private static final Set<String> FORBIDDEN_SETTINGS = Set.of(
IndexSettings.DEFAULT_PIPELINE.getKey(),
Expand Down Expand Up @@ -153,7 +156,8 @@ public TransportDownsampleAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings,
PersistentTasksService persistentTasksService
PersistentTasksService persistentTasksService,
DownsampleMetrics downsampleMetrics
) {
super(
DownsampleAction.NAME,
Expand All @@ -173,6 +177,12 @@ public TransportDownsampleAction(
this.threadContext = threadPool.getThreadContext();
this.taskQueue = clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR);
this.persistentTasksService = persistentTasksService;
this.downsampleMetrics = downsampleMetrics;
this.startTime = client.threadPool().relativeTimeInMillis();
Copy link
Contributor

@lkts lkts Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my education - we actually create new instance of action for every invocation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. My understanding is that actions are initialized once in the plugin, then their methods (masterOperation) are called per incoming request.

}

private long getDurationInMillis() {
return TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - startTime).getMillis();
}

@Override
Expand Down Expand Up @@ -414,6 +424,7 @@ private void performShardDownsampling(
// NOTE: before we set the number of replicas to 0, as a result here we are
// only dealing with primary shards.
final AtomicInteger countDown = new AtomicInteger(numberOfShards);
final AtomicBoolean errorReported = new AtomicBoolean(false);
for (int shardNum = 0; shardNum < numberOfShards; shardNum++) {
final ShardId shardId = new ShardId(sourceIndex, shardNum);
final String persistentTaskId = createPersistentTaskId(
Expand Down Expand Up @@ -465,6 +476,9 @@ public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTa
@Override
public void onFailure(Exception e) {
logger.error("error while waiting for downsampling persistent task", e);
if (errorReported.getAndSet(true) == false) {
downsampleMetrics.recordLatencyMaster(getDurationInMillis(), DownsampleMetrics.ShardActionStatus.FAILED);
kkrik-es marked this conversation as resolved.
Show resolved Hide resolved
}
listener.onFailure(e);
}
};
Expand Down Expand Up @@ -529,6 +543,9 @@ private void updateTargetIndexSettingStep(
updateSettingsReq,
new UpdateDownsampleIndexSettingsActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout())
);

// Record latency for downsampling operation.
downsampleMetrics.recordLatencyMaster(getDurationInMillis(), DownsampleMetrics.ShardActionStatus.SUCCESS);
kkrik-es marked this conversation as resolved.
Show resolved Hide resolved
}

private static DownsampleShardTaskParams createPersistentTaskParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,13 +824,22 @@ public void testDownsampleStats() throws IOException {
final TestTelemetryPlugin plugin = getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();

List<Measurement> measurements = plugin.getLongHistogramMeasurement(DownsampleMetrics.LATENCY_SHARD);
assertFalse(measurements.isEmpty());
for (Measurement measurement : measurements) {
assertTrue(measurement.value().toString(), measurement.value().longValue() >= 0 && measurement.value().longValue() < 1000_000);
assertEquals(1, measurement.attributes().size());
assertThat(measurement.attributes().get("status"), Matchers.in(List.of("success", "failed", "missing_docs")));
}

measurements = plugin.getLongHistogramMeasurement(DownsampleMetrics.LATENCY_MASTER);
assertFalse(measurements.isEmpty());
for (Measurement measurement : measurements) {
assertTrue(measurement.value().toString(), measurement.value().longValue() >= 0 && measurement.value().longValue() < 1000_000);
assertEquals(1, measurement.attributes().size());
assertThat(measurement.attributes().get("status"), Matchers.in(List.of("success", "failed")));
}
}

public void testResumeDownsample() throws IOException {
Expand Down