Skip to content

Commit

Permalink
Add ability to mask object key prefixes in logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanyu committed Sep 28, 2023
1 parent f273491 commit 5e01385
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ void testFetchingSegmentFileNonExistent() throws IOException {
);
rsm.configure(config);

final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);

// Ensure the manifest exists.
writeManifest(objectKeyFactory);
Expand Down Expand Up @@ -510,7 +510,7 @@ void testFetchingSegmentManifestNotFound() {
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ " does not exists in storage";
Expand All @@ -535,7 +535,7 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException
);
rsm.configure(config);

final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);

// Ensure the manifest exists.
writeManifest(objectKeyFactory);
Expand All @@ -562,7 +562,7 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ " does not exists in storage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.text.NumberFormat;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
Expand Down Expand Up @@ -69,12 +70,17 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) {
}

private final String prefix;
private final BiFunction<String, String, ObjectKey> objectKeyConstructor;

/**
* @param prefix the prefix to add to all created keys.
* @param maskPrefix whether to mask the prefix in {@code toString()}.
*/
public ObjectKeyFactory(final String prefix) {
public ObjectKeyFactory(final String prefix, final boolean maskPrefix) {
this.prefix = prefix == null ? "" : prefix;
this.objectKeyConstructor = maskPrefix
? ObjectKeyWithMaskedPrefix::new
: PlainObjectKey::new;
}

/**
Expand All @@ -93,7 +99,7 @@ public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, fi
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
Objects.requireNonNull(suffix, "suffix cannot be null");

return new PlainObjectKey(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
return objectKeyConstructor.apply(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
}

/**
Expand All @@ -115,7 +121,7 @@ public ObjectKey key(final Map<Integer, Object> fields,

final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix);
final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata));
return new PlainObjectKey(prefix, main + "." + suffix.value);
return objectKeyConstructor.apply(prefix, main + "." + suffix.value);
}

/**
Expand Down Expand Up @@ -160,8 +166,8 @@ private static String filenamePrefixFromOffset(final long offset) {
* <p>Its string representation is identical to its value.
*/
static class PlainObjectKey implements ObjectKey {
private final String prefix;
private final String mainPathAndSuffix;
protected final String prefix;
protected final String mainPathAndSuffix;

PlainObjectKey(final String prefix, final String mainPathAndSuffix) {
this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null");
Expand Down Expand Up @@ -198,4 +204,20 @@ public String toString() {
return value();
}
}

/**
* The object key that consists of a prefix and main part + suffix (as the parent class {@link PlainObjectKey}).
*
* <p>In its string representation, the prefix is masked with {@code <prefix>/}.
*/
static class ObjectKeyWithMaskedPrefix extends PlainObjectKey {
ObjectKeyWithMaskedPrefix(final String prefix, final String mainPathAndSuffix) {
super(prefix, mainPathAndSuffix);
}

@Override
public String toString() {
return "<prefix>/" + mainPathAndSuffix;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void configure(final Map<String, ?> configs) {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
metrics = new Metrics(time, metricConfig);
setStorage(config.storage());
objectKeyFactory = new ObjectKeyFactory(config.keyPrefix());
objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), config.keyPrefixMask());
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
final Map<String, KeyPair> keyRing = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix";
private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix";

private static final String OBJECT_KEY_PREFIX_MASK_CONFIG = "key.prefix.mask";
private static final String OBJECT_KEY_PREFIX_MASK_DOC = "Whether to mask path prefix in logs";

private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
Expand Down Expand Up @@ -115,6 +118,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
OBJECT_KEY_PREFIX_DOC
);

CONFIG.define(
OBJECT_KEY_PREFIX_MASK_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
OBJECT_KEY_PREFIX_MASK_DOC
);

CONFIG.define(
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
Expand Down Expand Up @@ -335,6 +346,10 @@ public String keyPrefix() {
return getString(OBJECT_KEY_PREFIX_CONFIG);
}

public boolean keyPrefixMask() {
return getBoolean(OBJECT_KEY_PREFIX_MASK_CONFIG);
}

public int chunkSize() {
return getInt(CHUNK_SIZE_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ObjectKeyFactoryTest {

@Test
void test() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value())
.isEqualTo(
"prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
Expand Down Expand Up @@ -76,7 +76,7 @@ void test() {

@Test
void withCustomFieldsEmpty() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of();
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
Expand Down Expand Up @@ -118,7 +118,7 @@ void withCustomFieldsEmpty() {

@Test
void withCustomFieldsOnlyPrefix() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
Expand Down Expand Up @@ -160,7 +160,7 @@ void withCustomFieldsOnlyPrefix() {

@Test
void withCustomFieldsOnlyKey() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
Expand Down Expand Up @@ -188,7 +188,7 @@ void withCustomFieldsOnlyKey() {

@Test
void withCustomFieldsAll() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of(
SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
Expand Down Expand Up @@ -218,7 +218,7 @@ void withCustomFieldsAll() {

@Test
void nullPrefix() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null);
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null, false);
assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value())
.isEqualTo(
"topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
Expand Down Expand Up @@ -247,4 +247,49 @@ void suffixForIndexTypes() {
.extracting("value")
.isEqualTo("leader-epoch-checkpoint");
}

@Test
void prefixMasking() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", true);
assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString())
.isEqualTo("<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
}

@Test
void prefixMaskingWithCustomFieldsEmpty() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of();
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
}

@Test
void prefixMaskingWithCustomFieldsOnlyPrefix() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
}

@Test
void prefixMaskingWithCustomFieldsOnlyKey() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic/7/file.log");
}

@Test
void prefixMaskingWithCustomFieldsAll() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of(
SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic/7/file.log");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void minimalConfig() {
assertThat(config.encryptionKeyPairId()).isNull();
assertThat(config.encryptionKeyRing()).isNull();
assertThat(config.keyPrefix()).isEmpty();
assertThat(config.keyPrefixMask()).isFalse();
assertThat(config.customMetadataKeysIncluded()).isEmpty();
}

Expand Down Expand Up @@ -338,4 +339,16 @@ void invalidCustomMetadataFields() {
.hasMessage("Invalid value unknown for configuration custom.metadata.fields.include: "
+ "String must be one of: REMOTE_SIZE, OBJECT_PREFIX, OBJECT_KEY");
}

@Test
void keyPrefixMasking() {
final var config = new RemoteStorageManagerConfig(
Map.of(
"storage.backend.class", NoopStorageBackend.class.getCanonicalName(),
"chunk.size", "123",
"key.prefix.mask", "true"
)
);
assertThat(config.keyPrefixMask()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SegmentCustomMetadataBuilderTest {
SEGMENT_ID),
1, 100, -1, -1, 1L,
100, Collections.singletonMap(1, 100L));
static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1");
static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1", false);

@Test
void shouldBuildEmptyMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;

import org.assertj.core.util.Throwables;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -154,6 +155,33 @@ void testFetchNonExistingKey() {
.hasMessage("Key non-existing does not exists in storage " + storage());
}

@Test
void testFetchNonExistingKeyMasking() {
final ObjectKey key = new ObjectKey() {
@Override
public String value() {
return "real-key";
}

@Override
public String toString() {
return "masked-key";
}
};

assertThatThrownBy(() -> storage().fetch(key))
.extracting(Throwables::getStackTrace)
.asString()
.contains("masked-key")
.doesNotContain("real-key");

assertThatThrownBy(() -> storage().fetch(key, BytesRange.of(0, 1)))
.extracting(Throwables::getStackTrace)
.asString()
.contains("masked-key")
.doesNotContain("real-key");
}

@Test
protected void testDelete() throws StorageBackendException {
storage().upload(new ByteArrayInputStream("test".getBytes()), TOPIC_PARTITION_SEGMENT_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public InputStream fetch(final ObjectKey key) throws StorageBackendException {
final Path path = fsRoot.resolve(key.value());
return Files.newInputStream(path);
} catch (final NoSuchFileException e) {
throw new KeyNotFoundException(this, key, e);
throw new KeyNotFoundException(this, key);
} catch (final IOException e) {
throw new StorageBackendException("Failed to fetch " + key, e);
}
Expand All @@ -86,7 +86,7 @@ public InputStream fetch(final ObjectKey key, final BytesRange range) throws Sto
final long size = Math.min(range.to, fileSize) - range.from + 1;
return new BoundedInputStream(chunkContent, size);
} catch (final NoSuchFileException e) {
throw new KeyNotFoundException(this, key, e);
throw new KeyNotFoundException(this, key);
} catch (final IOException e) {
throw new StorageBackendException("Failed to fetch " + key + ", with range " + range, e);
}
Expand Down
5 changes: 4 additions & 1 deletion storage/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ dependencies {

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:localstack:$testcontainersVersion"
integrationTestImplementation "org.wiremock:wiremock:$wireMockVersion"

integrationTestImplementation("org.wiremock:wiremock:$wireMockVersion") {
exclude group: "org.slf4j"
}
}

0 comments on commit 5e01385

Please sign in to comment.