Skip to content

Commit

Permalink
fixup! feat: concat indexes in the same object
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Oct 9, 2023
1 parent fd228e3 commit 867d8b0
Showing 1 changed file with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ChunkCacheTest {
private static final byte[] CHUNK_0 = "0123456789".getBytes();
private static final byte[] CHUNK_1 = "1011121314".getBytes();
private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 10, 10, 10);
static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder()
private static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder()
.add(IndexType.OFFSET, 1)
.add(IndexType.TIMESTAMP, 1)
.add(IndexType.PRODUCER_SNAPSHOT, 1)
Expand Down Expand Up @@ -105,16 +105,16 @@ class CacheTests {
void setUp() throws Exception {
doAnswer(invocation -> removalListener).when(chunkCache).removalListener();
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0));
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0));
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1));
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1));
}

@Test
void noEviction() throws IOException, StorageBackendException {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1"
"retention.ms", "-1",
"size", "-1"
));

final InputStream chunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
Expand All @@ -137,69 +137,69 @@ void noEviction() throws IOException, StorageBackendException {
@Test
void timeBasedEviction() throws IOException, StorageBackendException, InterruptedException {
chunkCache.configure(Map.of(
"retention.ms", "100",
"size", "-1"
"retention.ms", "100",
"size", "-1"
));

assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);
verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

Thread.sleep(100);

assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.hasBinaryContent(CHUNK_1);
.hasBinaryContent(CHUNK_1);
verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.hasBinaryContent(CHUNK_1);
.hasBinaryContent(CHUNK_1);
verifyNoMoreInteractions(chunkManager);

await().atMost(Duration.ofMillis(5000)).pollInterval(Duration.ofMillis(100))
.until(() -> !mockingDetails(removalListener).getInvocations().isEmpty());
.until(() -> !mockingDetails(removalListener).getInvocations().isEmpty());

verify(removalListener)
.onRemoval(
argThat(argument -> argument.chunkId == 0),
any(),
eq(RemovalCause.EXPIRED));
.onRemoval(
argThat(argument -> argument.chunkId == 0),
any(),
eq(RemovalCause.EXPIRED));

assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);
verify(chunkManager, times(2)).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
}

@Test
void sizeBasedEviction() throws IOException, StorageBackendException {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "18"
"retention.ms", "-1",
"size", "18"
));

assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);
verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.hasBinaryContent(CHUNK_1);
.hasBinaryContent(CHUNK_1);
verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);

await().atMost(Duration.ofMillis(5000))
.pollDelay(Duration.ofSeconds(2))
.pollInterval(Duration.ofMillis(10))
.until(() -> !mockingDetails(removalListener).getInvocations().isEmpty());
.pollDelay(Duration.ofSeconds(2))
.pollInterval(Duration.ofMillis(10))
.until(() -> !mockingDetails(removalListener).getInvocations().isEmpty());

verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE));

assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.hasBinaryContent(CHUNK_0);
.hasBinaryContent(CHUNK_0);
assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.hasBinaryContent(CHUNK_1);
.hasBinaryContent(CHUNK_1);
verify(chunkManager, times(3)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt());
}

Expand All @@ -208,8 +208,8 @@ void sizeBasedEviction() throws IOException, StorageBackendException {
@Nested
class ErrorHandlingTests {
private final Map<String, String> configs = Map.of(
"retention.ms", "-1",
"size", "-1"
"retention.ms", "-1",
"size", "-1"
);

@BeforeEach
Expand All @@ -220,52 +220,52 @@ void setUp() {
@Test
void failedFetching() throws Exception {
when(chunkManager.getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt()))
.thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE))
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));
.thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE))
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));

assertThatThrownBy(() -> chunkCache
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.isInstanceOf(StorageBackendException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.isInstanceOf(StorageBackendException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
assertThatThrownBy(() -> chunkCache
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
}

@Test
void failedReadingCachedValueWithInterruptedException() throws Exception {
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.thenReturn(new ByteArrayInputStream(CHUNK_0));
.thenReturn(new ByteArrayInputStream(CHUNK_0));

doCallRealMethod().doAnswer(invocation -> {
throw new InterruptedException(TEST_EXCEPTION_MESSAGE);
}).when(chunkCache).cachedChunkToInputStream(any());

chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThatThrownBy(() -> chunkCache
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(InterruptedException.class)
.hasRootCauseMessage(TEST_EXCEPTION_MESSAGE);
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(InterruptedException.class)
.hasRootCauseMessage(TEST_EXCEPTION_MESSAGE);
}

@Test
void failedReadingCachedValueWithExecutionException() throws Exception {
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)).thenReturn(
new ByteArrayInputStream(CHUNK_0));
new ByteArrayInputStream(CHUNK_0));
doCallRealMethod().doAnswer(invocation -> {
throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE));
}).when(chunkCache).cachedChunkToInputStream(any());

chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThatThrownBy(() -> chunkCache
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(RuntimeException.class)
.hasRootCauseMessage(TEST_EXCEPTION_MESSAGE);
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(RuntimeException.class)
.hasRootCauseMessage(TEST_EXCEPTION_MESSAGE);
}
}
}

0 comments on commit 867d8b0

Please sign in to comment.