Skip to content

Commit

Permalink
refactor: remove duplicated metrics already existing at RSM level
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Sep 27, 2023
1 parent 016672c commit 7c2ebc9
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat

log.info("Copying log segment data, metadata: {}", remoteLogSegmentMetadata);

metrics.recordSegmentCopy(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
remoteLogSegmentMetadata.segmentSizeInBytes());

final var customMetadataBuilder =
new SegmentCustomMetadataBuilder(customMetadataFields, objectKey, remoteLogSegmentMetadata);

Expand Down Expand Up @@ -242,8 +239,6 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
uploadIndexFile(remoteLogSegmentMetadata, leaderEpoch, LEADER_EPOCH, encryptionMetadata,
customMetadataBuilder);
} catch (final Exception e) {
metrics.recordSegmentCopyError(remoteLogSegmentMetadata.remoteLogSegmentId()
.topicIdPartition().topicPartition());
throw new RemoteStorageException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,11 @@

import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.OBJECT_UPLOAD_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_ERRORS;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_COPY_TIME;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_DELETE;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_DELETE_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_DELETE_ERRORS;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_DELETE_TIME;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_FETCH;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.SEGMENT_FETCH_REQUESTED_BYTES;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.objectTypeTags;
import static io.aiven.kafka.tieredstorage.metrics.MetricsRegistry.sensorName;
Expand Down Expand Up @@ -79,51 +75,6 @@ public Metrics(final Time time, final MetricConfig metricConfig) {
metricsRegistry = new MetricsRegistry();
}

public void recordSegmentCopy(final TopicPartition topicPartition, final long bytes) {
recordSegmentCopyRequests(topicPartition);
recordSegmentCopyBytes(topicPartition, bytes);
}

private void recordSegmentCopyBytes(final TopicPartition topicPartition, final long bytes) {
new SensorProvider(metrics, sensorName(SEGMENT_COPY_BYTES))
.with(metricsRegistry.segmentCopyBytesRate, new Rate())
.with(metricsRegistry.segmentCopyBytesTotal, new CumulativeSum())
.get()
.record(bytes);
new SensorProvider(metrics, sensorNameByTopic(topicPartition, SEGMENT_COPY_BYTES),
() -> topicTags(topicPartition))
.with(metricsRegistry.segmentCopyBytesRateByTopic, new Rate())
.with(metricsRegistry.segmentCopyBytesTotalByTopic, new CumulativeSum())
.get()
.record(bytes);
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, SEGMENT_COPY_BYTES),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.segmentCopyBytesRateByTopicPartition, new Rate())
.with(metricsRegistry.segmentCopyBytesTotalByTopicPartition, new CumulativeSum())
.get()
.record(bytes);
}

private void recordSegmentCopyRequests(final TopicPartition topicPartition) {
new SensorProvider(metrics, sensorName(SEGMENT_COPY))
.with(metricsRegistry.segmentCopyRequestsRate, new Rate())
.with(metricsRegistry.segmentCopyRequestsTotal, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopic(topicPartition, SEGMENT_COPY),
() -> topicTags(topicPartition))
.with(metricsRegistry.segmentCopyRequestsRateByTopic, new Rate())
.with(metricsRegistry.segmentCopyRequestsTotalByTopic, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, SEGMENT_COPY),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.segmentCopyRequestsRateByTopicPartition, new Rate())
.with(metricsRegistry.segmentCopyRequestsTotalByTopicPartition, new CumulativeCount())
.get()
.record();
}

public void recordSegmentCopyTime(final TopicPartition topicPartition, final long startMs, final long endMs) {
final var time = endMs - startMs;
new SensorProvider(metrics, sensorName(SEGMENT_COPY_TIME))
Expand All @@ -145,26 +96,6 @@ public void recordSegmentCopyTime(final TopicPartition topicPartition, final lon
.record(time);
}

public void recordSegmentCopyError(final TopicPartition topicPartition) {
new SensorProvider(metrics, sensorName(SEGMENT_COPY_ERRORS))
.with(metricsRegistry.segmentCopyErrorsRate, new Rate())
.with(metricsRegistry.segmentCopyErrorsTotal, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopic(topicPartition, SEGMENT_COPY_ERRORS),
() -> topicTags(topicPartition))
.with(metricsRegistry.segmentCopyErrorsRateByTopic, new Rate())
.with(metricsRegistry.segmentCopyErrorsTotalByTopic, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, SEGMENT_COPY_ERRORS),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.segmentCopyErrorsRateByTopicPartition, new Rate())
.with(metricsRegistry.segmentCopyErrorsTotalByTopicPartition, new CumulativeCount())
.get()
.record();
}

public void recordSegmentDelete(final TopicPartition topicPartition, final long bytes) {
recordSegmentDeleteRequests(topicPartition);
recordSegmentDeleteBytes(topicPartition, bytes);
Expand Down Expand Up @@ -256,7 +187,6 @@ public void recordSegmentDeleteError(final TopicPartition topicPartition) {
}

public void recordSegmentFetch(final TopicPartition topicPartition, final long bytes) {
recordSegmentFetchRequests(topicPartition);
recordSegmentFetchRequestedBytes(topicPartition, bytes);
}

Expand All @@ -280,26 +210,6 @@ private void recordSegmentFetchRequestedBytes(final TopicPartition topicPartitio
.record(bytes);
}

private void recordSegmentFetchRequests(final TopicPartition topicPartition) {
new SensorProvider(metrics, sensorName(SEGMENT_FETCH))
.with(metricsRegistry.segmentFetchRequestsRate, new Rate())
.with(metricsRegistry.segmentFetchRequestsTotal, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopic(topicPartition, SEGMENT_FETCH),
() -> topicTags(topicPartition))
.with(metricsRegistry.segmentFetchRequestsRateByTopic, new Rate())
.with(metricsRegistry.segmentFetchRequestsTotalByTopic, new CumulativeCount())
.get()
.record();
new SensorProvider(metrics, sensorNameByTopicPartition(topicPartition, SEGMENT_FETCH),
() -> topicPartitionTags(topicPartition), Sensor.RecordingLevel.DEBUG)
.with(metricsRegistry.segmentFetchRequestsRateByTopicPartition, new Rate())
.with(metricsRegistry.segmentFetchRequestsTotalByTopicPartition, new CumulativeCount())
.get()
.record();
}

public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
final long bytes) {
recordObjectUploadRequests(topicPartition, suffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,6 @@ public class MetricsRegistry {

// Segment copy metric names
static final String SEGMENT_COPY = "segment-copy";
static final String SEGMENT_COPY_RATE = SEGMENT_COPY + "-rate";
final MetricNameTemplate segmentCopyRequestsRate = new MetricNameTemplate(SEGMENT_COPY_RATE, METRIC_GROUP, "");
final MetricNameTemplate segmentCopyRequestsRateByTopic =
new MetricNameTemplate(SEGMENT_COPY_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyRequestsRateByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_COPY_TOTAL = SEGMENT_COPY + "-total";
final MetricNameTemplate segmentCopyRequestsTotal = new MetricNameTemplate(SEGMENT_COPY_TOTAL, METRIC_GROUP, "");
final MetricNameTemplate segmentCopyRequestsTotalByTopic =
new MetricNameTemplate(SEGMENT_COPY_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyRequestsTotalByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_COPY_BYTES = SEGMENT_COPY + "-bytes";
static final String SEGMENT_COPY_BYTES_RATE = SEGMENT_COPY_BYTES + "-rate";
final MetricNameTemplate segmentCopyBytesRate = new MetricNameTemplate(SEGMENT_COPY_BYTES_RATE, METRIC_GROUP, "");
final MetricNameTemplate segmentCopyBytesRateByTopic =
new MetricNameTemplate(SEGMENT_COPY_BYTES_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyBytesRateByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_BYTES_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
public static final String SEGMENT_COPY_BYTES_TOTAL = SEGMENT_COPY_BYTES + "-total";
final MetricNameTemplate segmentCopyBytesTotal = new MetricNameTemplate(SEGMENT_COPY_BYTES_TOTAL, METRIC_GROUP, "");
final MetricNameTemplate segmentCopyBytesTotalByTopic =
new MetricNameTemplate(SEGMENT_COPY_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyBytesTotalByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_BYTES_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_COPY_TIME = SEGMENT_COPY + "-time";
static final String SEGMENT_COPY_TIME_AVG = SEGMENT_COPY_TIME + "-avg";
final MetricNameTemplate segmentCopyTimeAvg = new MetricNameTemplate(SEGMENT_COPY_TIME_AVG, METRIC_GROUP, "");
Expand All @@ -76,20 +51,6 @@ public class MetricsRegistry {
new MetricNameTemplate(SEGMENT_COPY_TIME_MAX, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyTimeMaxByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_TIME_MAX, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_COPY_ERRORS = SEGMENT_COPY + "-errors";
static final String SEGMENT_COPY_ERRORS_RATE = SEGMENT_COPY_ERRORS + "-rate";
final MetricNameTemplate segmentCopyErrorsRate = new MetricNameTemplate(SEGMENT_COPY_ERRORS_RATE, METRIC_GROUP, "");
final MetricNameTemplate segmentCopyErrorsRateByTopic =
new MetricNameTemplate(SEGMENT_COPY_ERRORS_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyErrorsRateByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_ERRORS_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_COPY_ERRORS_TOTAL = SEGMENT_COPY_ERRORS + "-total";
final MetricNameTemplate segmentCopyErrorsTotal =
new MetricNameTemplate(SEGMENT_COPY_ERRORS_TOTAL, METRIC_GROUP, "");
final MetricNameTemplate segmentCopyErrorsTotalByTopic =
new MetricNameTemplate(SEGMENT_COPY_ERRORS_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentCopyErrorsTotalByTopicPartition =
new MetricNameTemplate(SEGMENT_COPY_ERRORS_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);

// Segment delete metric names
static final String SEGMENT_DELETE = "segment-delete";
Expand Down Expand Up @@ -156,18 +117,6 @@ public class MetricsRegistry {

// Segment fetch metric names
static final String SEGMENT_FETCH = "segment-fetch";
static final String SEGMENT_FETCH_RATE = SEGMENT_FETCH + "-rate";
final MetricNameTemplate segmentFetchRequestsRate = new MetricNameTemplate(SEGMENT_FETCH_RATE, METRIC_GROUP, "");
final MetricNameTemplate segmentFetchRequestsRateByTopic =
new MetricNameTemplate(SEGMENT_FETCH_RATE, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentFetchRequestsRateByTopicPartition =
new MetricNameTemplate(SEGMENT_FETCH_RATE, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_FETCH_TOTAL = SEGMENT_FETCH + "-total";
final MetricNameTemplate segmentFetchRequestsTotal = new MetricNameTemplate(SEGMENT_FETCH_TOTAL, METRIC_GROUP, "");
final MetricNameTemplate segmentFetchRequestsTotalByTopic =
new MetricNameTemplate(SEGMENT_FETCH_TOTAL, METRIC_GROUP, "", TOPIC_TAG_NAMES);
final MetricNameTemplate segmentFetchRequestsTotalByTopicPartition =
new MetricNameTemplate(SEGMENT_FETCH_TOTAL, METRIC_GROUP, "", TOPIC_PARTITION_TAG_NAMES);
static final String SEGMENT_FETCH_REQUESTED_BYTES = SEGMENT_FETCH + "-requested-bytes";
static final String SEGMENT_FETCH_REQUESTED_BYTES_RATE = SEGMENT_FETCH_REQUESTED_BYTES + "-rate";
final MetricNameTemplate segmentFetchRequestedBytesRate =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,6 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J
final ObjectName metricName = ObjectName.getInstance(objectName);

// upload related metrics
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-total"))
.isEqualTo(3.0);
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);

assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-bytes-rate"))
.isEqualTo(30.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-bytes-total"))
.isEqualTo(30.0);

assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-time-avg"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-copy-time-max"))
Expand Down Expand Up @@ -195,11 +185,6 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J

rsm.fetchLogSegment(REMOTE_LOG_SEGMENT_METADATA, 0);

assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-rate"))
.isEqualTo(2.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-total"))
.isEqualTo(2.0);

assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-requested-bytes-rate"))
.isEqualTo(20.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(metricName, "segment-fetch-requested-bytes-total"))
Expand Down Expand Up @@ -265,33 +250,6 @@ void metricsErrorsShouldBeReported(final String tags) throws JMException {
final ObjectName rsmMetricsName = ObjectName.getInstance(
"aiven.kafka.server.tieredstorage:type=remote-storage-manager-metrics" + tags);

// checking that upload fails with expected exceptions
assertThatThrownBy(() -> rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData))
.isInstanceOf(RemoteStorageException.class)
.hasRootCause(testException);
assertThatThrownBy(() -> rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData))
.isInstanceOf(RemoteStorageException.class)
.hasRootCause(testException);
assertThatThrownBy(() -> rsm.copyLogSegmentData(REMOTE_LOG_SEGMENT_METADATA, logSegmentData))
.isInstanceOf(RemoteStorageException.class)
.hasRootCause(testException);

// verifying uploading failure metrics
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-total"))
.isEqualTo(3.0);

assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-bytes-rate"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-bytes-total"))
.isEqualTo(30.0);

assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-errors-rate"))
.isEqualTo(3.0 / METRIC_TIME_WINDOW_SEC);
assertThat(MBEAN_SERVER.getAttribute(rsmMetricsName, "segment-copy-errors-total"))
.isEqualTo(3.0);

// checking that deletion fails with expected exceptions
assertThatThrownBy(() -> rsm.deleteLogSegmentData(REMOTE_LOG_SEGMENT_METADATA))
.isInstanceOf(RemoteStorageException.class)
Expand Down

0 comments on commit 7c2ebc9

Please sign in to comment.