KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (#17676) (#17733)

Reviewers: Jun Rao <[email protected]>
kamalcph authored Nov 10, 2024
1 parent ad3e34d commit 6ea2ed3
Showing 2 changed files with 103 additions and 15 deletions.
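What changed, in brief: the remote read path used to build its FetchDataInfo from the requested fetch offset and the byte position returned by the offset-index lookup. But findFirstBatch may scan past one or more batches before reaching the first batch whose lastOffset is at or above the target, so the reported offset/position pair could point below the batch actually returned, and (per the commit title) the aborted-transaction lookup could then compute an inaccurate upperBoundOffset. The fix returns the skipped byte count alongside the batch (the new EnrichedRecordBatch), advances startPos by it, and reports firstBatch.baseOffset() instead of the requested offset.

A minimal, self-contained sketch of that accounting (an editorial illustration, not code from the commit; the hypothetical Batch record stands in for Kafka's RecordBatch, and a plain Iterator stands in for RemoteLogInputStream):

import java.util.Iterator;

// Stand-in for org.apache.kafka.common.record.RecordBatch.
record Batch(long baseOffset, long lastOffset, int sizeInBytes) {}

final class FirstBatchScan {
    // Pairs the first matching batch with the bytes scanned past to reach it,
    // mirroring the EnrichedRecordBatch this commit introduces.
    record Result(Batch batch, int skippedBytes) {}

    // Mirrors the patched findFirstBatch: read forward until a batch with
    // lastOffset >= target appears, summing the sizes of the batches passed
    // over. A null batch in the result means the offset is not in this segment.
    static Result findFirstBatch(Iterator<Batch> batches, long target) {
        int skipped = 0;
        Batch next = null;
        do {
            if (next != null) {
                skipped += next.sizeInBytes(); // bytes of the batch being skipped past
            }
            next = batches.hasNext() ? batches.next() : null;
        } while (next != null && next.lastOffset() < target);
        return new Result(next, skipped);
    }
}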
39 changes: 28 additions & 11 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1610,25 +1610,26 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         }

         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        EnrichedRecordBatch enrichedRecordBatch = new EnrichedRecordBatch(null, 0);
         InputStream remoteSegInputStream = null;
         try {
             int startPos = 0;
-            RecordBatch firstBatch = null;

             // Iteration over multiple RemoteSegmentMetadata is required in case of log compaction.
             // It may be possible the offset is log compacted in the current RemoteLogSegmentMetadata
             // And we need to iterate over the next segment metadata to fetch messages higher than the given offset.
-            while (firstBatch == null && rlsMetadataOptional.isPresent()) {
+            while (enrichedRecordBatch.batch == null && rlsMetadataOptional.isPresent()) {
                 remoteLogSegmentMetadata = rlsMetadataOptional.get();
                 // Search forward for the position of the last offset that is greater than or equal to the target offset
                 startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
                 remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
                 RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream(remoteSegInputStream);
-                firstBatch = findFirstBatch(remoteLogInputStream, offset);
-                if (firstBatch == null) {
+                enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset);
+                if (enrichedRecordBatch.batch == null) {
                     Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
                     rlsMetadataOptional = findNextSegmentMetadata(rlsMetadataOptional.get(), logOptional.get().leaderEpochCache());
                 }
             }
+            RecordBatch firstBatch = enrichedRecordBatch.batch;
             if (firstBatch == null)
                 return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
                     includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
@@ -1659,16 +1660,19 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
             }
             buffer.flip();

+            startPos = startPos + enrichedRecordBatch.skippedBytes;
             FetchDataInfo fetchDataInfo = new FetchDataInfo(
-                new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                new LogOffsetMetadata(firstBatch.baseOffset(), remoteLogSegmentMetadata.startOffset(), startPos),
                 MemoryRecords.readableRecords(buffer));
             if (includeAbortedTxns) {
                 fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
             }

             return fetchDataInfo;
         } finally {
-            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            if (enrichedRecordBatch.batch != null) {
+                Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            }
         }
     }
     // for testing
@@ -1763,15 +1767,18 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
     }

     // Visible for testing
-    RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
-        RecordBatch nextBatch;
+    EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+        int skippedBytes = 0;
+        RecordBatch nextBatch = null;
        // Look for the batch which has the desired offset
        // We will always have a batch in that segment as it is a non-compacted topic.
        do {
+            if (nextBatch != null) {
+                skippedBytes += nextBatch.sizeInBytes();
+            }
            nextBatch = remoteLogInputStream.nextBatch();
        } while (nextBatch != null && nextBatch.lastOffset() < offset);
-
-        return nextBatch;
+        return new EnrichedRecordBatch(nextBatch, skippedBytes);
     }

     OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
@@ -2122,4 +2129,14 @@ public String toString() {
                 '}';
         }
     }
+
+    static class EnrichedRecordBatch {
+        private final RecordBatch batch;
+        private final int skippedBytes;
+
+        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
+            this.batch = batch;
+            this.skippedBytes = skippedBytes;
+        }
+    }
 }
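Usage-wise, the read() changes above boil down to the following correction (sketch continues from the editorial illustration near the top; lookedUpPos plays the role of startPos from lookupPositionForOffset):

FirstBatchScan.Result result = FirstBatchScan.findFirstBatch(batches, targetOffset);
if (result.batch() != null) {
    int correctedPos = lookedUpPos + result.skippedBytes(); // startPos + skippedBytes
    long reportedOffset = result.batch().baseOffset();      // not the requested offset
    // (reportedOffset, correctedPos) now describe the batch actually returned,
    // so the aborted-transaction lookup sees a position that matches the data.
}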
79 changes: 75 additions & 4 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -135,6 +135,7 @@
 import scala.collection.JavaConverters;

 import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -150,6 +151,7 @@
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -2932,8 +2934,8 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
             }

             // This is the key scenario that we are testing here
-            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
-                return null;
+            EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+                return new EnrichedRecordBatch(null, 0);
             }
         }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -2999,10 +3001,10 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
                 return 1;
             }

-            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+            EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
                 when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
                 doNothing().when(firstBatch).writeTo(capture.capture());
-                return firstBatch;
+                return new EnrichedRecordBatch(firstBatch, 0);
             }
         }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -3396,6 +3398,75 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
         assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
     }

+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition), anyInt(), anyLong()))
+                .thenAnswer(ans -> {
+                    long offset = ans.getArgument(2);
+                    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                    RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(segmentId,
+                            offset - 10, offset + 99, 1024, totalEpochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                    return Optional.of(segmentMetadata);
+                });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> currentLogStartOffset.set(offset),
+                brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
+                1048576, true, leaderTopicIdPartition.topicPartition(),
+                partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, offset, compression, CREATE_TIME,
+                            records.toArray(new SimpleRecord[0])));
+                    offset += records.size();
+                    records.clear();
+                }
+            }
+            fileRecords.flush();
+        }
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
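A note on the new test's expected values (an editorial reading of the helper above, not text from the commit): appendRecordsToFile writes 3-record batches starting at offset 0, so the batches cover offsets [0-2], [3-5], [6-8], [9-11], and so on. A fetch at offset 10 therefore lands in the fourth batch, whose baseOffset is 9. With lookupPositionForOffset stubbed to 0, the relative position 273 is the size of the three skipped batches: assuming the v2 record-batch layout, each batch is 91 bytes (a 61-byte batch header plus three 10-byte "foo" records), and 3 × 91 = 273. Also note that with 100 records at 3 per batch the final record is never flushed; that is harmless here, since the assertions only exercise the first few batches.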
