refactor: fetch and cache parts of segment instead of chunks
To decouple the fetch rate from the transformation rate, a new concept, the Part, is introduced.
The goal is to download larger ranges (parts) while still transforming smaller chunks.
jeqo committed Sep 27, 2023
1 parent 016672c commit a42eeb5
Showing 11 changed files with 317 additions and 156 deletions.
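To make the numbers concrete (the figures are the commit's own example, not fixed constants): with a 100 KiB chunk and the 16 MiB part budget used below, one part groups 163 chunks, so a single ranged GET covers roughly 15.9 MiB where it previously covered 100 KiB. A minimal sketch of how a chunk id maps to its enclosing part, mirroring the Part constructor in the first file:

    // Sketch only: snaps a chunk to its part boundary, as Part's constructor does.
    static int firstChunkIdOfPart(final int chunkId, final int partSize, final int finalChunkId) {
        // Integer division aligns chunkId down to a multiple of partSize;
        // min() clamps the last (possibly shorter) part to the end of the chunk index.
        return Math.min((chunkId / partSize) * partSize, finalChunkId);
    }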
116 changes: 116 additions & 0 deletions core/src/main/java/io/aiven/kafka/tieredstorage/Part.java
@@ -0,0 +1,116 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.storage.BytesRange;

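/**
 * Groups {@code partSize} consecutive chunks of a segment so they can be fetched from remote
 * storage as a single byte range and then detransformed chunk by chunk.
 */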
public class Part {
public final int firstChunkId;
public final BytesRange range;
public final List<Chunk> chunks;

private final ChunkIndex chunkIndex;
private final int partSize;
private final int finalChunkId;

public Part(final ChunkIndex chunkIndex,
final Chunk chunk,
final int partSize) {
this.chunkIndex = chunkIndex;
this.partSize = partSize;
this.finalChunkId = chunkIndex.chunks().size() - 1;

this.firstChunkId = Math.min((chunk.id / partSize) * partSize, finalChunkId);
final var firstChunk = chunkIndex.chunks().get(firstChunkId);
final var lastChunkId = Math.min(firstChunkId + partSize - 1, finalChunkId);
final var lastChunk = chunkIndex.chunks().get(lastChunkId);
this.range = BytesRange.of(firstChunk.range().from, lastChunk.range().to);
this.chunks = chunkIndex.chunks().subList(firstChunkId, lastChunkId + 1);
}

private Part(final ChunkIndex chunkIndex,
final int partSize,
final int finalChunkId,
final int firstChunkId,
final BytesRange range,
final List<Chunk> chunks) {
this.chunkIndex = chunkIndex;
this.partSize = partSize;
this.finalChunkId = finalChunkId;

this.firstChunkId = firstChunkId;
this.range = range;
this.chunks = chunks;
}

public BytesRange range() {
return range;
}

public Optional<Part> next() {
final var currentLastChunkId = firstChunkId + partSize - 1;
if (currentLastChunkId >= finalChunkId) {
return Optional.empty();
} else {
final var nextFirstChunkId = Math.min(firstChunkId + partSize, finalChunkId);
final var firstChunk = chunkIndex.chunks().get(nextFirstChunkId);
final var nextLastChunkId = Math.min(nextFirstChunkId + partSize - 1, finalChunkId);
final var lastChunk = chunkIndex.chunks().get(nextLastChunkId);
final var range = BytesRange.of(firstChunk.range().from, lastChunk.range().to);
final var chunks = chunkIndex.chunks().subList(nextFirstChunkId, nextLastChunkId + 1);
final var part = new Part(chunkIndex, partSize, finalChunkId, nextFirstChunkId, range, chunks);
return Optional.of(part);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Part part = (Part) o;
return firstChunkId == part.firstChunkId
&& Objects.equals(range, part.range)
&& Objects.equals(chunks, part.chunks);
}

@Override
public int hashCode() {
return Objects.hash(firstChunkId, range, chunks);
}

public int startPosition() {
return chunks.get(0).originalPosition;
}

@Override
public String toString() {
return "Part{"
+ "firstChunkId=" + firstChunkId
+ ", range=" + range
+ ", chunks=" + chunks.size()
+ '}';
}
}
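A minimal usage sketch (hypothetical caller, not part of this commit) walking a segment part by part; chunkIndex, partSize, and the fetch step are assumed:

    // Hypothetical: one ranged fetch per part covers all chunks of the segment.
    Optional<Part> part = Optional.of(new Part(chunkIndex, chunkIndex.chunks().get(0), partSize));
    while (part.isPresent()) {
        final BytesRange range = part.get().range(); // a single GET for the whole part
        // ... fetch `range`, then detransform part.get().chunks one by one ...
        part = part.get().next(); // empty after the final part
    }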
core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -109,6 +109,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private boolean compressionHeuristic;
private boolean encryptionEnabled;
private int chunkSize;
private int partSize;
private RsaEncryptionProvider rsaEncryptionProvider;
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
@@ -152,6 +153,7 @@ public void configure(final Map<String, ?> configs) {
chunkManagerFactory.configure(configs);
chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);
chunkSize = config.chunkSize();
partSize = 16 * 1024 * 1024 / chunkSize; // e.g. 16MB/100KB
compressionEnabled = config.compressionEnabled();
compressionHeuristic = config.compressionHeuristicEnabled();

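Taking the example values from the comment above (illustrative, not defaults enforced here): chunkSize = 100 KiB = 102,400 bytes gives partSize = 16,777,216 / 102,400 = 163 chunks per part; because of the integer division, a full part spans 163 × 102,400 bytes, about 15.9 MiB, slightly under the 16 MiB target.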
@@ -376,7 +378,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
Math.min(endPosition, remoteLogSegmentMetadata.segmentSizeInBytes() - 1)
);

log.trace("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);
log.debug("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);

metrics.recordSegmentFetch(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -387,7 +389,7 @@
final var suffix = ObjectKey.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
throw new RemoteResourceNotFoundException(e);
core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java
@@ -19,12 +19,13 @@
import java.io.IOException;
import java.io.InputStream;

import io.aiven.kafka.tieredstorage.Part;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

public interface ChunkManager {

InputStream getChunk(final String objectKeyPath,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException;
InputStream chunksContent(final String objectKeyPath,
final SegmentManifest manifest,
final Part part)
throws StorageBackendException, IOException;
}
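The interface thus moves from chunk granularity to part granularity: an implementation now receives a whole Part and returns a single stream covering all of its chunks.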
core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java
@@ -17,10 +17,9 @@
package io.aiven.kafka.tieredstorage.chunkmanager;

import java.io.InputStream;
import java.util.List;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.Part;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
@@ -46,13 +45,13 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi
*
* @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
*/
public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest,
final int chunkId) throws StorageBackendException {
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
@Override
public InputStream chunksContent(final String objectKeyPath,
final SegmentManifest manifest,
final Part part) throws StorageBackendException {
final InputStream chunkContent = fetcher.fetch(objectKeyPath, part.range);

final InputStream chunkContent = fetcher.fetch(objectKeyPath, chunk.range());

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, part.chunks);
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
if (encryptionMetadata.isPresent()) {
detransformEnum = new DecryptionChunkEnumeration(
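Each detransformation stage decorates the previous enumeration, so work happens lazily as the part is consumed. A self-contained sketch of that decorator shape (hypothetical types; the commit's real stages also carry per-chunk metadata):

    import java.util.Enumeration;

    interface ChunkStage {
        byte[] apply(byte[] chunk);
    }

    final class Detransforms {
        // Wraps an enumeration so that `stage` is applied lazily to each chunk payload.
        static Enumeration<byte[]> decorate(final Enumeration<byte[]> in, final ChunkStage stage) {
            return new Enumeration<>() {
                @Override
                public boolean hasMoreElements() {
                    return in.hasMoreElements();
                }

                @Override
                public byte[] nextElement() {
                    return stage.apply(in.nextElement());
                }
            };
        }
    }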
core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java
@@ -29,6 +29,7 @@

import org.apache.kafka.common.Configurable;

import io.aiven.kafka.tieredstorage.Part;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
@@ -65,10 +66,11 @@ protected ChunkCache(final ChunkManager chunkManager) {
* opened right when fetching from cache happens even if the actual value is removed from the cache,
* the InputStream will still contain the data.
*/
public InputStream getChunk(final String objectKeyPath,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException {
final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId);
@Override
public InputStream chunksContent(final String objectKeyPath,
final SegmentManifest manifest,
final Part part) throws StorageBackendException, IOException {
final ChunkKey chunkKey = new ChunkKey(objectKeyPath, part.firstChunkId);
final AtomicReference<InputStream> result = new AtomicReference<>();
try {
return cache.asMap()
@@ -77,7 +79,7 @@ public InputStream getChunk(final String objectKeyPath,
statsCounter.recordMiss();
try {
final InputStream chunk =
chunkManager.getChunk(objectKeyPath, manifest, chunkId);
chunkManager.chunksContent(objectKeyPath, manifest, part);
final T t = this.cacheChunk(chunkKey, chunk);
result.getAndSet(cachedChunkToInputStream(t));
return t;
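Because part boundaries are deterministic for a fixed partSize, part.firstChunkId uniquely identifies a part within a segment, so reusing the existing ChunkKey with the first chunk's id keys the cache at part granularity without changing the key type. With the 163-chunk example above, a request for any of chunks 0 through 162 resolves to the same cached part.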
core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
@@ -23,51 +23,62 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.Part;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import org.apache.commons.io.input.BoundedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchChunkEnumeration implements Enumeration<InputStream> {
static final Logger log = LoggerFactory.getLogger(FetchChunkEnumeration.class);

private final ChunkManager chunkManager;
private final String objectKeyPath;
private final SegmentManifest manifest;
private final BytesRange range;
final int startChunkId;
final int lastChunkId;
final Part firstPart;
final Chunk firstChunk;
final Part lastPart;
final Chunk lastChunk;
private final ChunkIndex chunkIndex;
int currentChunkId;
Optional<Part> currentPart;
public boolean closed;

final int partSize;

/**
*
* @param chunkManager provides chunk input to fetch from
* @param chunkManager provides chunk input to fetch from
* @param objectKeyPath required by chunkManager
* @param manifest provides to index to build response from
* @param range original offset range start/end position
* @param manifest provides to index to build response from
* @param range original offset range start/end position
* @param partSize fetch part size
*/
public FetchChunkEnumeration(final ChunkManager chunkManager,
final String objectKeyPath,
final SegmentManifest manifest,
final BytesRange range) {
final BytesRange range,
final int partSize) {
this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null");
this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null");
this.range = Objects.requireNonNull(range, "range cannot be null");
this.partSize = partSize;

this.chunkIndex = manifest.chunkIndex();

final Chunk firstChunk = getFirstChunk(range.from);
startChunkId = firstChunk.id;
currentChunkId = startChunkId;
final Chunk lastChunk = getLastChunk(range.to);
lastChunkId = lastChunk.id;
firstChunk = getFirstChunk(range.from);
firstPart = new Part(chunkIndex, firstChunk, this.partSize);
currentPart = Optional.of(firstPart);
lastChunk = getLastChunk(range.to);
lastPart = new Part(chunkIndex, lastChunk, this.partSize);
}

private Chunk getFirstChunk(final int fromPosition) {
@@ -91,55 +102,63 @@ private Chunk getLastChunk(final int endPosition) {

@Override
public boolean hasMoreElements() {
return !closed && currentChunkId <= lastChunkId;
return !closed && currentPart.isPresent();
}

@Override
public InputStream nextElement() {
if (!hasMoreElements()) {
if (currentPart.isEmpty()) {
throw new NoSuchElementException();
}

InputStream chunkContent = getChunkContent(currentChunkId);
final InputStream partContent = partContent(currentPart.get());

final Chunk currentChunk = chunkIndex.chunks().get(currentChunkId);
final int chunkStartPosition = currentChunk.originalPosition;
final boolean isAtFirstChunk = currentChunkId == startChunkId;
final boolean isAtLastChunk = currentChunkId == lastChunkId;
final boolean isSingleChunk = isAtFirstChunk && isAtLastChunk;
if (isSingleChunk) {
final int chunkStartPosition = currentPart.get().startPosition();
final boolean isAtFirstPart = currentPart.get().equals(firstPart);
final boolean isAtLastPart = currentPart.get().equals(lastPart);
final boolean isSinglePart = isAtFirstPart && isAtLastPart;
if (isSinglePart) {
final int toSkip = range.from - chunkStartPosition;
try {
chunkContent.skip(toSkip);
final int chunkSize = range.size();
chunkContent = new BoundedInputStream(chunkContent, chunkSize);
partContent.skip(toSkip);
} catch (final IOException e) {
throw new RuntimeException(e);
}

final int chunkSize = range.size();

log.trace("Returning part {} with size {}", currentPart.get(), chunkSize);

currentPart = Optional.empty();
return new BoundedInputStream(partContent, chunkSize);
} else {
if (isAtFirstChunk) {
if (isAtFirstPart) {
final int toSkip = range.from - chunkStartPosition;
try {
chunkContent.skip(toSkip);
partContent.skip(toSkip);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
if (isAtLastChunk) {
if (isAtLastPart) {
final int chunkSize = range.to - chunkStartPosition + 1;
chunkContent = new BoundedInputStream(chunkContent, chunkSize);

log.trace("Returning part {} with size {}", currentPart.get(), chunkSize);

currentPart = Optional.empty();
return new BoundedInputStream(partContent, chunkSize);
}
}

currentChunkId += 1;
return chunkContent;
log.trace("Returning part {} with size {}", currentPart.get(), currentPart.get().range.size());

currentPart = currentPart.get().next();
return partContent;
}

private InputStream getChunkContent(final int chunkId) {
private InputStream partContent(final Part part) {
try {
return chunkManager.getChunk(objectKeyPath, manifest, chunkId);
} catch (final KeyNotFoundException e) {
throw new KeyNotFoundRuntimeException(e);
return chunkManager.chunksContent(objectKeyPath, manifest, part);
} catch (final StorageBackendException | IOException e) {
throw new RuntimeException(e);
}
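End to end, the call site in RemoteStorageManager shown earlier drives the whole pipeline; condensed here using only names visible in this diff:

    // One enumeration per fetch request; each element triggers at most one
    // ranged GET covering up to partSize chunks.
    final InputStream in =
        new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize)
            .toInputStream();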