feat: add latency metrics for S3 API calls
jeqo committed Oct 3, 2023
1 parent cca6d9b commit fcf1817
Showing 2 changed files with 95 additions and 18 deletions.
@@ -125,40 +125,80 @@ void metricsShouldBeReported() throws Exception {
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-requests-total"))
.isEqualTo(2.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-rate"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-avg"))
.isEqualTo(Double.NaN);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-max"))
.isEqualTo(Double.NaN);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-total"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-total"))
.isEqualTo(2.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-total"))
.isEqualTo(2.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-total"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-total"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
}
}
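For reference, the attributes asserted above are ordinary JMX attributes, so they can be read the same way outside the test. A minimal sketch, assuming the sensors are exposed under the aiven.kafka.server.tieredstorage.s3:type=s3-metrics object name implied by the metrics context in MetricCollector below (the probe class itself is hypothetical, not part of this commit):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class S3LatencyProbe {
    public static void main(final String[] args) throws Exception {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed object name, derived from the metrics context and group in MetricCollector.
        final ObjectName name = new ObjectName("aiven.kafka.server.tieredstorage.s3:type=s3-metrics");
        // The -avg and -max attributes stay NaN until the operation has been recorded at least once.
        System.out.println(server.getAttribute(name, "get-object-time-avg"));
        System.out.println(server.getAttribute(name, "get-object-time-max"));
    }
}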
@@ -27,10 +27,14 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
@@ -42,10 +46,13 @@
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.THROTTLING;

class MetricCollector implements MetricPublisher {
private static final Logger log = LoggerFactory.getLogger(MetricCollector.class);

private final org.apache.kafka.common.metrics.Metrics metrics;

private static final String METRIC_GROUP = "s3-metrics";
private final Map<String, Sensor> requestMetrics = new HashMap<>();
private final Map<String, Sensor> latencyMetrics = new HashMap<>();
private final Map<String, Sensor> errorMetrics = new HashMap<>();

MetricCollector() {
@@ -55,41 +62,71 @@ class MetricCollector implements MetricPublisher {
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage.s3")
);
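// Note (assumption): with a JmxReporter among the reporters, Kafka registers these
// sensors under the MBean aiven.kafka.server.tieredstorage.s3:type=s3-metrics,
// following its <namespace>:type=<group> naming convention.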
requestMetrics.put("GetObject", createSensor("get-object-requests"));
requestMetrics.put("UploadPart", createSensor("upload-part-requests"));
requestMetrics.put("CreateMultipartUpload", createSensor("create-multipart-upload-requests"));
requestMetrics.put("CompleteMultipartUpload", createSensor("complete-multipart-upload-requests"));
requestMetrics.put("PutObject", createSensor("put-object-requests"));
requestMetrics.put("DeleteObject", createSensor("delete-object-requests"));
requestMetrics.put("AbortMultipartUpload", createSensor("abort-multipart-upload-requests"));

errorMetrics.put(THROTTLING.toString(), createSensor("throttling-errors"));
errorMetrics.put(SERVER_ERROR.toString(), createSensor("server-errors"));
errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createSensor("configured-timeout-errors"));
errorMetrics.put(IO.toString(), createSensor("io-errors"));
errorMetrics.put(OTHER.toString(), createSensor("other-errors"));
requestMetrics.put("GetObject", createRequestsSensor("get-object-requests"));
latencyMetrics.put("GetObject", createLatencySensor("get-object-time"));
requestMetrics.put("UploadPart", createRequestsSensor("upload-part-requests"));
latencyMetrics.put("UploadPart", createLatencySensor("upload-part-time"));
requestMetrics.put("CreateMultipartUpload", createRequestsSensor("create-multipart-upload-requests"));
latencyMetrics.put("CreateMultipartUpload", createLatencySensor("create-multipart-upload-time"));
requestMetrics.put("CompleteMultipartUpload", createRequestsSensor("complete-multipart-upload-requests"));
latencyMetrics.put("CompleteMultipartUpload", createLatencySensor("complete-multipart-upload-time"));
requestMetrics.put("PutObject", createRequestsSensor("put-object-requests"));
latencyMetrics.put("PutObject", createLatencySensor("put-object-time"));
requestMetrics.put("DeleteObject", createRequestsSensor("delete-object-requests"));
latencyMetrics.put("DeleteObject", createLatencySensor("delete-object-time"));
requestMetrics.put("AbortMultipartUpload", createRequestsSensor("abort-multipart-upload-requests"));
latencyMetrics.put("AbortMultipartUpload", createLatencySensor("abort-multipart-upload-time"));

errorMetrics.put(THROTTLING.toString(), createRequestsSensor("throttling-errors"));
errorMetrics.put(SERVER_ERROR.toString(), createRequestsSensor("server-errors"));
errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createRequestsSensor("configured-timeout-errors"));
errorMetrics.put(IO.toString(), createRequestsSensor("io-errors"));
errorMetrics.put(OTHER.toString(), createRequestsSensor("other-errors"));
}

- private Sensor createSensor(final String name) {
+ private Sensor createRequestsSensor(final String name) {
final Sensor sensor = metrics.sensor(name);
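// <name>-rate is a windowed requests-per-second rate (Rate); <name>-total is a
// cumulative count of requests since the collector was created (CumulativeCount).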
sensor.add(metrics.metricName(name + "-rate", METRIC_GROUP), new Rate());
sensor.add(metrics.metricName(name + "-total", METRIC_GROUP), new CumulativeCount());
return sensor;
}

private Sensor createLatencySensor(final String name) {
final Sensor sensor = metrics.sensor(name);
sensor.add(metrics.metricName(name + "-max", METRIC_GROUP), new Max());
sensor.add(metrics.metricName(name + "-avg", METRIC_GROUP), new Avg());
return sensor;
}

@Override
public void publish(final MetricCollection metricCollection) {
final List<String> metricValues = metricCollection.metricValues(CoreMetric.OPERATION_NAME);
- for (final String metricValue : metricValues) {
- final var sensor = requestMetrics.get(metricValue);
- if (sensor != null) {
- sensor.record();
+ // Metrics are published once per request, so exactly one operation name is expected.
+ if (metricValues.size() == 1) {
+ final var metricValue = metricValues.get(0);
+ final var requests = requestMetrics.get(metricValue);
+ if (requests != null) {
+ requests.record();
}

+ final var durations = metricCollection.metricValues(CoreMetric.API_CALL_DURATION);
+ if (durations.size() == 1) {
+ final var latency = latencyMetrics.get(metricValue);
+ if (latency != null) {
+ latency.record(durations.get(0).toMillis());
+ }
+ } else {
+ log.warn("Expected exactly one API call duration per metric collection, got: {}", durations);
+ }
+ } else {
+ log.warn("Expected exactly one operation name per metric collection, got: {}", metricValues);
+ }

final List<String> errorValues = metricCollection.childrenWithName("ApiCallAttempt")
.map(metricRecords -> metricRecords.metricValues(CoreMetric.ERROR_TYPE))
.flatMap(Collection::stream)
.collect(Collectors.toList());

for (final String errorValue : errorValues) {
final var sensor = errorMetrics.get(errorValue);
if (sensor != null) {
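Not part of this diff: a MetricPublisher only receives metric collections once it is registered on the SDK client. A minimal sketch of the assumed wiring, using the standard SDK v2 client override configuration:

import software.amazon.awssdk.services.s3.S3Client;

// Register the collector so the SDK delivers OPERATION_NAME and
// API_CALL_DURATION values to publish() after every request.
final MetricCollector collector = new MetricCollector();
final S3Client s3 = S3Client.builder()
        .overrideConfiguration(o -> o.addMetricPublisher(collector))
        .build();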
