Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanyu committed Sep 27, 2023
1 parent 5599292 commit f91c24c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ void testFetchingSegmentFileNonExistent() throws IOException {
);
rsm.configure(config);

final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);

// Ensure the manifest exists.
writeManifest(objectKeyFactory);
Expand Down Expand Up @@ -510,7 +510,7 @@ void testFetchingSegmentManifestNotFound() {
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ " does not exists in storage";
Expand All @@ -535,7 +535,7 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException
);
rsm.configure(config);

final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);

// Ensure the manifest exists.
writeManifest(objectKeyFactory);
Expand All @@ -562,7 +562,7 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ " does not exists in storage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.text.NumberFormat;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
Expand Down Expand Up @@ -69,9 +70,13 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) {
}

private final String prefix;
private final BiFunction<String, String, ObjectKey> objectKeyConstructor;

public ObjectKeyFactory(final String prefix) {
public ObjectKeyFactory(final String prefix, final boolean maskPrefix) {
this.prefix = prefix == null ? "" : prefix;
this.objectKeyConstructor = maskPrefix
? ObjectKeyWithMaskedPrefix::new
: PlainObjectKey::new;
}

/**
Expand All @@ -90,7 +95,7 @@ public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, fi
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
Objects.requireNonNull(suffix, "suffix cannot be null");

return new PlainObjectKey(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
return objectKeyConstructor.apply(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
}

/**
Expand All @@ -112,7 +117,7 @@ public ObjectKey key(final Map<Integer, Object> fields,

final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix);
final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata));
return new PlainObjectKey(prefix, main + "." + suffix.value);
return objectKeyConstructor.apply(prefix, main + "." + suffix.value);
}

/**
Expand Down Expand Up @@ -152,8 +157,8 @@ private static String filenamePrefixFromOffset(final long offset) {
}

static class PlainObjectKey implements ObjectKey {
private final String prefix;
private final String mainPathAndSuffix;
protected final String prefix;
protected final String mainPathAndSuffix;

PlainObjectKey(final String prefix, final String mainPathAndSuffix) {
this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null");
Expand Down Expand Up @@ -190,4 +195,15 @@ public String toString() {
return value();
}
}

static class ObjectKeyWithMaskedPrefix extends PlainObjectKey {
ObjectKeyWithMaskedPrefix(final String prefix, final String mainPathAndSuffix) {
super(prefix, mainPathAndSuffix);
}

@Override
public String toString() {
return "<prefix>/" + mainPathAndSuffix;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void configure(final Map<String, ?> configs) {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
metrics = new Metrics(time, metricConfig);
setStorage(config.storage());
objectKeyFactory = new ObjectKeyFactory(config.keyPrefix());
objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), false); // TODO
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
final Map<String, KeyPair> keyRing = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ObjectKeyFactoryTest {

@Test
void test() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value())
.isEqualTo(
"prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
Expand Down Expand Up @@ -76,7 +76,7 @@ void test() {

@Test
void withCustomFieldsEmpty() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of();
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
Expand Down Expand Up @@ -118,7 +118,7 @@ void withCustomFieldsEmpty() {

@Test
void withCustomFieldsOnlyPrefix() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
Expand Down Expand Up @@ -160,7 +160,7 @@ void withCustomFieldsOnlyPrefix() {

@Test
void withCustomFieldsOnlyKey() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
Expand Down Expand Up @@ -188,7 +188,7 @@ void withCustomFieldsOnlyKey() {

@Test
void withCustomFieldsAll() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
final Map<Integer, Object> fields = Map.of(
SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
Expand Down Expand Up @@ -218,7 +218,7 @@ void withCustomFieldsAll() {

@Test
void nullPrefix() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null);
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null, false);
assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value())
.isEqualTo(
"topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
Expand Down Expand Up @@ -247,4 +247,49 @@ void suffixForIndexTypes() {
.extracting("value")
.isEqualTo("leader-epoch-checkpoint");
}

@Test
void prefixMasking() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", true);
assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString())
.isEqualTo("<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
}

@Test
void prefixMaskingWithCustomFieldsEmpty() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of();
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
}

@Test
void prefixMaskingWithCustomFieldsOnlyPrefix() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
}

@Test
void prefixMaskingWithCustomFieldsOnlyKey() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic/7/file.log");
}

@Test
void prefixMaskingWithCustomFieldsAll() {
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
final Map<Integer, Object> fields = Map.of(
SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
assertThat(
objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).toString()
).isEqualTo("<prefix>/topic/7/file.log");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SegmentCustomMetadataBuilderTest {
SEGMENT_ID),
1, 100, -1, -1, 1L,
100, Collections.singletonMap(1, 100L));
static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1");
static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1", false);

@Test
void shouldBuildEmptyMap() {
Expand Down

0 comments on commit f91c24c

Please sign in to comment.