Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add S3 latency metrics #397

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ static void setupS3() {
LOCALSTACK.getSecretKey()
)
)
)
.build();
final S3Client s3Client = clientBuilder.build();
s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
);
try (final S3Client s3Client = clientBuilder.build()) {
s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET_NAME).build());
}
}

@BeforeEach
Expand Down Expand Up @@ -125,40 +125,80 @@ void metricsShouldBeReported() throws Exception {
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-requests-total"))
.isEqualTo(2.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-rate"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-total"))
.isEqualTo(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-avg"))
.isEqualTo(Double.NaN);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-max"))
.isEqualTo(Double.NaN);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-total"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-total"))
.isEqualTo(2.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-total"))
.isEqualTo(2.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-total"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);

assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-total"))
.isEqualTo(1.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
Expand All @@ -42,10 +46,13 @@
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.THROTTLING;

class MetricCollector implements MetricPublisher {
private static final Logger log = LoggerFactory.getLogger(MetricCollector.class);

private final org.apache.kafka.common.metrics.Metrics metrics;

private static final String METRIC_GROUP = "s3-metrics";
private final Map<String, Sensor> requestMetrics = new HashMap<>();
private final Map<String, Sensor> latencyMetrics = new HashMap<>();
private final Map<String, Sensor> errorMetrics = new HashMap<>();

MetricCollector() {
Expand All @@ -55,41 +62,71 @@ class MetricCollector implements MetricPublisher {
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage.s3")
);
requestMetrics.put("GetObject", createSensor("get-object-requests"));
requestMetrics.put("UploadPart", createSensor("upload-part-requests"));
requestMetrics.put("CreateMultipartUpload", createSensor("create-multipart-upload-requests"));
requestMetrics.put("CompleteMultipartUpload", createSensor("complete-multipart-upload-requests"));
requestMetrics.put("PutObject", createSensor("put-object-requests"));
requestMetrics.put("DeleteObject", createSensor("delete-object-requests"));
requestMetrics.put("AbortMultipartUpload", createSensor("abort-multipart-upload-requests"));

errorMetrics.put(THROTTLING.toString(), createSensor("throttling-errors"));
errorMetrics.put(SERVER_ERROR.toString(), createSensor("server-errors"));
errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createSensor("configured-timeout-errors"));
errorMetrics.put(IO.toString(), createSensor("io-errors"));
errorMetrics.put(OTHER.toString(), createSensor("other-errors"));
requestMetrics.put("GetObject", createRequestsSensor("get-object-requests"));
latencyMetrics.put("GetObject", createLatencySensor("get-object-time"));
requestMetrics.put("UploadPart", createRequestsSensor("upload-part-requests"));
latencyMetrics.put("UploadPart", createLatencySensor("upload-part-time"));
requestMetrics.put("CreateMultipartUpload", createRequestsSensor("create-multipart-upload-requests"));
latencyMetrics.put("CreateMultipartUpload", createLatencySensor("create-multipart-upload-time"));
requestMetrics.put("CompleteMultipartUpload", createRequestsSensor("complete-multipart-upload-requests"));
latencyMetrics.put("CompleteMultipartUpload", createLatencySensor("complete-multipart-upload-time"));
requestMetrics.put("PutObject", createRequestsSensor("put-object-requests"));
latencyMetrics.put("PutObject", createLatencySensor("put-object-time"));
requestMetrics.put("DeleteObject", createRequestsSensor("delete-object-requests"));
latencyMetrics.put("DeleteObject", createLatencySensor("delete-object-time"));
requestMetrics.put("AbortMultipartUpload", createRequestsSensor("abort-multipart-upload-requests"));
latencyMetrics.put("AbortMultipartUpload", createLatencySensor("abort-multipart-upload-time"));

errorMetrics.put(THROTTLING.toString(), createRequestsSensor("throttling-errors"));
errorMetrics.put(SERVER_ERROR.toString(), createRequestsSensor("server-errors"));
errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createRequestsSensor("configured-timeout-errors"));
errorMetrics.put(IO.toString(), createRequestsSensor("io-errors"));
errorMetrics.put(OTHER.toString(), createRequestsSensor("other-errors"));
}

private Sensor createSensor(final String name) {
private Sensor createRequestsSensor(final String name) {
final Sensor sensor = metrics.sensor(name);
sensor.add(metrics.metricName(name + "-rate", METRIC_GROUP), new Rate());
sensor.add(metrics.metricName(name + "-total", METRIC_GROUP), new CumulativeCount());
return sensor;
}

private Sensor createLatencySensor(final String name) {
final Sensor sensor = metrics.sensor(name);
sensor.add(metrics.metricName(name + "-max", METRIC_GROUP), new Max());
sensor.add(metrics.metricName(name + "-avg", METRIC_GROUP), new Avg());
return sensor;
}

@Override
public void publish(final MetricCollection metricCollection) {
final List<String> metricValues = metricCollection.metricValues(CoreMetric.OPERATION_NAME);
for (final String metricValue : metricValues) {
final var sensor = requestMetrics.get(metricValue);
if (sensor != null) {
sensor.record();
// metrics are reported per request, so 1 value can be assumed.
if (metricValues.size() == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The for loop was introduced intentionally after the discussion with @ivanyu and I guess it will not affect anything if what you specified in the comment is true, however both me and @ivanyu were not sure if that's the case. So could you elaborate how did you find out that the size will always be 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API is not clear about it, but docs mention the following: https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/metrics-list.html

Metrics collected with each request

So, my interpretation (and while testing this) is that each instance of metric collection corresponds to a single API call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I looked into the SDK code and seems you are right 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But OTOH what harm would it do it it remains a loop? It'd be more future-proof and we wouldn't expect this internal behavior to change in a casual dependency upgrade

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that we will not be able to understand to what request type the latency metric belongs if there will be multiple values AFAIU. Otherwise I would agree.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. That's very unfortunate. Well, let's rely on our tests as much as we can then

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I'm thinking that maybe it's worth logging a warning if the size is different from 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the same actually, so yeah, lets have it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Adding it now.

final var metricValue = metricValues.get(0);
final var requests = requestMetrics.get(metricValue);
if (requests != null) {
requests.record();
}

final var durations = metricCollection.metricValues(CoreMetric.API_CALL_DURATION);
if (durations.size() == 1) {
AnatolyPopov marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need the same warning in the else here

final var latency = latencyMetrics.get(metricValue);
if (latency != null) {
latency.record(durations.get(0).toMillis());
}
} else {
log.warn("Latencies included on metric collection is larger than 1: " + metricValues);
}
} else {
log.warn("Operations included on metric collection is larger than 1: " + metricValues);
}

final List<String> errorValues = metricCollection.childrenWithName("ApiCallAttempt")
.map(metricRecords -> metricRecords.metricValues(CoreMetric.ERROR_TYPE))
.flatMap(Collection::stream)
.collect(Collectors.toList());

for (final String errorValue : errorValues) {
final var sensor = errorMetrics.get(errorValue);
if (sensor != null) {
Expand Down