
Commit 5599292
Use ObjectKey object instead of String
ivanyu committed Sep 27, 2023
1 parent ba077a3 commit 5599292
Showing 37 changed files with 389 additions and 204 deletions.
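
In short, this commit changes object keys from plain String paths to an ObjectKey value object: ObjectKeyFactory now returns ObjectKey, and the upload, fetch, delete, and caching layers accept it. The ObjectKey interface itself is not part of this diff; judging from how it is used below, a minimal sketch might look like the following (a hypothetical reconstruction from usage, not the commit's actual source):

// Hypothetical sketch of io.aiven.kafka.tieredstorage.storage.ObjectKey,
// reconstructed from its usage in this commit; the real interface may declare more.
package io.aiven.kafka.tieredstorage.storage;

public interface ObjectKey {
    /** Full key of the object in the storage backend. */
    String value();
}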
(test file):

@@ -581,7 +581,7 @@ private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOExc
             + "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110},"
             + "\"compression\":false}";
         final Path manifestPath = targetDir.resolve(
-            objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST));
+            objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value());
         Files.createDirectories(manifestPath.getParent());
         Files.writeString(manifestPath, manifest);
     }
ObjectKeyFactory.java:

@@ -25,6 +25,8 @@
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+
 import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_KEY;
 import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_PREFIX;

@@ -84,13 +86,11 @@ public ObjectKeyFactory(final String prefix) {
      *
      * @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata)
      */
-    public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
+    public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
         Objects.requireNonNull(suffix, "suffix cannot be null");

-        return prefix
-            + mainPath(remoteLogSegmentMetadata)
-            + "." + suffix.value;
+        return new PlainObjectKey(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
     }

     /**
@@ -103,16 +103,16 @@ public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final
      * <p>For example:
      * {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
      */
-    public String key(final Map<Integer, Object> fields,
-                      final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                      final Suffix suffix) {
+    public ObjectKey key(final Map<Integer, Object> fields,
+                         final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                         final Suffix suffix) {
         Objects.requireNonNull(fields, "fields cannot be null");
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
         Objects.requireNonNull(suffix, "suffix cannot be null");

         final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix);
         final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata));
-        return prefix + main + "." + suffix.value;
+        return new PlainObjectKey(prefix, main + "." + suffix.value);
     }

     /**
@@ -150,4 +150,44 @@ private static String filenamePrefixFromOffset(final long offset) {
         nf.setGroupingUsed(false);
         return nf.format(offset);
     }
+
+    static class PlainObjectKey implements ObjectKey {
+        private final String prefix;
+        private final String mainPathAndSuffix;
+
+        PlainObjectKey(final String prefix, final String mainPathAndSuffix) {
+            this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null");
+            this.mainPathAndSuffix = Objects.requireNonNull(mainPathAndSuffix, "mainPathAndSuffix cannot be null");
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final PlainObjectKey that = (PlainObjectKey) o;
+            return Objects.equals(prefix, that.prefix)
+                && Objects.equals(mainPathAndSuffix, that.mainPathAndSuffix);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = prefix.hashCode();
+            result = 31 * result + mainPathAndSuffix.hashCode();
+            return result;
+        }
+
+        @Override
+        public String value() {
+            return prefix + mainPathAndSuffix;
+        }
+
+        @Override
+        public String toString() {
+            return value();
+        }
+    }
 }
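
To see how the pieces compose, here is a usage sketch (not part of the commit; the prefix, remoteLogSegmentMetadata, and targetDir are illustrative assumptions):

// Sketch: callers now receive an ObjectKey and unwrap it with value() only where a raw string is needed.
final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("someprefix/");
final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
// value() concatenates prefix + main path + "." + suffix, matching the old String-returning behavior:
final Path manifestPath = targetDir.resolve(key.value());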
RemoteStorageManager.java:

@@ -64,6 +64,7 @@
 import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
 import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
 import io.aiven.kafka.tieredstorage.storage.StorageBackend;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
@@ -291,7 +292,7 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
                                   final TransformFinisher transformFinisher,
                                   final SegmentCustomMetadataBuilder customMetadataBuilder)
         throws IOException, StorageBackendException {
-        final String fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
+        final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
         try (final var sis = transformFinisher.toInputStream()) {
             final var bytes = uploader.upload(sis, fileKey);
             metrics.recordObjectUpload(
@@ -322,7 +323,7 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
             new TransformFinisher(transformEnum);

         final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType);
-        final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
+        final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
         try (final var in = transformFinisher.toInputStream()) {
             final var bytes = uploader.upload(in, key);
             metrics.recordObjectUpload(
@@ -341,10 +342,11 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
                                 final SegmentCustomMetadataBuilder customMetadataBuilder)
         throws StorageBackendException, IOException {
         final String manifest = mapper.writeValueAsString(segmentManifest);
-        final String manifestFileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
+        final ObjectKey manifestObjectKey =
+            objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);

         try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
-            final var bytes = uploader.upload(manifestContent, manifestFileKey);
+            final var bytes = uploader.upload(manifestContent, manifestObjectKey);
             metrics.recordObjectUpload(
                 remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
                 ObjectKeyFactory.Suffix.MANIFEST,
@@ -425,9 +427,9 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
         }
     }

-    private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                             final ObjectKeyFactory.Suffix suffix) {
-        final String segmentKey;
+    private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                final ObjectKeyFactory.Suffix suffix) {
+        final ObjectKey segmentKey;
         if (remoteLogSegmentMetadata.customMetadata().isPresent()) {
             final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get();
             final var fields = customMetadataSerde.deserialize(customMetadataBytes.value());
@@ -440,7 +442,7 @@ private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata

     private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
         throws StorageBackendException, IOException {
-        final String manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
+        final ObjectKey manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
         return segmentManifestProvider.get(manifestKey);
     }

@@ -457,7 +459,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment

         try {
             for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) {
-                final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
+                final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
                 deleter.delete(key);
             }
         } catch (final Exception e) {
ChunkKey.java:

@@ -19,14 +19,16 @@
 import java.nio.file.Path;
 import java.util.Objects;

+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+
 public class ChunkKey {
     public final String segmentFileName;
     public final int chunkId;

-    public ChunkKey(final String objectKeyPath, final int chunkId) {
-        Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
+    public ChunkKey(final ObjectKey objectKey, final int chunkId) {
+        Objects.requireNonNull(objectKey, "objectKey cannot be null");
         // get last part of segment path + chunk id, as it's used for creating file names
-        this.segmentFileName = Path.of(objectKeyPath).getFileName().toString();
+        this.segmentFileName = Path.of(objectKey.value()).getFileName().toString();
         this.chunkId = chunkId;
     }

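
In practice the new ChunkKey constructor behaves like this (a sketch; the key value is made up, and PlainObjectKey is reachable like this only with same-package access):

// Only the last path segment of the key is kept for building chunk cache file names.
final ObjectKey objectKey =
    new ObjectKeyFactory.PlainObjectKey("tier0/", "topic-X/7/00000000000000001234.log");
final ChunkKey chunkKey = new ChunkKey(objectKey, 42);
// objectKey.value() == "tier0/topic-X/7/00000000000000001234.log"
// chunkKey.segmentFileName == "00000000000000001234.log", chunkKey.chunkId == 42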
ChunkManager.java:

@@ -20,11 +20,12 @@
 import java.io.InputStream;

 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

 public interface ChunkManager {

-    InputStream getChunk(final String objectKeyPath,
+    InputStream getChunk(final ObjectKey objectKeyPath,
                          final SegmentManifest manifest,
                          final int chunkId) throws StorageBackendException, IOException;
 }
DefaultChunkManager.java:

@@ -25,6 +25,7 @@
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
 import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration;
@@ -46,7 +47,7 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi
      *
      * @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
      */
-    public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest,
+    public InputStream getChunk(final ObjectKey objectKeyPath, final SegmentManifest manifest,
                                 final int chunkId) throws StorageBackendException {
         final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);

ChunkCache.java:

@@ -33,6 +33,7 @@
 import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

 import com.github.benmanes.caffeine.cache.AsyncCache;
@@ -65,7 +66,7 @@ protected ChunkCache(final ChunkManager chunkManager) {
      * opened right when fetching from cache happens even if the actual value is removed from the cache,
      * the InputStream will still contain the data.
      */
-    public InputStream getChunk(final String objectKeyPath,
+    public InputStream getChunk(final ObjectKey objectKeyPath,
                                 final SegmentManifest manifest,
                                 final int chunkId) throws StorageBackendException, IOException {
         final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId);
SegmentManifestProvider.java:

@@ -27,6 +27,7 @@

 import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,7 +38,7 @@ public class SegmentManifestProvider {
     private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache";
     private static final long GET_TIMEOUT_SEC = 10;

-    private final AsyncLoadingCache<String, SegmentManifest> cache;
+    private final AsyncLoadingCache<ObjectKey, SegmentManifest> cache;

     /**
      * @param maxCacheSize the max cache size (in items) or empty if the cache is unbounded.
@@ -62,7 +63,7 @@ public SegmentManifestProvider(final Optional<Long> maxCacheSize,
         statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
     }

-    public SegmentManifest get(final String manifestKey)
+    public SegmentManifest get(final ObjectKey manifestKey)
         throws StorageBackendException, IOException {
         try {
             return cache.get(manifestKey).get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
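
Keying the Caffeine cache by ObjectKey works because PlainObjectKey defines value-based equals() and hashCode() (see ObjectKeyFactory.java above). A quick sketch (objectKeyFactory and metadata assumed in scope):

// Two keys built from the same metadata and suffix are equal, so they map to the same cache entry.
final ObjectKey k1 = objectKeyFactory.key(metadata, ObjectKeyFactory.Suffix.MANIFEST);
final ObjectKey k2 = objectKeyFactory.key(metadata, ObjectKeyFactory.Suffix.MANIFEST);
assert k1.equals(k2) && k1.hashCode() == k2.hashCode();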
FetchChunkEnumeration.java:

@@ -30,13 +30,14 @@
 import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
 import io.aiven.kafka.tieredstorage.storage.BytesRange;
 import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

 import org.apache.commons.io.input.BoundedInputStream;

 public class FetchChunkEnumeration implements Enumeration<InputStream> {
     private final ChunkManager chunkManager;
-    private final String objectKeyPath;
+    private final ObjectKey objectKey;
     private final SegmentManifest manifest;
     private final BytesRange range;
     final int startChunkId;
@@ -48,16 +49,16 @@ public class FetchChunkEnumeration implements Enumeration<InputStream> {
     /**
      *
      * @param chunkManager provides chunk input to fetch from
-     * @param objectKeyPath required by chunkManager
+     * @param objectKey required by chunkManager
      * @param manifest provides to index to build response from
      * @param range original offset range start/end position
      */
     public FetchChunkEnumeration(final ChunkManager chunkManager,
-                                 final String objectKeyPath,
+                                 final ObjectKey objectKey,
                                  final SegmentManifest manifest,
                                  final BytesRange range) {
         this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null");
-        this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
+        this.objectKey = Objects.requireNonNull(objectKey, "objectKey cannot be null");
         this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null");
         this.range = Objects.requireNonNull(range, "range cannot be null");

@@ -74,7 +75,7 @@ private Chunk getFirstChunk(final int fromPosition) {
         final Chunk firstChunk = chunkIndex.findChunkForOriginalOffset(fromPosition);
         if (firstChunk == null) {
             throw new IllegalArgumentException("Invalid start position " + fromPosition
-                + " in segment path " + objectKeyPath);
+                + " in segment path " + objectKey);
         }
         return firstChunk;
     }
@@ -137,7 +138,7 @@ public InputStream nextElement() {

     private InputStream getChunkContent(final int chunkId) {
         try {
-            return chunkManager.getChunk(objectKeyPath, manifest, chunkId);
+            return chunkManager.getChunk(objectKey, manifest, chunkId);
         } catch (final KeyNotFoundException e) {
             throw new KeyNotFoundRuntimeException(e);
         } catch (final StorageBackendException | IOException e) {
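
Note that the IllegalArgumentException message above now concatenates the ObjectKey itself; since PlainObjectKey.toString() delegates to value(), the rendered message contains the same full key string as the old String-based code.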
[The remaining changed files in this commit are not shown here.]
