Skip to content

Commit

Permalink
refactor: redefine fetch part key based on range
Browse files Browse the repository at this point in the history
Now that cached content is based on a range defined by the fetch part, we can use the range to identify cached content.
  • Loading branch information
jeqo committed Sep 29, 2023
1 parent f718a2b commit ab5a9cf
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
import java.nio.file.Path;
import java.util.Objects;

import io.aiven.kafka.tieredstorage.storage.BytesRange;

public class FetchPartKey {
public final String segmentFileName;
public final int chunkId;
public final BytesRange range;

public FetchPartKey(final String objectKeyPath, final int chunkId) {
public FetchPartKey(final String objectKeyPath, final BytesRange range) {
Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
// get last part of segment path + byte range, as it's used for creating file names
this.segmentFileName = Path.of(objectKeyPath).getFileName().toString();
this.chunkId = chunkId;
this.range = Objects.requireNonNull(range, "range cannot be null");
}

@Override
Expand All @@ -38,31 +40,24 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final FetchPartKey fetchPartKey = (FetchPartKey) o;

if (chunkId != fetchPartKey.chunkId) {
return false;
}
return Objects.equals(segmentFileName, fetchPartKey.segmentFileName);
final FetchPartKey that = (FetchPartKey) o;
return Objects.equals(segmentFileName, that.segmentFileName) && Objects.equals(range, that.range);
}

@Override
public int hashCode() {
int result = segmentFileName.hashCode();
result = 31 * result + chunkId;
return result;
return Objects.hash(segmentFileName, range);
}

@Override
public String toString() {
return "ChunkKey("
return "FetchPartKey("
+ "segmentFileName=" + segmentFileName
+ ", chunkId=" + chunkId
+ ", range=" + range
+ ")";
}

public String path() {
return segmentFileName + "-" + chunkId;
return segmentFileName + "-" + range.from + "-" + range.to;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected FetchCache(final FetchManager fetchManager) {
public InputStream partContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException, IOException {
final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.firstChunkId);
final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.range);
final AtomicReference<InputStream> result = new AtomicReference<>();
try {
return cache.asMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.common.Uuid;

import io.aiven.kafka.tieredstorage.storage.BytesRange;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -28,38 +30,38 @@ class FetchPartKeyTest {

@Test
void identical() {
final var ck1 = new FetchPartKey(UUID_1, 0);
final var ck2 = new FetchPartKey(UUID_1, 0);
final var ck1 = new FetchPartKey(UUID_1, BytesRange.of(0, 9));
final var ck2 = new FetchPartKey(UUID_1, BytesRange.of(0, 9));
assertThat(ck1).isEqualTo(ck2);
assertThat(ck2).isEqualTo(ck1);
assertThat(ck1).hasSameHashCodeAs(ck2);
}

@Test
void differentUuid() {
final var ck1 = new FetchPartKey(UUID_1, 0);
final var ck2 = new FetchPartKey(UUID_2, 0);
final var ck1 = new FetchPartKey(UUID_1, BytesRange.of(0, 9));
final var ck2 = new FetchPartKey(UUID_2, BytesRange.of(0, 9));
assertThat(ck1).isNotEqualTo(ck2);
assertThat(ck2).isNotEqualTo(ck1);
assertThat(ck1).doesNotHaveSameHashCodeAs(ck2);
}

@Test
void differentChunkIds() {
final var ck1 = new FetchPartKey(UUID_1, 0);
final var ck2 = new FetchPartKey(UUID_1, 1);
final var ck1 = new FetchPartKey(UUID_1, BytesRange.of(0, 9));
final var ck2 = new FetchPartKey(UUID_1, BytesRange.of(10, 19));
assertThat(ck1).isNotEqualTo(ck2);
assertThat(ck2).isNotEqualTo(ck1);
assertThat(ck1).doesNotHaveSameHashCodeAs(ck2);
}

@Test
void singlePath() {
assertThat(new FetchPartKey("test", 0).path()).isEqualTo("test-0");
assertThat(new FetchPartKey("test", BytesRange.of(0, 9)).path()).isEqualTo("test-0-9");
}

@Test
void pathWithDir() {
assertThat(new FetchPartKey("parent/test", 0).path()).isEqualTo("test-0");
assertThat(new FetchPartKey("parent/test", BytesRange.of(0, 9)).path()).isEqualTo("test-0-9");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.aiven.kafka.tieredstorage.fetch.FetchManager;
import io.aiven.kafka.tieredstorage.fetch.FetchPartKey;
import io.aiven.kafka.tieredstorage.storage.BytesRange;

import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
Expand Down Expand Up @@ -66,9 +67,9 @@ class DiskBasedFetchCacheTest {
@BeforeEach
void setUp() {
diskBasedFetchCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"path", baseCachePath.toString()
"retention.ms", "-1",
"size", "-1",
"path", baseCachePath.toString()
));
cachePath = baseCachePath.resolve(CACHE_DIRECTORY);
tempCachePath = baseCachePath.resolve(TEMP_CACHE_DIRECTORY);
Expand All @@ -78,69 +79,75 @@ void setUp() {
void failedToMoveFromTempCache() {
try (final MockedStatic<Files> filesMockedStatic = mockStatic(Files.class, CALLS_REAL_METHODS)) {
filesMockedStatic.when(() -> Files.move(any(), any(), any()))
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));

final ByteArrayInputStream chunkStream0 = new ByteArrayInputStream(CHUNK_0);
final ByteArrayInputStream chunkStream1 = new ByteArrayInputStream(CHUNK_1);
final FetchPartKey fetchPartKey0 = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey1 = new FetchPartKey(SEGMENT_ID, 1);
final FetchPartKey fetchPartKey0 = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));
final FetchPartKey fetchPartKey1 = new FetchPartKey(SEGMENT_ID, BytesRange.of(10, 19));

assertThatThrownBy(() -> diskBasedFetchCache.cachePartContent(fetchPartKey0, chunkStream0))
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);

assertThatThrownBy(() -> diskBasedFetchCache.cachePartContent(fetchPartKey1, chunkStream1))
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);

assertThat(cachePath)
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-" + fetchPartKey0.chunkId));
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-"
+ fetchPartKey0.range.from + "-" + fetchPartKey0.range.to));
assertThat(tempCachePath)
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-" + fetchPartKey0.chunkId));
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-"
+ fetchPartKey0.range.from + "-" + fetchPartKey0.range.to));

assertThat(cachePath)
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-" + fetchPartKey1.chunkId));
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-"
+ fetchPartKey1.range.from + "-" + fetchPartKey1.range.to));
assertThat(tempCachePath)
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-" + fetchPartKey1.chunkId));
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-"
+ fetchPartKey1.range.from + "-" + fetchPartKey1.range.to));
}
}

@Test
void cacheChunks() throws IOException {
final ByteArrayInputStream chunkStream0 = new ByteArrayInputStream(CHUNK_0);
final ByteArrayInputStream chunkStream1 = new ByteArrayInputStream(CHUNK_1);
final FetchPartKey fetchPartKey0 = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey1 = new FetchPartKey(SEGMENT_ID, 1);
final FetchPartKey fetchPartKey0 = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));
final FetchPartKey fetchPartKey1 = new FetchPartKey(SEGMENT_ID, BytesRange.of(10, 19));

final Path cachedChunkPath0 = diskBasedFetchCache.cachePartContent(fetchPartKey0, chunkStream0);
final Path cachedChunkPath1 = diskBasedFetchCache.cachePartContent(fetchPartKey1, chunkStream1);

assertThat(cachedChunkPath0).exists();
assertThat(diskBasedFetchCache.readCachedPartContent(cachedChunkPath0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);

assertThat(tempCachePath)
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-" + fetchPartKey0.chunkId));
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-"
+ fetchPartKey0.range.from + "-" + fetchPartKey0.range.to));

assertThat(cachedChunkPath1).exists();
assertThat(diskBasedFetchCache.readCachedPartContent(cachedChunkPath1))
.hasBinaryContent(CHUNK_1);
.hasBinaryContent(CHUNK_1);

assertThat(tempCachePath)
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-" + fetchPartKey1.chunkId));
.isDirectoryNotContaining(path -> path.endsWith(SEGMENT_ID + "-"
+ fetchPartKey1.range.from + "-" + fetchPartKey1.range.to));
}

@Test
void failsToReadFile() {
assertThatThrownBy(() -> diskBasedFetchCache.readCachedPartContent(Path.of("does_not_exists")))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(IOException.class);
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(IOException.class);
}

@Test
void weighsCorrectly() throws IOException {
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));

final Path cachedChunkPath = diskBasedFetchCache.cachePartContent(fetchPartKey, chunkStream);

Expand All @@ -151,13 +158,13 @@ void weighsCorrectly() throws IOException {
@Test
void weighingTooBigFiles() throws IOException {
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));

final Path cachedChunkPath = diskBasedFetchCache.cachePartContent(fetchPartKey, chunkStream);

try (final MockedStatic<Files> filesMockedStatic = mockStatic(Files.class, CALLS_REAL_METHODS)) {
filesMockedStatic.when(() -> Files.size(any()))
.thenReturn((long) Integer.MAX_VALUE + 1);
.thenReturn((long) Integer.MAX_VALUE + 1);
final Weigher<FetchPartKey, Path> weigher = diskBasedFetchCache.weigher();
assertThat(weigher.weigh(fetchPartKey, cachedChunkPath)).isEqualTo(Integer.MAX_VALUE);
}
Expand All @@ -166,24 +173,24 @@ void weighingTooBigFiles() throws IOException {
@Test
void weighingFails() throws IOException {
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));

final Path cachedChunkPath = diskBasedFetchCache.cachePartContent(fetchPartKey, chunkStream);
try (final MockedStatic<Files> filesMockedStatic = mockStatic(Files.class, CALLS_REAL_METHODS)) {
filesMockedStatic.when(() -> Files.size(any()))
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));
final Weigher<FetchPartKey, Path> weigher = diskBasedFetchCache.weigher();
assertThatThrownBy(() -> weigher.weigh(fetchPartKey, cachedChunkPath))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(IOException.class)
.hasRootCauseMessage(TEST_EXCEPTION_MESSAGE);
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(IOException.class)
.hasRootCauseMessage(TEST_EXCEPTION_MESSAGE);
}
}

@Test
void removesCorrectly() throws IOException {
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));

final Path cachedChunkPath = diskBasedFetchCache.cachePartContent(fetchPartKey, chunkStream);

Expand All @@ -195,30 +202,30 @@ void removesCorrectly() throws IOException {
@Test
void removalFails() throws IOException {
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, 0);
final FetchPartKey fetchPartKey = new FetchPartKey(SEGMENT_ID, BytesRange.of(0, 9));

final Path cachedChunkPath = diskBasedFetchCache.cachePartContent(fetchPartKey, chunkStream);

final RemovalListener<FetchPartKey, Path> removalListener = diskBasedFetchCache.removalListener();
try (final MockedStatic<Files> filesMockedStatic = mockStatic(Files.class, CALLS_REAL_METHODS)) {
filesMockedStatic.when(() -> Files.delete(any()))
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));

assertThatNoException()
.isThrownBy(() -> removalListener.onRemoval(fetchPartKey, cachedChunkPath, RemovalCause.SIZE));
.isThrownBy(() -> removalListener.onRemoval(fetchPartKey, cachedChunkPath, RemovalCause.SIZE));
assertThat(cachedChunkPath).exists();
}
}

@Test
void cacheInitialized() {
final DiskBasedFetchCache spy = spy(
new DiskBasedFetchCache(mock(FetchManager.class))
new DiskBasedFetchCache(mock(FetchManager.class))
);
final Map<String, String> configs = Map.of(
"retention.ms", "-1",
"size", "-1",
"path", baseCachePath.toString()
"retention.ms", "-1",
"size", "-1",
"path", baseCachePath.toString()
);
spy.configure(configs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import java.io.ByteArrayInputStream;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.fetch.FetchManager;
import io.aiven.kafka.tieredstorage.fetch.FetchPart;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;

import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -60,7 +63,8 @@ class FetchCacheMetricsTest {
@Mock
SegmentManifest segmentManifest;
@Mock
FetchPart firstPart;
ChunkIndex chunkIndex;


private static Stream<Arguments> caches() {
return Stream.of(
Expand Down Expand Up @@ -88,6 +92,9 @@ void shouldRecordMetrics(final Class<FetchCache<?>> fetchCacheClass, final Map<S
// Given a fetch cache implementation
when(fetchManager.partContent(any(), any(), any()))
.thenReturn(new ByteArrayInputStream("test".getBytes()));
final var chunk = new Chunk(0, 0, 10, 0, 10);
when(chunkIndex.chunks()).thenReturn(List.of(chunk));
final FetchPart firstPart = new FetchPart(chunkIndex, chunk, 1);

final var fetchCache = fetchCacheClass.getDeclaredConstructor(FetchManager.class).newInstance(fetchManager);
fetchCache.configure(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte

verify(removalListener)
.onRemoval(
argThat(argument -> argument.chunkId == 0),
argThat(argument -> argument.range.from == 0),
any(),
eq(RemovalCause.EXPIRED));

Expand Down

0 comments on commit ab5a9cf

Please sign in to comment.