Skip to content

Commit

Permalink
Merge pull request #378 from Aiven-Open/ivanyu/throw-remote-resource-…
Browse files Browse the repository at this point in the history
…not-found

Correctly throw RemoteResourceNotFoundException
  • Loading branch information
AnatolyPopov authored Sep 27, 2023
2 parents 2cfdbbd + 773cddf commit 1aedadf
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache;
Expand All @@ -62,19 +64,24 @@
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
import io.aiven.kafka.tieredstorage.security.EncryptedDataKey;
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.luben.zstd.Zstd;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class RemoteStorageManagerTest extends RsaKeyAwareTest {
RemoteStorageManager rsm;
Expand Down Expand Up @@ -462,4 +469,68 @@ void testRequiresCompression(final CompressionType compressionType, final boolea
final boolean requires = rsm.requiresCompression(logSegmentData);
assertThat(requires).isEqualTo(expectedResult);
}

@Test
void testFetchingSegmentNonExistent() throws StorageBackendException, IOException {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0, 100))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
}

@Test
void testFetchingSegmentManifestNotFound() throws StorageBackendException, IOException {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0, 100))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
}

@ParameterizedTest
@EnumSource(IndexType.class)
void testFetchingIndexNonExistent(final IndexType indexType) throws StorageBackendException {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
}

@ParameterizedTest
@EnumSource(IndexType.class)
void testFetchingIndexManifestNotFound(final IndexType indexType) throws StorageBackendException, IOException {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;

Expand Down Expand Up @@ -388,6 +389,8 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
throw new RemoteResourceNotFoundException(e);
} catch (final Exception e) {
throw new RemoteStorageException(e);
}
Expand Down Expand Up @@ -415,14 +418,11 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
return detransformFinisher.toInputStream();
} catch (final KeyNotFoundException e) {
throw new RemoteResourceNotFoundException(e);
} catch (final Exception e) {
if (e instanceof KeyNotFoundException) {
throw new RemoteResourceNotFoundException(e);
} else {
throw new RemoteStorageException(e);
}
throw new RemoteStorageException(e);
}

}

private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKey.Suffix suffix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import org.apache.commons.io.input.BoundedInputStream;
Expand Down Expand Up @@ -137,6 +138,8 @@ public InputStream nextElement() {
private InputStream getChunkContent(final int chunkId) {
try {
return chunkManager.getChunk(objectKeyPath, manifest, chunkId);
} catch (final KeyNotFoundException e) {
throw new KeyNotFoundRuntimeException(e);
} catch (final StorageBackendException | IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.transform;

import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;

public class KeyNotFoundRuntimeException extends RuntimeException {
KeyNotFoundRuntimeException(final KeyNotFoundException keyNotFoundException) {
super(keyNotFoundException);
}
}

0 comments on commit 1aedadf

Please sign in to comment.