Skip to content

Commit

Permalink
Fix some RemoteStorageManagerTest
Browse files Browse the repository at this point in the history
Newly added tests were incorrect because they didn't differentiate between manifest-not-found and data-file-not-found errors.
  • Loading branch information
ivanyu committed Sep 27, 2023
1 parent 1aedadf commit 186a5e6
Showing 1 changed file with 57 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import io.aiven.kafka.tieredstorage.transform.KeyNotFoundRuntimeException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -471,52 +472,81 @@ void testRequiresCompression(final CompressionType compressionType, final boolea
}

@Test
void testFetchingSegmentNonExistent() throws StorageBackendException, IOException {
void testFetchingSegmentFileNonExistent() throws IOException {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

final ObjectKey objectKey = new ObjectKey("");

// Ensure the manifest exists.
writeManifest(objectKey);

// Make sure the exception is connected to the log file.
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.LOG) + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
.hasCauseInstanceOf(KeyNotFoundRuntimeException.class)
.hasStackTraceContaining(expectedMessage);
assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0, 100))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
.hasCauseInstanceOf(KeyNotFoundRuntimeException.class)
.hasStackTraceContaining(expectedMessage);
}

@Test
void testFetchingSegmentManifestNotFound() throws StorageBackendException, IOException {
void testFetchingSegmentManifestNotFound() {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKey objectKey = new ObjectKey("");
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
.hasCauseInstanceOf(KeyNotFoundException.class)
.hasStackTraceContaining(expectedMessage);
assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0, 100))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
.hasCauseInstanceOf(KeyNotFoundException.class)
.hasStackTraceContaining(expectedMessage);
}

@ParameterizedTest
@EnumSource(IndexType.class)
void testFetchingIndexNonExistent(final IndexType indexType) throws StorageBackendException {
void testFetchingIndexNonExistent(final IndexType indexType) throws IOException {
final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);

final ObjectKey objectKey = new ObjectKey("");

// Ensure the manifest exists.
writeManifest(objectKey);

// Make sure the exception is connected to the index file.
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.fromIndexType(indexType))
+ " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
.hasCauseInstanceOf(KeyNotFoundException.class)
.hasStackTraceContaining(expectedMessage);
}

@ParameterizedTest
Expand All @@ -529,8 +559,26 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
);
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKey objectKey = new ObjectKey("");
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class);
.hasCauseInstanceOf(KeyNotFoundException.class)
.hasStackTraceContaining(expectedMessage);
}

private void writeManifest(final ObjectKey objectKey) throws IOException {
// Ensure the manifest exists.
final String manifest =
"{\"version\":\"1\","
+ "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100,"
+ "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110},"
+ "\"compression\":false}";
final Path manifestPath = targetDir.resolve(objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST));
Files.createDirectories(manifestPath.getParent());
Files.writeString(manifestPath, manifest);
}
}

0 comments on commit 186a5e6

Please sign in to comment.