Rename ObjectKey -> ObjectKeyFactory
ivanyu committed Sep 27, 2023
1 parent 016672c commit ba077a3
Showing 11 changed files with 323 additions and 290 deletions.
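At call sites, the rename plays out as in this minimal before/after sketch (illustrative only; prefix and metadata stand in for the config value and RemoteLogSegmentMetadata instance used in the real code):

// Before this commit (illustrative):
// final ObjectKey objectKey = new ObjectKey(prefix);
// final String key = objectKey.key(metadata, ObjectKey.Suffix.LOG);

// After this commit:
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(prefix);
final String key = objectKeyFactory.key(metadata, ObjectKeyFactory.Suffix.LOG);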
@@ -480,14 +480,15 @@ void testFetchingSegmentFileNonExistent() throws IOException {
);
rsm.configure(config);

- final ObjectKey objectKey = new ObjectKey("");
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");

// Ensure the manifest exists.
- writeManifest(objectKey);
+ writeManifest(objectKeyFactory);

// Make sure the exception is connected to the log file.
- final String expectedMessage =
-     "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.LOG) + " does not exists in storage";
+ final String expectedMessage = "Key "
+     + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG)
+     + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -509,9 +510,10 @@ void testFetchingSegmentManifestNotFound() {
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
- final ObjectKey objectKey = new ObjectKey("");
- final String expectedMessage =
-     "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
+ final String expectedMessage = "Key "
+     + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+     + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -533,15 +535,15 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException
);
rsm.configure(config);

- final ObjectKey objectKey = new ObjectKey("");
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");

// Ensure the manifest exists.
- writeManifest(objectKey);
+ writeManifest(objectKeyFactory);

// Make sure the exception is connected to the index file.
- final String expectedMessage =
-     "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.fromIndexType(indexType))
-     + " does not exists in storage";
+ final String expectedMessage = "Key "
+     + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.fromIndexType(indexType))
+     + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -560,24 +562,26 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
rsm.configure(config);

// Make sure the exception is connected to the manifest file.
- final ObjectKey objectKey = new ObjectKey("");
- final String expectedMessage =
-     "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
+ final String expectedMessage = "Key "
+     + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+     + " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
.hasCauseInstanceOf(KeyNotFoundException.class)
.hasStackTraceContaining(expectedMessage);
}

- private void writeManifest(final ObjectKey objectKey) throws IOException {
+ private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOException {
// Ensure the manifest exists.
final String manifest =
"{\"version\":\"1\","
+ "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100,"
+ "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110},"
+ "\"compression\":false}";
- final Path manifestPath = targetDir.resolve(objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST));
+ final Path manifestPath = targetDir.resolve(
+     objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST));
Files.createDirectories(manifestPath.getParent());
Files.writeString(manifestPath, manifest);
}
@@ -31,7 +31,7 @@
/**
* Maps Kafka segment files to object paths/keys in the storage backend.
*/
- public final class ObjectKey {
+ public final class ObjectKeyFactory {

/**
* Supported files and extensions, including log, index types, and segment manifest.
@@ -68,7 +68,7 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) {

private final String prefix;

- public ObjectKey(final String prefix) {
+ public ObjectKeyFactory(final String prefix) {
this.prefix = prefix == null ? "" : prefix;
}

@@ -82,7 +82,7 @@ public ObjectKey(final String prefix) {
* <p>For example:
* {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
*
- * @see ObjectKey#mainPath(RemoteLogSegmentMetadata)
+ * @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata)
*/
public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
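Putting the renamed class together, a hedged usage sketch (the prefix and example key come from the Javadoc above; metadata stands in for a RemoteLogSegmentMetadata instance):

final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("someprefix/");
// Per the Javadoc, for a suitable metadata this yields a key like:
// someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log
final String logKey = objectKeyFactory.key(metadata, ObjectKeyFactory.Suffix.LOG);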
@@ -113,7 +113,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
private ChunkManager chunkManager;
- private ObjectKey objectKey;
+ private ObjectKeyFactory objectKeyFactory;
private SegmentCustomMetadataSerde customMetadataSerde;
private Set<SegmentCustomMetadataField> customMetadataFields;

@@ -138,7 +138,7 @@ public void configure(final Map<String, ?> configs) {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
metrics = new Metrics(time, metricConfig);
setStorage(config.storage());
- objectKey = new ObjectKey(config.keyPrefix());
+ objectKeyFactory = new ObjectKeyFactory(config.keyPrefix());
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
final Map<String, KeyPair> keyRing = new HashMap<>();
@@ -198,7 +198,7 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
remoteLogSegmentMetadata.segmentSizeInBytes());

final var customMetadataBuilder =
-     new SegmentCustomMetadataBuilder(customMetadataFields, objectKey, remoteLogSegmentMetadata);
+     new SegmentCustomMetadataBuilder(customMetadataFields, objectKeyFactory, remoteLogSegmentMetadata);

final long startedMs = time.milliseconds();

@@ -291,15 +291,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
final TransformFinisher transformFinisher,
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws IOException, StorageBackendException {
- final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
+ final String fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
-     ObjectKey.Suffix.LOG,
+     ObjectKeyFactory.Suffix.LOG,
bytes
);
- customMetadataBuilder.addUploadResult(ObjectKey.Suffix.LOG, bytes);
+ customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes);

log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes);
}
@@ -321,8 +321,8 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
final TransformFinisher transformFinisher =
new TransformFinisher(transformEnum);

- final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
- final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
+ final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType);
+ final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
try (final var in = transformFinisher.toInputStream()) {
final var bytes = uploader.upload(in, key);
metrics.recordObjectUpload(
@@ -341,16 +341,16 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws StorageBackendException, IOException {
final String manifest = mapper.writeValueAsString(segmentManifest);
- final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
+ final String manifestFileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
final var bytes = uploader.upload(manifestContent, manifestFileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
-     ObjectKey.Suffix.MANIFEST,
+     ObjectKeyFactory.Suffix.MANIFEST,
bytes
);
- customMetadataBuilder.addUploadResult(ObjectKey.Suffix.MANIFEST, bytes);
+ customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, bytes);

log.debug("Uploaded segment manifest for {}, size: {}", remoteLogSegmentMetadata, bytes);
}
@@ -384,7 +384,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme

final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);

- final var suffix = ObjectKey.Suffix.LOG;
+ final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
@@ -404,7 +404,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet

final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);

- final var key = objectKey(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
+ final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.fromIndexType(indexType));
final var in = fetcher.fetch(key);

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
@@ -425,21 +425,22 @@ public InputStream fetchIndex(final RemoteLogSegmentMet
}
}

- private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKey.Suffix suffix) {
+ private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                          final ObjectKeyFactory.Suffix suffix) {
final String segmentKey;
if (remoteLogSegmentMetadata.customMetadata().isPresent()) {
final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get();
final var fields = customMetadataSerde.deserialize(customMetadataBytes.value());
- segmentKey = objectKey.key(fields, remoteLogSegmentMetadata, suffix);
+ segmentKey = objectKeyFactory.key(fields, remoteLogSegmentMetadata, suffix);
} else {
- segmentKey = objectKey.key(remoteLogSegmentMetadata, suffix);
+ segmentKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
}
return segmentKey;
}

private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws StorageBackendException, IOException {
- final String manifestKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
+ final String manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
return segmentManifestProvider.get(manifestKey);
}

@@ -455,8 +456,8 @@ public void deleteLogSegmentData(final RemoteLogSegment
final long startedMs = time.milliseconds();

try {
- for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
-     final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
+ for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) {
+     final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
deleter.delete(key);
}
} catch (final Exception e) {
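All upload, fetch, and delete paths above now go through the renamed field; the key-resolution rule in the private objectKey(...) helper condenses to this sketch (names as in the diff; metadata, deserializedFields, and suffix are hypothetical inputs):

// When the broker hands back custom metadata, the key is rebuilt from the
// recorded fields (e.g. an object prefix captured at upload time);
// otherwise it is derived from the segment metadata alone.
final String segmentKey = metadata.customMetadata().isPresent()
    ? objectKeyFactory.key(deserializedFields, metadata, suffix)
    : objectKeyFactory.key(metadata, suffix);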
@@ -23,25 +23,25 @@

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

- import io.aiven.kafka.tieredstorage.ObjectKey;
+ import io.aiven.kafka.tieredstorage.ObjectKeyFactory;

public class SegmentCustomMetadataBuilder {
- final ObjectKey objectKey;
+ final ObjectKeyFactory objectKeyFactory;
final RemoteLogSegmentMetadata segmentMetadata;
- final EnumMap<ObjectKey.Suffix, Long> uploadResults;
+ final EnumMap<ObjectKeyFactory.Suffix, Long> uploadResults;

final Set<SegmentCustomMetadataField> fields;

public SegmentCustomMetadataBuilder(final Set<SegmentCustomMetadataField> fields,
-     final ObjectKey objectKey,
+     final ObjectKeyFactory objectKeyFactory,
final RemoteLogSegmentMetadata segmentMetadata) {
this.fields = fields;
- this.objectKey = objectKey;
+ this.objectKeyFactory = objectKeyFactory;
this.segmentMetadata = segmentMetadata;
- this.uploadResults = new EnumMap<>(ObjectKey.Suffix.class);
+ this.uploadResults = new EnumMap<>(ObjectKeyFactory.Suffix.class);
}

- public SegmentCustomMetadataBuilder addUploadResult(final ObjectKey.Suffix suffix,
+ public SegmentCustomMetadataBuilder addUploadResult(final ObjectKeyFactory.Suffix suffix,
final long bytes) {
if (uploadResults.containsKey(suffix)) {
throw new IllegalArgumentException("Upload results for suffix " + suffix + " already added");
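A hedged usage sketch of the builder after the rename (fields, objectKeyFactory, and metadata stand in for the values RemoteStorageManager wires up above):

final var builder = new SegmentCustomMetadataBuilder(fields, objectKeyFactory, metadata);
builder.addUploadResult(ObjectKeyFactory.Suffix.LOG, 1024L);     // one result per suffix
builder.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 256L);
// Adding a second result for the same suffix throws IllegalArgumentException,
// per the containsKey check above.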
@@ -24,13 +24,13 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;

- import io.aiven.kafka.tieredstorage.ObjectKey;
+ import io.aiven.kafka.tieredstorage.ObjectKeyFactory;

// index defines values on custom metadata fields and cannot be changed without breaking compatibility.
public enum SegmentCustomMetadataField {
REMOTE_SIZE(0, new Field("remote_size", Type.VARLONG), SegmentCustomMetadataBuilder::totalSize),
OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKey.prefix()),
OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKey.mainPath(b.segmentMetadata));
OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKeyFactory.prefix()),
OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKeyFactory.mainPath(b.segmentMetadata));

static final TaggedFieldsSection FIELDS_SECTION = TaggedFieldsSection.of(
REMOTE_SIZE.index, REMOTE_SIZE.field,
@@ -30,7 +30,7 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;

- import io.aiven.kafka.tieredstorage.ObjectKey;
+ import io.aiven.kafka.tieredstorage.ObjectKeyFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,14 +300,14 @@ private void recordSegmentFetchRequests(final TopicPartition topicPartition) {
.record();
}

- public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
+ public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix,
final long bytes) {
recordObjectUploadRequests(topicPartition, suffix);
recordObjectUploadBytes(topicPartition, suffix, bytes);
}

private void recordObjectUploadBytes(final TopicPartition topicPartition,
-     final ObjectKey.Suffix suffix,
+     final ObjectKeyFactory.Suffix suffix,
final long bytes) {
new SensorProvider(metrics, sensorName(OBJECT_UPLOAD_BYTES))
.with(metricsRegistry.objectUploadBytesRate, new Rate())
@@ -348,7 +348,7 @@ private void recordObjectUploadBytes(final TopicPartition topicPartition,
.record(bytes);
}

- private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKey.Suffix suffix) {
+ private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix) {
new SensorProvider(metrics, sensorName(OBJECT_UPLOAD))
.with(metricsRegistry.objectUploadRequestsRate, new Rate())
.with(metricsRegistry.objectUploadRequestsTotal, new CumulativeCount())