refactor: rename chunk manager to fetch manager
This includes renaming the cache classes and configuration keys accordingly.

refactor: redefine fetch part key based on range

Now that cached content corresponds to the byte range defined by a fetch part, that range can be used to identify the cached content.
jeqo committed Oct 18, 2023
1 parent c1eb15d commit 801e894
Showing 36 changed files with 724 additions and 723 deletions.
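
To illustrate the second commit message, here is a hypothetical sketch of a range-based cache key. The actual key class touched by this commit is not shown in this excerpt, so all names below are assumptions:

```java
// Hypothetical sketch only: a cache key built from the object key plus the
// fetch part's byte range, so two parts covering the same range of the same
// object resolve to the same cached content.
import java.util.Objects;

final class FetchPartKey {
    final String objectKey; // e.g. "test/topic-0/00000000000000000000.log"
    final int rangeFrom;    // inclusive start of the part's byte range
    final int rangeTo;      // inclusive end of the part's byte range

    FetchPartKey(final String objectKey, final int rangeFrom, final int rangeTo) {
        this.objectKey = Objects.requireNonNull(objectKey, "objectKey cannot be null");
        this.rangeFrom = rangeFrom;
        this.rangeTo = rangeTo;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FetchPartKey)) {
            return false;
        }
        final FetchPartKey that = (FetchPartKey) o;
        return rangeFrom == that.rangeFrom
            && rangeTo == that.rangeTo
            && objectKey.equals(that.objectKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(objectKey, rangeFrom, rangeTo);
    }
}
```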
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -22,7 +22,7 @@
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
<suppress checks="ClassFanOutComplexity" files="FetchCache.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
RemoteStorageManagerTest.java
@@ -53,8 +53,9 @@
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache;
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.fetch.cache.DiskBasedFetchCache;
import io.aiven.kafka.tieredstorage.fetch.cache.InMemoryFetchCache;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule;
@@ -65,6 +66,7 @@
import io.aiven.kafka.tieredstorage.security.EncryptedDataKey;
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import io.aiven.kafka.tieredstorage.transform.KeyNotFoundRuntimeException;

import com.fasterxml.jackson.databind.JsonNode;
@@ -121,7 +123,7 @@ class RemoteStorageManagerTest extends RsaKeyAwareTest {
private static List<Arguments> provideEndToEnd() {
final List<Arguments> result = new ArrayList<>();
final var cacheNames =
List.of(InMemoryChunkCache.class.getCanonicalName(), DiskBasedChunkCache.class.getCanonicalName());
List.of(InMemoryFetchCache.class.getCanonicalName(), DiskBasedFetchCache.class.getCanonicalName());
for (final String cacheClass : cacheNames) {
for (final int chunkSize : List.of(1024 * 1024 - 1, 1024 * 1024 * 1024 - 1, Integer.MAX_VALUE / 2)) {
for (final boolean compression : List.of(true, false)) {
@@ -196,22 +198,19 @@ void endToEnd(
final var cacheDir = tmpDir.resolve("cache");
Files.createDirectories(cacheDir);
final Map<String, String> config = new HashMap<>();
config.putAll(Map.of(
"chunk.size", Integer.toString(chunkSize),
// TODO: make tests with different fetch part size
"fetch.part.size", Integer.toString(chunkSize + 1),
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"key.prefix", "test/",
"storage.root", targetDir.toString(),
"compression.enabled", Boolean.toString(compression),
"encryption.enabled", Boolean.toString(encryption),
"custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY"
));
config.putAll(Map.of(
"chunk.cache.class", cacheClass,
"chunk.cache.path", cacheDir.toString(),
"chunk.cache.size", Integer.toString(100 * 1024 * 1024)
));
config.put("chunk.size", Integer.toString(chunkSize));
// TODO: make tests with different fetch part size
config.put("fetch.part.size", Integer.toString(chunkSize + 1));
config.put("storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage");
config.put("key.prefix", "test/");
config.put("storage.root", targetDir.toString());
config.put("compression.enabled", Boolean.toString(compression));
config.put("encryption.enabled", Boolean.toString(encryption));
config.put("custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY");
config.put("fetch.cache.class", cacheClass);
config.put("fetch.cache.path", cacheDir.toString());
config.put("fetch.cache.size", Integer.toString(100 * 1024 * 1024));

if (encryption) {
config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
@@ -446,18 +445,16 @@ void testRequiresCompression(final CompressionType compressionType, final boolea

// Configure the RSM.
final int chunkSize = 1024 * 1024;
final Map<String, ?> config = Map.of(
"chunk.size", Integer.toString(chunkSize),
"storage.backend.class",
"io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"key.prefix", "test/",
"storage.root", targetDir.toString(),
"compression.enabled", "true",
"compression.heuristic.enabled", "true",
"chunk.cache.size", "10000",
"chunk.cache.class", InMemoryChunkCache.class.getCanonicalName(),
"chunk.cache.retention.ms", "10000"
);
final Map<String, String> config = new HashMap<>();
config.put("chunk.size", Integer.toString(chunkSize));
config.put("storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage");
config.put("key.prefix", "test/");
config.put("storage.root", targetDir.toString());
config.put("compression.enabled", "true");
config.put("compression.heuristic.enabled", "true");
config.put("fetch.cache.size", "10000");
config.put("fetch.cache.class", InMemoryFetchCache.class.getCanonicalName());
config.put("fetch.cache.retention.ms", "10000");

rsm.configure(config);

@@ -486,8 +483,8 @@ void testFetchingSegmentFileNonExistent() throws IOException {

// Make sure the exception is connected to the log file.
final String expectedMessage = "Key "
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG)
+ " does not exists in storage";
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG)
+ " does not exists in storage";

assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
RemoteStorageManager.java
@@ -45,9 +45,11 @@
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManagerFactory;
import io.aiven.kafka.tieredstorage.config.RemoteStorageManagerConfig;
import io.aiven.kafka.tieredstorage.fetch.FetchEnumeration;
import io.aiven.kafka.tieredstorage.fetch.FetchManager;
import io.aiven.kafka.tieredstorage.fetch.FetchManagerFactory;
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
@@ -81,8 +83,6 @@
import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;

@@ -115,7 +115,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private RsaEncryptionProvider rsaEncryptionProvider;
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
private ChunkManager chunkManager;
private FetchManager fetchManager;
private ObjectKeyFactory objectKeyFactory;
private SegmentCustomMetadataSerde customMetadataSerde;
private Set<SegmentCustomMetadataField> customMetadataFields;
@@ -146,14 +146,13 @@ public void configure(final Map<String, ?> configs) {
if (encryptionEnabled) {
final Map<String, KeyPair> keyRing = new HashMap<>();
config.encryptionKeyRing().forEach((keyId, keyPaths) ->
keyRing.put(keyId, RsaKeyReader.read(keyPaths.publicKey, keyPaths.privateKey)
));
keyRing.put(keyId, RsaKeyReader.read(keyPaths.publicKey, keyPaths.privateKey)));
rsaEncryptionProvider = new RsaEncryptionProvider(config.encryptionKeyPairId(), keyRing);
aesEncryptionProvider = new AesEncryptionProvider();
}
final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory();
chunkManagerFactory.configure(configs);
chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);
final FetchManagerFactory fetchManagerFactory = new FetchManagerFactory();
fetchManagerFactory.configure(configs);
fetchManager = fetchManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);
chunkSize = config.chunkSize();
partSize = config.fetchPartSize() / chunkSize; // e.g. 16MB/100KB
compressionEnabled = config.compressionEnabled();
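
Note that partSize above counts chunks per fetch part rather than bytes. A quick worked example with the values from the inline comment (a sketch, not code from this repository):

```java
// Worked example of partSize = config.fetchPartSize() / chunkSize:
final int chunkSize = 100 * 1024;               // 100 KiB chunks
final int fetchPartSize = 16 * 1024 * 1024;     // 16 MiB fetch parts
final int partSize = fetchPartSize / chunkSize; // = 163 chunks per part (integer division)
```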
@@ -435,7 +434,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize)
return new FetchEnumeration(fetchManager, segmentKey, segmentManifest, range, partSize)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
throw new RemoteResourceNotFoundException(e);
DefaultFetchManager.java
@@ -14,12 +14,11 @@
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.chunkmanager;
package io.aiven.kafka.tieredstorage.fetch;

import java.io.InputStream;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
@@ -32,27 +31,27 @@
import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;

public class DefaultChunkManager implements ChunkManager {
public class DefaultFetchManager implements FetchManager {
private final ObjectFetcher fetcher;
private final AesEncryptionProvider aesEncryptionProvider;

public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) {
public DefaultFetchManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) {
this.fetcher = fetcher;
this.aesEncryptionProvider = aesEncryptionProvider;
}

/**
* Gets a chunk of a segment.
* Gets a part of a segment.
*
* @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
* @return an {@link InputStream} of the fetch part, plain text (i.e., decrypted and decompressed).
*/
@Override
public InputStream partChunks(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException {
final InputStream chunkContent = fetcher.fetch(objectKey, part.range);
public InputStream partContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException {
final InputStream partContent = fetcher.fetch(objectKey, part.range);

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, part.chunks);
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(partContent, part.chunks);
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
if (encryptionMetadata.isPresent()) {
detransformEnum = new DecryptionChunkEnumeration(
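
A minimal usage sketch of the renamed API, assuming a configured fetcher, an aesEncryptionProvider, and an objectKey, manifest, part, and out already in scope:

```java
// Sketch: fetch one part and consume its plain-text (decrypted, decompressed) content.
final FetchManager fetchManager = new DefaultFetchManager(fetcher, aesEncryptionProvider);
try (final InputStream partContent = fetchManager.partContent(objectKey, manifest, part)) {
    partContent.transferTo(out); // out is any OutputStream
}
```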
FetchEnumeration.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.transform;
package io.aiven.kafka.tieredstorage.fetch;

import java.io.IOException;
import java.io.InputStream;
@@ -26,8 +26,6 @@
import java.util.Optional;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
@@ -39,36 +37,38 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchChunkEnumeration implements Enumeration<InputStream> {
static final Logger log = LoggerFactory.getLogger(FetchChunkEnumeration.class);
public class FetchEnumeration implements Enumeration<InputStream> {
static final Logger log = LoggerFactory.getLogger(FetchEnumeration.class);

private final ChunkManager chunkManager;
private final FetchManager fetchManager;
private final ObjectKey objectKey;
private final SegmentManifest manifest;
private final BytesRange range;
final FetchPart firstPart;
final Chunk firstChunk;
final FetchPart lastPart;
final Chunk lastChunk;
private final ChunkIndex chunkIndex;
Optional<FetchPart> currentPart;
public boolean closed;

final int partSize;

// for testing
final Chunk firstChunk;
final Chunk lastChunk;

/**
* @param chunkManager provides chunk input to fetch from
* @param objectKey required by chunkManager
* @param manifest provides to index to build response from
* @param range original offset range start/end position
* @param partSize fetch part size
* @param fetchManager provides the part content to fetch
* @param objectKey    required by fetchManager
* @param manifest     provides the index to build the response from
* @param range original offset range start/end position
* @param partSize fetch part size
*/
public FetchChunkEnumeration(final ChunkManager chunkManager,
final ObjectKey objectKey,
final SegmentManifest manifest,
final BytesRange range,
final int partSize) {
this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null");
public FetchEnumeration(final FetchManager fetchManager,
final ObjectKey objectKey,
final SegmentManifest manifest,
final BytesRange range,
final int partSize) {
this.fetchManager = Objects.requireNonNull(fetchManager, "fetchManager cannot be null");
this.objectKey = Objects.requireNonNull(objectKey, "objectKey cannot be null");
this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null");
this.range = Objects.requireNonNull(range, "range cannot be null");
@@ -160,7 +160,7 @@ public InputStream nextElement() {

private InputStream partChunks(final FetchPart part) {
try {
return chunkManager.partChunks(objectKey, manifest, part);
return fetchManager.partContent(objectKey, manifest, part);
} catch (final KeyNotFoundException e) {
throw new KeyNotFoundRuntimeException(e);
} catch (final StorageBackendException | IOException e) {
@@ -178,13 +178,13 @@ public void close() {

/**
* This class overrides the behavior of {@link SequenceInputStream#close()} to avoid unnecessary calls to
* {@link FetchChunkEnumeration#nextElement()} since {@link FetchChunkEnumeration} is supposed
* {@link FetchEnumeration#nextElement()} since {@link FetchEnumeration} is supposed
* to be lazy and does not create input streams unless there was such a call.
*/
private static class LazySequenceInputStream extends SequenceInputStream {
private final FetchChunkEnumeration closeableEnumeration;
private final FetchEnumeration closeableEnumeration;

LazySequenceInputStream(final FetchChunkEnumeration e) {
LazySequenceInputStream(final FetchEnumeration e) {
super(e);
this.closeableEnumeration = e;
}
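
As fetchLogSegment above shows, the enumeration is consumed through toInputStream(). A usage sketch, assuming a configured fetchManager and a fetched segmentManifest; BytesRange.of is a hypothetical factory standing in for however BytesRange is actually constructed:

```java
// Sketch: lazily stream the first 1 MiB of a remote segment.
final BytesRange range = BytesRange.of(0, 1024 * 1024 - 1); // hypothetical factory, inclusive bounds
final InputStream in = new FetchEnumeration(fetchManager, segmentKey, segmentManifest, range, partSize)
    .toInputStream();
```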
FetchManager.java
@@ -14,19 +14,18 @@
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.chunkmanager;
package io.aiven.kafka.tieredstorage.fetch;

import java.io.IOException;
import java.io.InputStream;

import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

public interface ChunkManager {
InputStream partChunks(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part)
public interface FetchManager {
InputStream partContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part)
throws StorageBackendException, IOException;
}
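
The FetchCache implementations themselves are not part of this excerpt, but the idea behind the range-based key can be sketched as a hypothetical caching decorator over the interface above. The real caches (e.g. InMemoryFetchCache, DiskBasedFetchCache) do considerably more, including metrics and disk storage; none of that is shown here:

```java
// Hypothetical sketch: a FetchManager decorator caching part content keyed by
// (objectKey, part.range), as the commit message describes. Not the real
// FetchCache implementation.
package io.aiven.kafka.tieredstorage.fetch;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

class CachingFetchManager implements FetchManager {
    private final FetchManager delegate;
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();

    CachingFetchManager(final FetchManager delegate) {
        this.delegate = delegate;
    }

    @Override
    public InputStream partContent(final ObjectKey objectKey,
                                   final SegmentManifest manifest,
                                   final FetchPart part)
        throws StorageBackendException, IOException {
        // The part's range identifies the cached content for this object;
        // assumes ObjectKey exposes its string form via value().
        final String key = objectKey.value() + ":" + part.range;
        byte[] content = cache.get(key);
        if (content == null) { // not atomic; good enough for a sketch
            try (final InputStream in = delegate.partContent(objectKey, manifest, part)) {
                content = in.readAllBytes();
            }
            cache.put(key, content);
        }
        return new ByteArrayInputStream(content);
    }
}
```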