feat: add metrics for chunk transform/detransform latency
jeqo committed Sep 28, 2023
1 parent 7c2ebc9 commit 56f35b6
Showing 5 changed files with 182 additions and 9 deletions.
RemoteStorageManager.java
@@ -55,6 +55,7 @@
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataBuilder;
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataSerde;
import io.aiven.kafka.tieredstorage.metrics.MeteredInputStream;
import io.aiven.kafka.tieredstorage.metrics.Metrics;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
@@ -287,7 +288,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
try (
final var sis = new MeteredInputStream(
transformFinisher.toInputStream(),
time,
millis -> metrics.recordChunkTransformTime(
remoteLogSegmentMetadata.topicIdPartition().topicPartition(),
millis
))
) {
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -318,7 +327,15 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta

final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
try (final var in = transformFinisher.toInputStream()) {
try (
final var in = new MeteredInputStream(
transformFinisher.toInputStream(),
time,
millis -> metrics.recordChunkTransformTime(
remoteLogSegmentMetadata.topicIdPartition().topicPartition(),
millis
))
) {
final var bytes = uploader.upload(in, key);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -382,8 +399,14 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
final var suffix = ObjectKey.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
.toInputStream();
return new MeteredInputStream(
new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range).toInputStream(),
time,
millis -> metrics.recordChunkDetransformTime(
remoteLogSegmentMetadata.topicIdPartition().topicPartition(),
millis
)
);
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
throw new RemoteResourceNotFoundException(e);
} catch (final Exception e) {
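Note that fetchLogSegment now returns the metered stream directly, so chunk detransform time is only recorded when the caller consumes and closes the stream. A minimal caller-side sketch of that contract (rsm and remoteLogSegmentMetadata are illustrative names, mirroring the test changes below):

    try (final var in = rsm.fetchLogSegment(remoteLogSegmentMetadata, 0)) {
        in.readAllBytes(); // every read() on the returned stream is timed
    } // close() reports the accumulated time via recordChunkDetransformTime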
MeteredInputStream.java
@@ -0,0 +1,54 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

import org.apache.kafka.common.utils.Time;

public class MeteredInputStream extends InputStream {
final InputStream inner;
final Time time;
long readingTime = 0L;
final Consumer<Long> recorder;

public MeteredInputStream(final InputStream inner,
final Time time,
final Consumer<Long> recorder) {
this.inner = inner;
this.time = time;
this.recorder = recorder;
}

@Override
public int read() throws IOException {
final var start = time.nanoseconds();
try {
return inner.read();
} finally {
readingTime += time.nanoseconds() - start;
}
}

@Override
public void close() throws IOException {
recorder.accept(readingTime / 1_000_000); // to millis
inner.close();
}
}
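Usage note: the wrapper accumulates the time spent inside read() and reports it once, converted to milliseconds, when the stream is closed. Because only the single-byte read() is overridden, bulk reads funnel through it byte by byte via InputStream's default implementations. A self-contained sketch, with illustrative names not taken from the commit:

    import java.io.ByteArrayInputStream;

    import org.apache.kafka.common.utils.Time;

    import io.aiven.kafka.tieredstorage.metrics.MeteredInputStream;

    public class MeteredInputStreamExample {
        public static void main(final String[] args) throws Exception {
            try (final var in = new MeteredInputStream(
                new ByteArrayInputStream(new byte[] {1, 2, 3}),
                Time.SYSTEM,
                millis -> System.out.println("reads took " + millis + " ms"))) {
                in.readAllBytes(); // bulk read delegates to the timed read() per byte
            } // close() invokes the recorder with the total
        }
    }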
Metrics.java
@@ -35,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.CHUNK_DETRANSFORM_TIME;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.CHUNK_TRANSFORM_TIME;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_TIME;
@@ -210,6 +212,46 @@ private void recordSegmentFetchRequestedBytes(final TopicPartitio
.record(bytes);
}

public void recordChunkTransformTime(final TopicPartition topicPartition, final long millis) {
new SensorProvider(metrics, sensorName(CHUNK_TRANSFORM_TIME))
.with(metricsRegistry.chunkTransformTimeAvg, new Avg())
.with(metricsRegistry.chunkTransformTimeMax, new Max())
.get()
.record(millis);
new SensorProvider(metrics, sensorNameByTopic(topicPartition, CHUNK_TRANSFORM_TIME),
() -> topicTags(topicPartition))
.with(metricsRegistry.chunkTransformTimeAvgByTopic, new Avg())
.with(metricsRegistry.chunkTransformTimeMaxByTopic, new Max())
.get()
.record(millis);
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, CHUNK_TRANSFORM_TIME),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.chunkTransformTimeAvgByTopicPartition, new Avg())
.with(metricsRegistry.chunkTransformTimeMaxByTopicPartition, new Max())
.get()
.record(millis);
}

public void recordChunkDetransformTime(final TopicPartition topicPartition, final long millis) {
new SensorProvider(metrics, sensorName(CHUNK_DETRANSFORM_TIME))
.with(metricsRegistry.chunkDetransformTimeAvg, new Avg())
.with(metricsRegistry.chunkDetransformTimeMax, new Max())
.get()
.record(millis);
new SensorProvider(metrics, sensorNameByTopic(topicPartition, CHUNK_DETRANSFORM_TIME),
() -> topicTags(topicPartition))
.with(metricsRegistry.chunkDetransformTimeAvgByTopic, new Avg())
.with(metricsRegistry.chunkDetransformTimeMaxByTopic, new Max())
.get()
.record(millis);
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, CHUNK_DETRANSFORM_TIME),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.chunkDetransformTimeAvgByTopicPartition, new Avg())
.with(metricsRegistry.chunkDetransformTimeMaxByTopicPartition, new Max())
.get()
.record(millis);
}

public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
final long bytes) {
recordObjectUploadRequests(topicPartition, suffix);
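Each recorder registers (or reuses) three sensors backed by Avg and Max stats: an aggregate one, a per-topic one, and a per-topic-partition one at DEBUG recording level. SensorProvider is a project-internal helper; in terms of Kafka's public metrics API, each call is roughly equivalent to the following sketch (the group name is a placeholder, not taken from the commit):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    public class SensorSketch {
        public static void main(final String[] args) {
            try (final var metrics = new Metrics()) {
                final Sensor sensor = metrics.sensor("chunk-transform-time");
                sensor.add(metrics.metricName("chunk-transform-time-avg", "some-group"), new Avg());
                sensor.add(metrics.metricName("chunk-transform-time-max", "some-group"), new Max());
                sensor.record(5.0); // one chunk transform took 5 ms
            }
        }
    }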
MetricsRegistry.java
@@ -133,6 +133,37 @@ public class MetricsRegistry {
final MetricNameTemplate segmentFetchRequestedBytesTotalByTopicPartition =
new MetricNameTemplate(SEGMENT_FETCH_REQUESTED_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);

// Transform/Detransform metrics
static final String CHUNK_TRANSFORM_TIME = "chunk-transform-time";
static final String CHUNK_TRANSFORM_TIME_AVG = CHUNK_TRANSFORM_TIME + "-avg";
final MetricNameTemplate chunkTransformTimeAvg = new MetricNameTemplate(CHUNK_TRANSFORM_TIME_AVG, METRIC_GROUP, "");
final MetricNameTemplate chunkTransformTimeAvgByTopic =
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate chunkTransformTimeAvgByTopicPartition =
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String CHUNK_TRANSFORM_TIME_MAX = CHUNK_TRANSFORM_TIME + "-max";
final MetricNameTemplate chunkTransformTimeMax = new MetricNameTemplate(CHUNK_TRANSFORM_TIME_MAX, METRIC_GROUP, "");
final MetricNameTemplate chunkTransformTimeMaxByTopic =
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate chunkTransformTimeMaxByTopicPartition =
new MetricNameTemplate(CHUNK_TRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);

static final String CHUNK_DETRANSFORM_TIME = "chunk-detransform-time";
static final String CHUNK_DETRANSFORM_TIME_AVG = CHUNK_DETRANSFORM_TIME + "-avg";
final MetricNameTemplate chunkDetransformTimeAvg =
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_AVG, METRIC_GROUP, "");
final MetricNameTemplate chunkDetransformTimeAvgByTopic =
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate chunkDetransformTimeAvgByTopicPartition =
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_AVG, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String CHUNK_DETRANSFORM_TIME_MAX = CHUNK_DETRANSFORM_TIME + "-max";
final MetricNameTemplate chunkDetransformTimeMax =
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_MAX, METRIC_GROUP, "");
final MetricNameTemplate chunkDetransformTimeMaxByTopic =
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate chunkDetransformTimeMaxByTopicPartition =
new MetricNameTemplate(CHUNK_DETRANSFORM_TIME_MAX, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);

// Object upload metrics
static final String OBJECT_UPLOAD = "object-upload";
static final String OBJECT_UPLOAD_RATE = OBJECT_UPLOAD + "-rate";
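Once registered, the new avg/max attributes can be read over JMX, as the test below does. A hedged sketch; the ObjectName domain and type are assumptions inferred from the cache MBean name visible in the test, not confirmed by this diff:

    import java.lang.management.ManagementFactory;

    import javax.management.ObjectName;

    public class JmxReadSketch {
        public static void main(final String[] args) throws Exception {
            final var server = ManagementFactory.getPlatformMBeanServer();
            // Assumed name; the test constructs its metricName elsewhere.
            final var name = new ObjectName(
                "aiven.kafka.server.tieredstorage:type=remote-storage-manager-metrics");
            System.out.println(server.getAttribute(name, "chunk-transform-time-avg"));
        }
    }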
RemoteStorageManagerMetricsTest.java
@@ -75,16 +75,18 @@ class RemoteStorageManagerMetricsTest {
1, 100, -1, -1, 1L,
LOG_SEGMENT_BYTES, Collections.singletonMap(1, 100L));

Time time;
RemoteStorageManager rsm;
LogSegmentData logSegmentData;

private Map<String, Object> configs;

@BeforeEach
void setup(@TempDir final Path tmpDir,
@Mock final Time time) throws IOException {
when(time.milliseconds()).thenReturn(0L);
rsm = new RemoteStorageManager(time);
@Mock final Time time) throws IOException {
this.time = time;
when(this.time.milliseconds()).thenReturn(0L);
rsm = new RemoteStorageManager(this.time);

final Path target = tmpDir.resolve("target");
Files.createDirectories(target);
@@ -117,6 +119,7 @@ void setup(@TempDir final Path tmpDir,
void metricsShouldBeReported(final String tags) throws RemoteStorageException, JMException {
rsm.configure(configs);

when(time.nanoseconds()).thenReturn(0L, 1000000L);
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
logSegmentData.leaderEpochIndex().flip(); // so leader epoch can be consumed again
rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData);
@@ -133,6 +136,11 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-time-max"))
.isEqualTo(0.0);

assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-transform-time-avg"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-transform-time-max"))
.isEqualTo(1.0);

assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-total"))
.isEqualTo(18.0);
assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-rate"))
@@ -177,13 +185,28 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J
final var segmentManifestCacheObjectName =
new ObjectName("aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache");

rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0);
when(time.nanoseconds()).thenReturn(0L, 1000000L);
try (final var inputStream = rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0)) {
inputStream.readAllBytes();
} catch (final IOException e) {
throw new RuntimeException(e);
}

assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-detransform-time-avg"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(metricName, "chunk-detransform-time-max"))
.isEqualTo(1.0);

// check cache size increases after first miss
assertThat(MBEAN_SERVER.getAttribute(segmentManifestCacheObjectName, "cache-size-total"))
.isEqualTo(1.0);

rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0);
when(time.nanoseconds()).thenReturn(0L, 1000000L);
try (final var inputStream = rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0)) {
inputStream.readAllBytes();
} catch (final IOException e) {
throw new RuntimeException(e);
}

assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-requested-bytes-rate"))
.isEqualTo(20.0 / METRIC_TIME_WINDOW_SEC);
