diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java index 406b4d850..747446134 100644 --- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java +++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java @@ -76,10 +76,10 @@ static void setupS3() { LOCALSTACK.getSecretKey() ) ) - ) - .build(); - final S3Client s3Client = clientBuilder.build(); - s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build()); + ); + try (final S3Client s3Client = clientBuilder.build()) { + s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build()); + } } @BeforeEach @@ -125,40 +125,80 @@ void metricsShouldBeReported() throws Exception { .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-requests-total")) .isEqualTo(2.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-avg")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-max")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-rate")) .isEqualTo(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-total")) .isEqualTo(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-avg")) + .isEqualTo(Double.NaN); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-max")) + .isEqualTo(Double.NaN); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-rate")) .asInstanceOf(DOUBLE) .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-total")) 
.isEqualTo(1.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-avg")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-max")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-rate")) .asInstanceOf(DOUBLE) .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-total")) .isEqualTo(2.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-avg")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-max")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-rate")) .asInstanceOf(DOUBLE) .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-total")) .isEqualTo(2.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-avg")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-max")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-rate")) .asInstanceOf(DOUBLE) .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-total")) .isEqualTo(1.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-avg")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-max")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, 
"abort-multipart-upload-requests-rate")) .asInstanceOf(DOUBLE) .isGreaterThan(0.0); assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-total")) .isEqualTo(1.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-avg")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); + assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-max")) + .asInstanceOf(DOUBLE) + .isGreaterThan(0.0); } } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/MetricCollector.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/MetricCollector.java index 907c7ff0e..704b53562 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/MetricCollector.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/MetricCollector.java @@ -27,10 +27,14 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.metrics.CoreMetric; import software.amazon.awssdk.metrics.MetricCollection; import software.amazon.awssdk.metrics.MetricPublisher; @@ -42,10 +46,13 @@ import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.THROTTLING; class MetricCollector implements MetricPublisher { + private static final Logger log = LoggerFactory.getLogger(MetricCollector.class); + private final org.apache.kafka.common.metrics.Metrics metrics; private static final String METRIC_GROUP = "s3-metrics"; private final Map requestMetrics = new HashMap<>(); + private final Map 
latencyMetrics = new HashMap<>(); private final Map errorMetrics = new HashMap<>(); MetricCollector() { @@ -55,41 +62,71 @@ class MetricCollector implements MetricPublisher { new MetricConfig(), List.of(reporter), Time.SYSTEM, new KafkaMetricsContext("aiven.kafka.server.tieredstorage.s3") ); - requestMetrics.put("GetObject", createSensor("get-object-requests")); - requestMetrics.put("UploadPart", createSensor("upload-part-requests")); - requestMetrics.put("CreateMultipartUpload", createSensor("create-multipart-upload-requests")); - requestMetrics.put("CompleteMultipartUpload", createSensor("complete-multipart-upload-requests")); - requestMetrics.put("PutObject", createSensor("put-object-requests")); - requestMetrics.put("DeleteObject", createSensor("delete-object-requests")); - requestMetrics.put("AbortMultipartUpload", createSensor("abort-multipart-upload-requests")); - - errorMetrics.put(THROTTLING.toString(), createSensor("throttling-errors")); - errorMetrics.put(SERVER_ERROR.toString(), createSensor("server-errors")); - errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createSensor("configured-timeout-errors")); - errorMetrics.put(IO.toString(), createSensor("io-errors")); - errorMetrics.put(OTHER.toString(), createSensor("other-errors")); + requestMetrics.put("GetObject", createRequestsSensor("get-object-requests")); + latencyMetrics.put("GetObject", createLatencySensor("get-object-time")); + requestMetrics.put("UploadPart", createRequestsSensor("upload-part-requests")); + latencyMetrics.put("UploadPart", createLatencySensor("upload-part-time")); + requestMetrics.put("CreateMultipartUpload", createRequestsSensor("create-multipart-upload-requests")); + latencyMetrics.put("CreateMultipartUpload", createLatencySensor("create-multipart-upload-time")); + requestMetrics.put("CompleteMultipartUpload", createRequestsSensor("complete-multipart-upload-requests")); + latencyMetrics.put("CompleteMultipartUpload", createLatencySensor("complete-multipart-upload-time")); + 
requestMetrics.put("PutObject", createRequestsSensor("put-object-requests")); + latencyMetrics.put("PutObject", createLatencySensor("put-object-time")); + requestMetrics.put("DeleteObject", createRequestsSensor("delete-object-requests")); + latencyMetrics.put("DeleteObject", createLatencySensor("delete-object-time")); + requestMetrics.put("AbortMultipartUpload", createRequestsSensor("abort-multipart-upload-requests")); + latencyMetrics.put("AbortMultipartUpload", createLatencySensor("abort-multipart-upload-time")); + + errorMetrics.put(THROTTLING.toString(), createRequestsSensor("throttling-errors")); + errorMetrics.put(SERVER_ERROR.toString(), createRequestsSensor("server-errors")); + errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createRequestsSensor("configured-timeout-errors")); + errorMetrics.put(IO.toString(), createRequestsSensor("io-errors")); + errorMetrics.put(OTHER.toString(), createRequestsSensor("other-errors")); } - private Sensor createSensor(final String name) { + private Sensor createRequestsSensor(final String name) { final Sensor sensor = metrics.sensor(name); sensor.add(metrics.metricName(name + "-rate", METRIC_GROUP), new Rate()); sensor.add(metrics.metricName(name + "-total", METRIC_GROUP), new CumulativeCount()); return sensor; } + private Sensor createLatencySensor(final String name) { + final Sensor sensor = metrics.sensor(name); + sensor.add(metrics.metricName(name + "-max", METRIC_GROUP), new Max()); + sensor.add(metrics.metricName(name + "-avg", METRIC_GROUP), new Avg()); + return sensor; + } + @Override public void publish(final MetricCollection metricCollection) { final List metricValues = metricCollection.metricValues(CoreMetric.OPERATION_NAME); - for (final String metricValue : metricValues) { - final var sensor = requestMetrics.get(metricValue); - if (sensor != null) { - sensor.record(); + // metrics are reported per request, so 1 value can be assumed. 
+ if (metricValues.size() == 1) { + final var metricValue = metricValues.get(0); + final var requests = requestMetrics.get(metricValue); + if (requests != null) { + requests.record(); + } + + final var durations = metricCollection.metricValues(CoreMetric.API_CALL_DURATION); + if (durations.size() == 1) { + final var latency = latencyMetrics.get(metricValue); + if (latency != null) { + latency.record(durations.get(0).toMillis()); + } + } else { + log.warn("Latencies included on metric collection is larger than 1: " + durations); + } + } else { + log.warn("Operations included on metric collection is larger than 1: " + metricValues); + } + final List errorValues = metricCollection.childrenWithName("ApiCallAttempt") .map(metricRecords -> metricRecords.metricValues(CoreMetric.ERROR_TYPE)) .flatMap(Collection::stream) .collect(Collectors.toList()); + for (final String errorValue : errorValues) { final var sensor = errorMetrics.get(errorValue); if (sensor != null) {