Skip to content

Commit

Permalink
Add time-to-first-byte from remote metric
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanyu committed Jun 8, 2023
1 parent ae757e1 commit cabbb9a
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 8 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<suppress checks="ClassDataAbstractionCoupling" files="S3RemoteStorageManagerConfig.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="ChunkManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="AbbreviationAsWordInName" files="DataKeyAndAADEqualsTest"/>
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
</suppressions>
14 changes: 12 additions & 2 deletions core/src/main/java/io/aiven/kafka/tieredstorage/ChunkManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.aiven.kafka.tieredstorage.cache.ChunkCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.Metrics;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
Expand All @@ -44,15 +45,18 @@ public class ChunkManager {
private final ObjectKey objectKey;
private final AesEncryptionProvider aesEncryptionProvider;
private final ChunkCache chunkCache;
private final Metrics metrics;

public ChunkManager(final ObjectFetcher fetcher,
final ObjectKey objectKey,
final AesEncryptionProvider aesEncryptionProvider,
final ChunkCache chunkCache) {
final ChunkCache chunkCache,
final Metrics metrics) {
this.fetcher = fetcher;
this.objectKey = objectKey;
this.aesEncryptionProvider = aesEncryptionProvider;
this.chunkCache = chunkCache;
this.metrics = metrics;
}

/**
Expand Down Expand Up @@ -114,6 +118,12 @@ private InputStream getChunkFromCache(final RemoteLogSegmentMetadata remoteLogSe
private InputStream getChunkFromStorage(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final Chunk chunk) throws StorageBackendException {
final String segmentKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
return fetcher.fetch(segmentKey, chunk.range());
final InputStream inputStream = fetcher.fetch(segmentKey, chunk.range());

if (metrics != null) {
return metrics.measureInputStreamFromRemote(inputStream);
} else {
return inputStream;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void configure(final Map<String, ?> configs) {
fetcher,
objectKey,
aesEncryptionProvider,
config.chunkCache()
config.chunkCache(),
metrics
);

chunkSize = config.chunkSize();
Expand Down Expand Up @@ -303,6 +304,10 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet

}

private InputStream timeToFirstByteMeasureWrapper(final InputStream is) {
return is;
}

@Override
public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws RemoteStorageException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.aiven.kafka.tieredstorage.metrics;

import java.io.InputStream;
import java.util.List;

import org.apache.kafka.common.metrics.JmxReporter;
Expand All @@ -39,6 +40,7 @@ public class Metrics {
private final Sensor segmentCopyPerSec;
private final Sensor segmentCopyTime;
private final Sensor segmentFetchPerSec;
private final Sensor segmentFetchToFirstByteFromRemoteTime;

public Metrics(final Time time) {
this.time = time;
Expand All @@ -59,6 +61,14 @@ public Metrics(final Time time) {

segmentFetchPerSec = metrics.sensor("segment-fetch");
segmentFetchPerSec.add(metrics.metricName("segment-fetch-rate", metricGroup), new Rate());

// Measures the time between the first byte(s) of a segment was requested from the remote storage and
// it's being available.
segmentFetchToFirstByteFromRemoteTime = metrics.sensor("segment-fetch-to-first-byte-from-remote-time");
segmentFetchToFirstByteFromRemoteTime.add(
metrics.metricName("segment-fetch-to-first-byte-from-remote-time-avg", metricGroup), new Avg());
segmentFetchToFirstByteFromRemoteTime.add(metrics.metricName(
"segment-fetch-to-first-byte-from-remote-time-max", metricGroup), new Max());
}

public void recordSegmentCopy() {
Expand All @@ -73,6 +83,10 @@ public void recordSegmentFetch() {
segmentFetchPerSec.record();
}

public InputStream measureInputStreamFromRemote(final InputStream inputStream) {
return new TimeToFirstByteMeasuringInputStream(inputStream, time, segmentFetchToFirstByteFromRemoteTime);
}

public void close() {
try {
metrics.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

/**
* The input stream that measures the time to first byte.
*
* <p>The time is measured on {@link #read()}, {@link #read(byte[], int, int)}, and {@link #skip(long)}.
* Otherwise, the class delegates everything to the inner {@link InputStream}.
*/
public class TimeToFirstByteMeasuringInputStream extends FilterInputStream {
private final Time time;
private final Sensor sensor;

private boolean firstByteArrived = false;

public TimeToFirstByteMeasuringInputStream(final InputStream in,
final Time time,
final Sensor sensor) {
super(Objects.requireNonNull(in, "in cannot be null"));
this.time = Objects.requireNonNull(time, "time cannot be null");
this.sensor = Objects.requireNonNull(sensor, "sensor cannot be null");
}

@Override
public int read() throws IOException {
if (firstByteArrived) {
return super.read();
}

final long startMs = time.milliseconds();
firstByteArrived = true;
final var r = super.read();
sensor.record(time.milliseconds() - startMs);
return r;
}

@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (firstByteArrived) {
return super.read(b, off, len);
}

final long startMs = time.milliseconds();
firstByteArrived = true;
final var r = super.read(b, off, len);
sensor.record(time.milliseconds() - startMs);
return r;
}

@Override
public long skip(final long n) throws IOException {
if (firstByteArrived) {
return super.skip(n);
}

final long startMs = time.milliseconds();
firstByteArrived = true;
final var r = super.skip(n);
sensor.record(time.milliseconds() - startMs);
return r;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void testGetChunk() throws StorageBackendException {
final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);

final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null);
final ChunkManager chunkManager = new ChunkManager(storage, objectKey, null, null);
final ChunkManager chunkManager = new ChunkManager(storage, objectKey, null, null, null);
when(storage.fetch("test.log", chunkIndex.chunks().get(0).range()))
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));

Expand All @@ -96,7 +96,8 @@ void testGetChunkWithCaching() throws StorageBackendException {
storage,
objectKey,
null,
new UnboundInMemoryChunkCache()
new UnboundInMemoryChunkCache(),
null
);

assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasContent("0123456789");
Expand Down Expand Up @@ -133,7 +134,8 @@ void testGetChunkWithEncryption() throws Exception {
storage,
objectKey,
aesEncryptionProvider,
new UnboundInMemoryChunkCache()
new UnboundInMemoryChunkCache(),
null
);

assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
Expand Down Expand Up @@ -166,7 +168,8 @@ void testGetChunkWithCompression() throws Exception {
storage,
objectKey,
null,
new UnboundInMemoryChunkCache()
new UnboundInMemoryChunkCache(),
null
);

assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,12 @@ void metricsShouldBeReported() throws RemoteStorageException, JMException, IOExc

assertThat((double) MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "segment-fetch-rate"))
.isEqualTo(1.0 / METRIC_TIME_WINDOW_SEC);

assertThat((double) MBEAN_SERVER.getAttribute(
segmentCopyPerSecName, "segment-fetch-to-first-byte-from-remote-time-avg"))
.isZero();
assertThat((double) MBEAN_SERVER.getAttribute(
segmentCopyPerSecName, "segment-fetch-to-first-byte-from-remote-time-max"))
.isZero();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.metrics;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class TimeToFirstByteMeasuringInputStreamTest {
@Mock
Time time;
@Mock
Sensor sensor;
TimeToFirstByteMeasuringInputStream testInputStream;

@BeforeEach
void setup() {
when(time.milliseconds())
.thenReturn(0L)
.thenReturn(3L);

testInputStream = new TimeToFirstByteMeasuringInputStream(
new ByteArrayInputStream(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}),
time, sensor);
}

@Test
void testReadFirst() throws IOException {
assertThat(testInputStream.available()).isEqualTo(10);

assertThat(testInputStream.read()).isEqualTo(0);
verify(sensor).record(3.0);

assertThat(testInputStream.read()).isEqualTo(1);
assertThat(testInputStream.readAllBytes()).contains(2, 3, 4, 5, 6, 7, 8, 9);
assertThat(testInputStream.available()).isZero();
assertThat(testInputStream.read()).isEqualTo(-1);
verifyNoMoreInteractions(sensor);
}

@Test
void testReadArrayFirst() throws IOException {
assertThat(testInputStream.available()).isEqualTo(10);

final byte[] b = new byte[3];
assertThat(testInputStream.read(b)).isEqualTo(3);
assertThat(b).contains(0, 1, 2);
verify(sensor).record(3.0);

assertThat(testInputStream.read()).isEqualTo(3);
assertThat(testInputStream.readAllBytes()).contains(4, 5, 6, 7, 8, 9);
assertThat(testInputStream.available()).isZero();
assertThat(testInputStream.read()).isEqualTo(-1);
verifyNoMoreInteractions(sensor);
}

@Test
void testReadArrayWithOffsetAndLengthFirst() throws IOException {
assertThat(testInputStream.available()).isEqualTo(10);

final byte[] b = new byte[3];
assertThat(testInputStream.read(b, 0, 3)).isEqualTo(3);
assertThat(b).contains(0, 1, 2);
verify(sensor).record(3.0);

assertThat(testInputStream.read()).isEqualTo(3);
assertThat(testInputStream.readAllBytes()).contains(4, 5, 6, 7, 8, 9);
assertThat(testInputStream.available()).isZero();
assertThat(testInputStream.read()).isEqualTo(-1);
verifyNoMoreInteractions(sensor);
}

@Test
void testReadAllBytesFirst() throws IOException {
assertThat(testInputStream.available()).isEqualTo(10);

assertThat(testInputStream.readAllBytes()).contains(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
verify(sensor).record(3.0);

assertThat(testInputStream.read()).isEqualTo(-1);
assertThat(testInputStream.readAllBytes()).isEmpty();
assertThat(testInputStream.available()).isZero();
assertThat(testInputStream.read()).isEqualTo(-1);
verifyNoMoreInteractions(sensor);
}

@Test
void testReadNBytesFirst() throws IOException {
assertThat(testInputStream.available()).isEqualTo(10);

assertThat(testInputStream.readNBytes(3)).contains(0, 1, 2);
verify(sensor).record(3.0);

assertThat(testInputStream.read()).isEqualTo(3);
assertThat(testInputStream.readAllBytes()).contains(4, 5, 6, 7, 8, 9);
assertThat(testInputStream.available()).isZero();
assertThat(testInputStream.read()).isEqualTo(-1);
verifyNoMoreInteractions(sensor);
}

@Test
void testReadNBytesWithOffsetAndLengthFirst() throws IOException {
assertThat(testInputStream.available()).isEqualTo(10);

final byte[] b = new byte[3];
assertThat(testInputStream.readNBytes(b, 0, 3)).isEqualTo(3);
assertThat(b).contains(0, 1, 2);
verify(sensor).record(3.0);

assertThat(testInputStream.read()).isEqualTo(3);
assertThat(testInputStream.readAllBytes()).contains(4, 5, 6, 7, 8, 9);
assertThat(testInputStream.available()).isZero();
assertThat(testInputStream.read()).isEqualTo(-1);
verifyNoMoreInteractions(sensor);
}
}
Loading

0 comments on commit cabbb9a

Please sign in to comment.