Skip to content

Commit

Permalink
Merge pull request #402 from Aiven-Open/ivanyu/mask-object-key
Browse files Browse the repository at this point in the history
Add ability to mask object key prefixes in logs
  • Loading branch information
AnatolyPopov authored Sep 29, 2023
2 parents 7d08576 + 7fba3dd commit e6f322a
Show file tree
Hide file tree
Showing 44 changed files with 806 additions and 440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,15 @@ void testFetchingSegmentFileNonExistent() throws IOException {
);
rsm.configure(config);

final ObjectKey objectKey = new ObjectKey("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);

// Ensure the manifest exists.
writeManifest(objectKey);
writeManifest(objectKeyFactory);

// Make sure the exception is connected to the log file.
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.LOG) + " does not exists in storage";
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG)
+ " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
Expand All @@ -509,9 +510,10 @@ void testFetchingSegmentManifestNotFound() {
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKey objectKey = new ObjectKey("");
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
Expand All @@ -533,15 +535,15 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException
);
rsm.configure(config);

final ObjectKey objectKey = new ObjectKey("");
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);

// Ensure the manifest exists.
writeManifest(objectKey);
writeManifest(objectKeyFactory);

// Make sure the exception is connected to the index file.
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.fromIndexType(indexType))
+ " does not exists in storage";
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.fromIndexType(indexType))
+ " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
Expand All @@ -560,24 +562,26 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
final ObjectKey objectKey = new ObjectKey("");
final String expectedMessage =
"Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class)
.hasStackTraceContaining(expectedMessage);
}

private void writeManifest(final ObjectKey objectKey) throws IOException {
private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOException {
// Ensure the manifest exists.
final String manifest =
"{\"version\":\"1\","
+ "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100,"
+ "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110},"
+ "\"compression\":false}";
final Path manifestPath = targetDir.resolve(objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST));
final Path manifestPath = targetDir.resolve(
objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value());
Files.createDirectories(manifestPath.getParent());
Files.writeString(manifestPath, manifest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@
import java.text.NumberFormat;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

import io.aiven.kafka.tieredstorage.storage.ObjectKey;

import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_KEY;
import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_PREFIX;

/**
* Maps Kafka segment files to object paths/keys in the storage backend.
*/
public final class ObjectKey {
public final class ObjectKeyFactory {

/**
* Supported files and extensions, including log, index types, and segment manifest.
Expand Down Expand Up @@ -67,9 +70,17 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) {
}

private final String prefix;
private final BiFunction<String, String, ObjectKey> objectKeyConstructor;

public ObjectKey(final String prefix) {
/**
 * Creates a factory for object keys.
 *
 * @param prefix the prefix to add to all created keys.
 * @param maskPrefix whether to mask the prefix in {@code toString()}.
 */
public ObjectKeyFactory(final String prefix, final boolean maskPrefix) {
    this.prefix = prefix == null ? "" : prefix;
    // Pick the key implementation once, up front: masked keys only differ
    // in their string representation, not in their value.
    if (maskPrefix) {
        this.objectKeyConstructor = ObjectKeyWithMaskedPrefix::new;
    } else {
        this.objectKeyConstructor = PlainObjectKey::new;
    }
}

/**
Expand All @@ -82,15 +93,13 @@ public ObjectKey(final String prefix) {
* <p>For example:
* {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
*
* @see ObjectKey#mainPath(RemoteLogSegmentMetadata)
* @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata)
*/
public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
Objects.requireNonNull(suffix, "suffix cannot be null");

return prefix
+ mainPath(remoteLogSegmentMetadata)
+ "." + suffix.value;
return objectKeyConstructor.apply(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
}

/**
Expand All @@ -103,16 +112,16 @@ public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final
* <p>For example:
* {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
*/
public String key(final Map<Integer, Object> fields,
final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final Suffix suffix) {
public ObjectKey key(final Map<Integer, Object> fields,
final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final Suffix suffix) {
Objects.requireNonNull(fields, "fields cannot be null");
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
Objects.requireNonNull(suffix, "suffix cannot be null");

final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix);
final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata));
return prefix + main + "." + suffix.value;
return objectKeyConstructor.apply(prefix, main + "." + suffix.value);
}

/**
Expand Down Expand Up @@ -150,4 +159,65 @@ private static String filenamePrefixFromOffset(final long offset) {
nf.setGroupingUsed(false);
return nf.format(offset);
}

/**
 * An object key composed of a prefix and a main part + suffix.
 *
 * <p>Its string representation is identical to its value.
 */
static class PlainObjectKey implements ObjectKey {
    protected final String prefix;
    protected final String mainPathAndSuffix;

    PlainObjectKey(final String prefix, final String mainPathAndSuffix) {
        this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null");
        this.mainPathAndSuffix = Objects.requireNonNull(mainPathAndSuffix, "mainPathAndSuffix cannot be null");
    }

    @Override
    public boolean equals(final Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PlainObjectKey other = (PlainObjectKey) o;
        // Both fields are guaranteed non-null by the constructor, so plain equals is safe.
        return prefix.equals(other.prefix)
            && mainPathAndSuffix.equals(other.mainPathAndSuffix);
    }

    @Override
    public int hashCode() {
        // Standard 31-based combination of the two fields.
        return 31 * prefix.hashCode() + mainPathAndSuffix.hashCode();
    }

    @Override
    public String value() {
        // The full key as seen by the storage backend.
        return prefix + mainPathAndSuffix;
    }

    @Override
    public String toString() {
        return value();
    }
}

/**
 * An object key identical to its parent {@link PlainObjectKey} except that its string
 * representation replaces the prefix with the literal {@code <prefix>/}.
 */
static class ObjectKeyWithMaskedPrefix extends PlainObjectKey {
    ObjectKeyWithMaskedPrefix(final String prefix, final String mainPathAndSuffix) {
        super(prefix, mainPathAndSuffix);
    }

    @Override
    public String toString() {
        // Hide the prefix in logs and error messages; value() still returns the real key.
        return "<prefix>/" + mainPathAndSuffix;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
import io.aiven.kafka.tieredstorage.storage.StorageBackend;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
Expand Down Expand Up @@ -113,7 +114,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
private ChunkManager chunkManager;
private ObjectKey objectKey;
private ObjectKeyFactory objectKeyFactory;
private SegmentCustomMetadataSerde customMetadataSerde;
private Set<SegmentCustomMetadataField> customMetadataFields;

Expand All @@ -138,7 +139,7 @@ public void configure(final Map<String, ?> configs) {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
metrics = new Metrics(time, metricConfig);
setStorage(config.storage());
objectKey = new ObjectKey(config.keyPrefix());
objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), config.keyPrefixMask());
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
final Map<String, KeyPair> keyRing = new HashMap<>();
Expand Down Expand Up @@ -195,7 +196,7 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
log.info("Copying log segment data, metadata: {}", remoteLogSegmentMetadata);

final var customMetadataBuilder =
new SegmentCustomMetadataBuilder(customMetadataFields, objectKey, remoteLogSegmentMetadata);
new SegmentCustomMetadataBuilder(customMetadataFields, objectKeyFactory, remoteLogSegmentMetadata);

final long startedMs = time.milliseconds();

Expand Down Expand Up @@ -286,15 +287,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
final TransformFinisher transformFinisher,
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
ObjectKey.Suffix.LOG,
ObjectKeyFactory.Suffix.LOG,
bytes
);
customMetadataBuilder.addUploadResult(ObjectKey.Suffix.LOG, bytes);
customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes);

log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes);
}
Expand All @@ -316,8 +317,8 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
final TransformFinisher transformFinisher =
new TransformFinisher(transformEnum);

final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType);
final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
try (final var in = transformFinisher.toInputStream()) {
final var bytes = uploader.upload(in, key);
metrics.recordObjectUpload(
Expand All @@ -336,16 +337,17 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws StorageBackendException, IOException {
final String manifest = mapper.writeValueAsString(segmentManifest);
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
final ObjectKey manifestObjectKey =
objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
final var bytes = uploader.upload(manifestContent, manifestFileKey);
final var bytes = uploader.upload(manifestContent, manifestObjectKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
ObjectKey.Suffix.MANIFEST,
ObjectKeyFactory.Suffix.MANIFEST,
bytes
);
customMetadataBuilder.addUploadResult(ObjectKey.Suffix.MANIFEST, bytes);
customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, bytes);

log.debug("Uploaded segment manifest for {}, size: {}", remoteLogSegmentMetadata, bytes);
}
Expand Down Expand Up @@ -379,7 +381,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme

final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);

final var suffix = ObjectKey.Suffix.LOG;
final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
Expand All @@ -399,7 +401,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet

final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);

final var key = objectKey(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.fromIndexType(indexType));
final var in = fetcher.fetch(key);

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
Expand All @@ -420,21 +422,22 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
}

private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKey.Suffix suffix) {
final String segmentKey;
private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final ObjectKeyFactory.Suffix suffix) {
final ObjectKey segmentKey;
if (remoteLogSegmentMetadata.customMetadata().isPresent()) {
final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get();
final var fields = customMetadataSerde.deserialize(customMetadataBytes.value());
segmentKey = objectKey.key(fields, remoteLogSegmentMetadata, suffix);
segmentKey = objectKeyFactory.key(fields, remoteLogSegmentMetadata, suffix);
} else {
segmentKey = objectKey.key(remoteLogSegmentMetadata, suffix);
segmentKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
}
return segmentKey;
}

/**
 * Fetches the segment manifest for the given segment metadata via the manifest provider.
 */
private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
    throws StorageBackendException, IOException {
    final ObjectKey manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
    return segmentManifestProvider.get(manifestKey);
}

Expand All @@ -450,8 +453,8 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
final long startedMs = time.milliseconds();

try {
for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) {
final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
deleter.delete(key);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.io.InputStream;

import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

public interface ChunkManager {

InputStream getChunk(final String objectKeyPath,
InputStream getChunk(final ObjectKey objectKey,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException;
}
Loading

0 comments on commit e6f322a

Please sign in to comment.