refactor: fetch and cache parts of segment instead of chunks
To decouple the fetching rate from transformation, a new concept of Part is introduced.
The goal is to download larger ranges (parts) and transform smaller chunks; a sketch of the Part abstraction is shown below.
jeqo committed Sep 28, 2023
1 parent d63ac18 commit 816db12
Showing 12 changed files with 268 additions and 155 deletions.
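The FetchPart class itself is among the files not expanded in this view. A minimal sketch of the idea, reconstructed from the call sites in the diffs below (field and method names are taken from those call sites; the boundary-alignment logic and the BytesRange construction are assumptions, and equals/hashCode over firstChunkId are omitted for brevity), could look like this:

package io.aiven.kafka.tieredstorage;

import java.util.List;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;

// Sketch only: a part groups partSize consecutive chunks so that one remote fetch covers
// the whole part range, while detransformation (decryption, decompression) still happens chunk by chunk.
public class FetchPart {
    public final int firstChunkId;   // also used as the cache key in ChunkCache
    public final List<Chunk> chunks; // chunks covered by this part
    public final BytesRange range;   // byte range of the whole part in the remote object

    private final ChunkIndex chunkIndex;
    private final int partSize;      // number of chunks per part

    public FetchPart(final ChunkIndex chunkIndex, final Chunk chunk, final int partSize) {
        this.chunkIndex = chunkIndex;
        this.partSize = partSize;
        // Assumption: parts are aligned to partSize boundaries so a chunk always maps to the same part.
        this.firstChunkId = (chunk.id / partSize) * partSize;
        final int chunksEnd = Math.min(firstChunkId + partSize, chunkIndex.chunks().size());
        this.chunks = chunkIndex.chunks().subList(firstChunkId, chunksEnd);
        // Assumption: BytesRange.of(from, to) or an equivalent constructor exists in the project.
        this.range = BytesRange.of(chunks.get(0).range().from, chunks.get(chunks.size() - 1).range().to);
    }

    // Original (untransformed) position of the first chunk, used to compute how many bytes to skip.
    public int startPosition() {
        return chunks.get(0).originalPosition;
    }

    // The following part, or empty when this part already covers the tail of the segment.
    public Optional<FetchPart> next() {
        final int nextChunkId = firstChunkId + partSize;
        if (nextChunkId >= chunkIndex.chunks().size()) {
            return Optional.empty();
        }
        return Optional.of(new FetchPart(chunkIndex, chunkIndex.chunks().get(nextChunkId), partSize));
    }
}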
@@ -109,6 +109,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private boolean compressionHeuristic;
private boolean encryptionEnabled;
private int chunkSize;
private int partSize;
private RsaEncryptionProvider rsaEncryptionProvider;
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
@@ -152,6 +153,7 @@ public void configure(final Map<String, ?> configs) {
chunkManagerFactory.configure(configs);
chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);
chunkSize = config.chunkSize();
partSize = config.fetchPartSize() / chunkSize; // part size in chunks, e.g. 16 MiB / 100 KiB
compressionEnabled = config.compressionEnabled();
compressionHeuristic = config.compressionHeuristicEnabled();

@@ -376,7 +378,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
Math.min(endPosition, remoteLogSegmentMetadata.segmentSizeInBytes() - 1)
);

log.trace("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);
log.debug("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);

metrics.recordSegmentFetch(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -387,7 +389,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
final var suffix = ObjectKey.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
throw new RemoteResourceNotFoundException(e);
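For orientation on the configure() change above: partSize is the part size expressed in chunks rather than bytes. A hypothetical standalone calculation (the 100 KiB chunk size is taken from the inline comment, not from a shown default) works out as follows:

// Hypothetical illustration of the partSize arithmetic in configure() above.
final int chunkSize = 100 * 1024;               // 100 KiB chunks (value from the inline comment)
final int fetchPartSize = 16 * 1024 * 1024;     // 16 MiB, the fetch.part.size default
final int partSize = fetchPartSize / chunkSize; // = 163 chunks per part (integer division)
// One remote GET then covers up to 163 chunks (~15.9 MiB) instead of a single 100 KiB chunk.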
@@ -19,12 +19,13 @@
import java.io.IOException;
import java.io.InputStream;

import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

public interface ChunkManager {

InputStream getChunk(final String objectKeyPath,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException;
InputStream partChunks(final String objectKeyPath,
final SegmentManifest manifest,
final FetchPart part)
throws StorageBackendException, IOException;
}
@@ -17,10 +17,9 @@
package io.aiven.kafka.tieredstorage.chunkmanager;

import java.io.InputStream;
import java.util.List;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
@@ -46,13 +45,13 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi
*
* @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
*/
public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest,
final int chunkId) throws StorageBackendException {
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
@Override
public InputStream partChunks(final String objectKeyPath,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException {
final InputStream chunkContent = fetcher.fetch(objectKeyPath, part.range);

final InputStream chunkContent = fetcher.fetch(objectKeyPath, chunk.range());

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, part.chunks);
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
if (encryptionMetadata.isPresent()) {
detransformEnum = new DecryptionChunkEnumeration(
@@ -29,6 +29,7 @@

import org.apache.kafka.common.Configurable;

import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
@@ -58,26 +59,26 @@ protected ChunkCache(final ChunkManager chunkManager) {
}

/**
* Fetches a specific chunk from remote storage and stores into the cache.
* Fetches chunks of a specific range defined by a {@code Part} from remote storage and stores them in the cache.
* Since it's not possible to cache an opened InputStream, the actual data is cached, and every time
* the cache is called the InputStream is recreated from the data stored in the cache and kept in a local
* variable. This also solves the race condition between eviction and fetching: since the InputStream is
* opened right when fetching from the cache happens, even if the actual value is removed from the cache,
* the InputStream will still contain the data.
*/
public InputStream getChunk(final String objectKeyPath,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException {
final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId);
@Override
public InputStream partChunks(final String objectKeyPath,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException, IOException {
final ChunkKey chunkKey = new ChunkKey(objectKeyPath, part.firstChunkId);
final AtomicReference<InputStream> result = new AtomicReference<>();
try {
return cache.asMap()
.compute(chunkKey, (key, val) -> CompletableFuture.supplyAsync(() -> {
if (val == null) {
statsCounter.recordMiss();
try {
final InputStream chunk =
chunkManager.getChunk(objectKeyPath, manifest, chunkId);
final InputStream chunk = chunkManager.partChunks(objectKeyPath, manifest, part);
final T t = this.cacheChunk(chunkKey, chunk);
result.getAndSet(cachedChunkToInputStream(t));
return t;
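The javadoc above explains why the cache holds raw data rather than open streams. A minimal illustration of that pattern (hypothetical helper code, not part of this commit):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class CachedPartIllustration {
    // The bytes of the part, not the InputStream, are what gets stored in the cache.
    static byte[] toCacheValue(final InputStream partContent) throws IOException {
        return partContent.readAllBytes();
    }

    // A fresh InputStream is recreated on every cache hit, so a concurrent eviction
    // cannot invalidate a stream that has already been handed out to a reader.
    static InputStream fromCacheValue(final byte[] cached) {
        return new ByteArrayInputStream(cached);
    }
}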
@@ -65,6 +65,10 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String CHUNK_SIZE_CONFIG = "chunk.size";
private static final String CHUNK_SIZE_DOC = "The chunk size of log files";

private static final String FETCH_PART_SIZE_CONFIG = "fetch.part.size";
private static final String FETCH_PART_SIZE_DOC = "The size of parts used to fetch from tiered storage. "
+ "Has to be larger than chunk size.";

private static final String COMPRESSION_ENABLED_CONFIG = "compression.enabled";
private static final String COMPRESSION_ENABLED_DOC = "Whether to enable compression";

@@ -141,6 +145,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
ConfigDef.Importance.HIGH,
CHUNK_SIZE_DOC
);
CONFIG.define(
FETCH_PART_SIZE_CONFIG,
ConfigDef.Type.INT,
16 * 1024 * 1024, // 16MiB
ConfigDef.Range.between(1, Integer.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
FETCH_PART_SIZE_DOC
);

CONFIG.define(
COMPRESSION_ENABLED_CONFIG,
@@ -298,9 +310,16 @@ public RemoteStorageManagerConfig(final Map<String, ?> props) {
}

private void validate() {
validateSizes();
validateCompression();
}

private void validateSizes() {
if (getInt(CHUNK_SIZE_CONFIG) > getInt(FETCH_PART_SIZE_CONFIG)) {
throw new ConfigException(FETCH_PART_SIZE_CONFIG + " must be larger than " + CHUNK_SIZE_CONFIG);
}
}

private void validateCompression() {
if (getBoolean(COMPRESSION_HEURISTIC_ENABLED_CONFIG) && !getBoolean(COMPRESSION_ENABLED_CONFIG)) {
throw new ConfigException(
@@ -339,6 +358,10 @@ public int chunkSize() {
return getInt(CHUNK_SIZE_CONFIG);
}

public int fetchPartSize() {
return getInt(FETCH_PART_SIZE_CONFIG);
}

public boolean compressionEnabled() {
return getBoolean(COMPRESSION_ENABLED_CONFIG);
}
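Putting the two sizing settings together, a configuration along these lines would pass the new validateSizes() check (sketch only; the other required RemoteStorageManager settings are omitted):

import java.util.Map;

// chunk.size stays the unit of transformation; fetch.part.size controls how much is
// downloaded per remote request and must not be smaller than chunk.size.
final Map<String, Object> sizingProps = Map.of(
    "chunk.size", 100 * 1024,             // 100 KiB (illustrative value)
    "fetch.part.size", 16 * 1024 * 1024   // 16 MiB, the default
);
// A chunk.size larger than fetch.part.size (e.g. 32 MiB with the default part size)
// would make validateSizes() throw a ConfigException.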
@@ -23,8 +23,10 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.FetchPart;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
@@ -33,41 +35,51 @@
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import org.apache.commons.io.input.BoundedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchChunkEnumeration implements Enumeration<InputStream> {
static final Logger log = LoggerFactory.getLogger(FetchChunkEnumeration.class);

private final ChunkManager chunkManager;
private final String objectKeyPath;
private final SegmentManifest manifest;
private final BytesRange range;
final int startChunkId;
final int lastChunkId;
final FetchPart firstPart;
final Chunk firstChunk;
final FetchPart lastPart;
final Chunk lastChunk;
private final ChunkIndex chunkIndex;
int currentChunkId;
Optional<FetchPart> currentPart;
public boolean closed;

final int partSize;

/**
*
* @param chunkManager  provides chunk input to fetch from
* @param objectKeyPath required by chunkManager
* @param manifest      provides the index to build the response from
* @param range         original offset range start/end position
* @param partSize      fetch part size
*/
public FetchChunkEnumeration(final ChunkManager chunkManager,
final String objectKeyPath,
final SegmentManifest manifest,
final BytesRange range) {
final BytesRange range,
final int partSize) {
this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null");
this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null");
this.range = Objects.requireNonNull(range, "range cannot be null");
this.partSize = partSize;

this.chunkIndex = manifest.chunkIndex();

final Chunk firstChunk = getFirstChunk(range.from);
startChunkId = firstChunk.id;
currentChunkId = startChunkId;
final Chunk lastChunk = getLastChunk(range.to);
lastChunkId = lastChunk.id;
firstChunk = getFirstChunk(range.from);
firstPart = new FetchPart(chunkIndex, firstChunk, this.partSize);
currentPart = Optional.of(firstPart);
lastChunk = getLastChunk(range.to);
lastPart = new FetchPart(chunkIndex, lastChunk, this.partSize);
}

private Chunk getFirstChunk(final int fromPosition) {
@@ -91,53 +103,63 @@ private Chunk getLastChunk(final int endPosition) {

@Override
public boolean hasMoreElements() {
return !closed && currentChunkId <= lastChunkId;
return !closed && currentPart.isPresent();
}

@Override
public InputStream nextElement() {
if (!hasMoreElements()) {
if (currentPart.isEmpty()) {
throw new NoSuchElementException();
}

InputStream chunkContent = getChunkContent(currentChunkId);
final InputStream partContent = partChunks(currentPart.get());

final Chunk currentChunk = chunkIndex.chunks().get(currentChunkId);
final int chunkStartPosition = currentChunk.originalPosition;
final boolean isAtFirstChunk = currentChunkId == startChunkId;
final boolean isAtLastChunk = currentChunkId == lastChunkId;
final boolean isSingleChunk = isAtFirstChunk && isAtLastChunk;
if (isSingleChunk) {
final int chunkStartPosition = currentPart.get().startPosition();
final boolean isAtFirstPart = currentPart.get().equals(firstPart);
final boolean isAtLastPart = currentPart.get().equals(lastPart);
final boolean isSinglePart = isAtFirstPart && isAtLastPart;
if (isSinglePart) {
final int toSkip = range.from - chunkStartPosition;
try {
chunkContent.skip(toSkip);
final int chunkSize = range.size();
chunkContent = new BoundedInputStream(chunkContent, chunkSize);
partContent.skip(toSkip);
} catch (final IOException e) {
throw new RuntimeException(e);
}

final int chunkSize = range.size();

log.trace("Returning part {} with size {}", currentPart.get(), chunkSize);

currentPart = Optional.empty();
return new BoundedInputStream(partContent, chunkSize);
} else {
if (isAtFirstChunk) {
if (isAtFirstPart) {
final int toSkip = range.from - chunkStartPosition;
try {
chunkContent.skip(toSkip);
partContent.skip(toSkip);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
if (isAtLastChunk) {
if (isAtLastPart) {
final int chunkSize = range.to - chunkStartPosition + 1;
chunkContent = new BoundedInputStream(chunkContent, chunkSize);

log.trace("Returning part {} with size {}", currentPart.get(), chunkSize);

currentPart = Optional.empty();
return new BoundedInputStream(partContent, chunkSize);
}
}

currentChunkId += 1;
return chunkContent;
log.trace("Returning part {} with size {}", currentPart.get(), currentPart.get().range.size());

currentPart = currentPart.get().next();
return partContent;
}

private InputStream getChunkContent(final int chunkId) {
private InputStream partChunks(final FetchPart part) {
try {
return chunkManager.getChunk(objectKeyPath, manifest, chunkId);
return chunkManager.partChunks(objectKeyPath, manifest, part);
} catch (final KeyNotFoundException e) {
throw new KeyNotFoundRuntimeException(e);
} catch (final StorageBackendException | IOException e) {
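Tying it back to fetchLogSegment in the first file above, the caller builds the enumeration with the configured partSize and consumes it as a single stream. A caller-side sketch (segmentKey, segmentManifest, range, chunkManager, partSize and out are assumed to be set up as in fetchLogSegment):

final FetchChunkEnumeration partEnumeration =
    new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize);
try (final InputStream segmentSlice = partEnumeration.toInputStream()) {
    // The first part is skipped up to range.from and the last part is bounded at range.to,
    // so the concatenated stream contains exactly the requested byte range.
    segmentSlice.transferTo(out);
}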