diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 5257cda30..4c22a09ca 100644 --- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -480,7 +480,7 @@ void testFetchingSegmentFileNonExistent() throws IOException { ); rsm.configure(config); - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false); // Ensure the manifest exists. writeManifest(objectKeyFactory); @@ -510,7 +510,7 @@ void testFetchingSegmentManifestNotFound() { rsm.configure(config); // Make sure the exception is connected to the manifest file. - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false); final String expectedMessage = "Key " + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + " does not exists in storage"; @@ -535,7 +535,7 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException ); rsm.configure(config); - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false); // Ensure the manifest exists. writeManifest(objectKeyFactory); @@ -562,7 +562,7 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage rsm.configure(config); // Make sure the exception is connected to the manifest file. - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false); final String expectedMessage = "Key " + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + " does not exists in storage"; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java index 7d7f22b24..ede8008df 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java @@ -19,6 +19,7 @@ import java.text.NumberFormat; import java.util.Map; import java.util.Objects; +import java.util.function.BiFunction; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; @@ -69,9 +70,13 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) { } private final String prefix; + private final BiFunction objectKeyConstructor; - public ObjectKeyFactory(final String prefix) { + public ObjectKeyFactory(final String prefix, final boolean maskPrefix) { this.prefix = prefix == null ? "" : prefix; + this.objectKeyConstructor = maskPrefix + ? ObjectKeyWithMaskedPrefix::new + : PlainObjectKey::new; } /** @@ -90,7 +95,7 @@ public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, fi Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null"); Objects.requireNonNull(suffix, "suffix cannot be null"); - return new PlainObjectKey(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value); + return objectKeyConstructor.apply(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value); } /** @@ -112,7 +117,7 @@ public ObjectKey key(final Map fields, final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix); final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata)); - return new PlainObjectKey(prefix, main + "." + suffix.value); + return objectKeyConstructor.apply(prefix, main + "." + suffix.value); } /** @@ -152,8 +157,8 @@ private static String filenamePrefixFromOffset(final long offset) { } static class PlainObjectKey implements ObjectKey { - private final String prefix; - private final String mainPathAndSuffix; + protected final String prefix; + protected final String mainPathAndSuffix; PlainObjectKey(final String prefix, final String mainPathAndSuffix) { this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null"); @@ -190,4 +195,15 @@ public String toString() { return value(); } } + + static class ObjectKeyWithMaskedPrefix extends PlainObjectKey { + ObjectKeyWithMaskedPrefix(final String prefix, final String mainPathAndSuffix) { + super(prefix, mainPathAndSuffix); + } + + @Override + public String toString() { + return "/" + mainPathAndSuffix; + } + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 2d9ee8392..b057b5483 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -139,7 +139,7 @@ public void configure(final Map configs) { .recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG))); metrics = new Metrics(time, metricConfig); setStorage(config.storage()); - objectKeyFactory = new ObjectKeyFactory(config.keyPrefix()); + objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), config.keyPrefixMask()); encryptionEnabled = config.encryptionEnabled(); if (encryptionEnabled) { final Map keyRing = new HashMap<>(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java index 5e1c9e4ad..cf295ba72 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java @@ -48,6 +48,9 @@ public class RemoteStorageManagerConfig extends AbstractConfig { private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix"; private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix"; + private static final String OBJECT_KEY_PREFIX_MASK_CONFIG = "key.prefix.mask"; + private static final String OBJECT_KEY_PREFIX_MASK_DOC = "Whether to mask path prefix in logs"; + private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache."; private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size"; private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default @@ -115,6 +118,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig { OBJECT_KEY_PREFIX_DOC ); + CONFIG.define( + OBJECT_KEY_PREFIX_MASK_CONFIG, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + OBJECT_KEY_PREFIX_MASK_DOC + ); + CONFIG.define( SEGMENT_MANIFEST_CACHE_SIZE_CONFIG, ConfigDef.Type.LONG, @@ -335,6 +346,10 @@ public String keyPrefix() { return getString(OBJECT_KEY_PREFIX_CONFIG); } + public boolean keyPrefixMask() { + return getBoolean(OBJECT_KEY_PREFIX_MASK_CONFIG); + } + public int chunkSize() { return getInt(CHUNK_SIZE_CONFIG); } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java index 88537aa7d..4feae062a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java @@ -42,7 +42,7 @@ class ObjectKeyFactoryTest { @Test void test() { - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false); assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" @@ -76,7 +76,7 @@ void test() { @Test void withCustomFieldsEmpty() { - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false); final Map fields = Map.of(); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() @@ -118,7 +118,7 @@ void withCustomFieldsEmpty() { @Test void withCustomFieldsOnlyPrefix() { - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false); final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/"); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() @@ -160,7 +160,7 @@ void withCustomFieldsOnlyPrefix() { @Test void withCustomFieldsOnlyKey() { - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false); final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() @@ -188,7 +188,7 @@ void withCustomFieldsOnlyKey() { @Test void withCustomFieldsAll() { - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false); final Map fields = Map.of( SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/", SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); @@ -218,7 +218,7 @@ void withCustomFieldsAll() { @Test void nullPrefix() { - final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null, false); assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()) .isEqualTo( "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); @@ -247,4 +247,49 @@ void suffixForIndexTypes() { .extracting("value") .isEqualTo("leader-epoch-checkpoint"); } + + @Test + void prefixMasking() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", true); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()) + .isEqualTo("/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + } + + @Test + void prefixMaskingWithCustomFieldsEmpty() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true); + final Map fields = Map.of(); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString() + ).isEqualTo("/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + } + + @Test + void prefixMaskingWithCustomFieldsOnlyPrefix() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true); + final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString() + ).isEqualTo("/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + } + + @Test + void prefixMaskingWithCustomFieldsOnlyKey() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true); + final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString() + ).isEqualTo("/topic/7/file.log"); + } + + @Test + void prefixMaskingWithCustomFieldsAll() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true); + final Map fields = Map.of( + SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/", + SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString() + ).isEqualTo("/topic/7/file.log"); + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java index 3b3cd86d9..74b78aa69 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java @@ -51,6 +51,7 @@ void minimalConfig() { assertThat(config.encryptionKeyPairId()).isNull(); assertThat(config.encryptionKeyRing()).isNull(); assertThat(config.keyPrefix()).isEmpty(); + assertThat(config.keyPrefixMask()).isFalse(); assertThat(config.customMetadataKeysIncluded()).isEmpty(); } @@ -338,4 +339,16 @@ void invalidCustomMetadataFields() { .hasMessage("Invalid value unknown for configuration custom.metadata.fields.include: " + "String must be one of: REMOTE_SIZE, OBJECT_PREFIX, OBJECT_KEY"); } + + @Test + void keyPrefixMasking() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "key.prefix.mask", "true" + ) + ); + assertThat(config.keyPrefixMask()).isTrue(); + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java index 2a8ce1af9..7a1ed0dcc 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java @@ -43,7 +43,7 @@ class SegmentCustomMetadataBuilderTest { SEGMENT_ID), 1, 100, -1, -1, 1L, 100, Collections.singletonMap(1, 100L)); - static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1"); + static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1", false); @Test void shouldBuildEmptyMap() { diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 8c0861837..5a57fab46 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; +import org.assertj.core.util.Throwables; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -154,6 +155,33 @@ void testFetchNonExistingKey() { .hasMessage("Key non-existing does not exists in storage " + storage()); } + @Test + void testFetchNonExistingKeyMasking() { + final ObjectKey key = new ObjectKey() { + @Override + public String value() { + return "real-key"; + } + + @Override + public String toString() { + return "masked-key"; + } + }; + + assertThatThrownBy(() -> storage().fetch(key)) + .extracting(Throwables::getStackTrace) + .asString() + .contains("masked-key") + .doesNotContain("real-key"); + + assertThatThrownBy(() -> storage().fetch(key, BytesRange.of(0, 1))) + .extracting(Throwables::getStackTrace) + .asString() + .contains("masked-key") + .doesNotContain("real-key"); + } + @Test protected void testDelete() throws StorageBackendException { storage().upload(new ByteArrayInputStream("test".getBytes()), TOPIC_PARTITION_SEGMENT_KEY); diff --git a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java index 5442b18bb..dac4718a0 100644 --- a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java +++ b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java @@ -65,7 +65,7 @@ public InputStream fetch(final ObjectKey key) throws StorageBackendException { final Path path = fsRoot.resolve(key.value()); return Files.newInputStream(path); } catch (final NoSuchFileException e) { - throw new KeyNotFoundException(this, key, e); + throw new KeyNotFoundException(this, key); } catch (final IOException e) { throw new StorageBackendException("Failed to fetch " + key, e); } @@ -86,7 +86,7 @@ public InputStream fetch(final ObjectKey key, final BytesRange range) throws Sto final long size = Math.min(range.to, fileSize) - range.from + 1; return new BoundedInputStream(chunkContent, size); } catch (final NoSuchFileException e) { - throw new KeyNotFoundException(this, key, e); + throw new KeyNotFoundException(this, key); } catch (final IOException e) { throw new StorageBackendException("Failed to fetch " + key + ", with range " + range, e); } diff --git a/storage/s3/build.gradle b/storage/s3/build.gradle index 5056ab5b9..0a2739833 100644 --- a/storage/s3/build.gradle +++ b/storage/s3/build.gradle @@ -38,5 +38,8 @@ dependencies { testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" testImplementation "org.testcontainers:localstack:$testcontainersVersion" - integrationTestImplementation "org.wiremock:wiremock:$wireMockVersion" + + integrationTestImplementation("org.wiremock:wiremock:$wireMockVersion") { + exclude group: "org.slf4j" + } }