Skip to content

Commit

Permalink
KAFKA-17498: Reduce the number of remote calls when serving LIST_OFFS…
Browse files Browse the repository at this point in the history
…ETS request (#17132)

While serving a LIST_OFFSETS request, avoid fetching the remote indexes if the request can be served from the local indexes.

Reviewers: Satish Duggana <[email protected]>, Luke Chen <[email protected]>, Christo Lolov <[email protected]>
  • Loading branch information
kamalcph authored Sep 30, 2024
1 parent 6ee46ea commit 4036081
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 10 deletions.
30 changes: 21 additions & 9 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPar
return remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset);
}

private Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset)
Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset)
throws RemoteStorageException, IOException {
int startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset);

Expand Down Expand Up @@ -649,8 +649,13 @@ public AsyncOffsetReadFutureHolder<Either<Exception, Option<FileRecords.Timestam
}

/**
* Search the message offset in the remote storage based on timestamp and offset.
* <p>
* Search the message offset in the remote storage for the given timestamp and starting-offset.
* Once the target segment in which the search is to be performed is found:
* 1. If the target segment lies in the local storage (i.e. it is a common segment that lies in both remote and local storage),
* then the search will be performed in the local storage.
* 2. If the target segment is found only in the remote storage, then the search will be performed in the remote storage.
*
* <p>
* This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
* <p>
* - If there are no messages in the remote storage, return Empty
Expand All @@ -674,12 +679,10 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicParti
if (topicId == null) {
throw new KafkaException("Topic id does not exist for topic partition: " + tp);
}

Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(tp);
if (!unifiedLogOptional.isPresent()) {
throw new KafkaException("UnifiedLog does not exist for topic partition: " + tp);
}

UnifiedLog unifiedLog = unifiedLogOptional.get();

// Get the respective epoch in which the starting-offset exists.
Expand All @@ -688,7 +691,6 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicParti
NavigableMap<Integer, Long> epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
while (maybeEpoch.isPresent()) {
int epoch = maybeEpoch.getAsInt();

// KAFKA-15802: Add a new API for RLMM to choose how to implement the predicate.
// currently, all segments are returned and then iterated, and filtered
Iterator<RemoteLogSegmentMetadata> iterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
Expand All @@ -698,14 +700,24 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicParti
&& rlsMetadata.endOffset() >= startingOffset
&& isRemoteSegmentWithinLeaderEpochs(rlsMetadata, unifiedLog.logEndOffset(), epochWithOffsets)
&& rlsMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
return lookupTimestamp(rlsMetadata, timestamp, startingOffset);
// cache to avoid race conditions
List<LogSegment> segmentsCopy = new ArrayList<>(unifiedLog.logSegments());
if (segmentsCopy.isEmpty() || rlsMetadata.startOffset() < segmentsCopy.get(0).baseOffset()) {
// search in remote-log
return lookupTimestamp(rlsMetadata, timestamp, startingOffset);
} else {
// search in local-log
for (LogSegment segment : segmentsCopy) {
if (segment.largestTimestamp() >= timestamp) {
return segment.findOffsetByTimestamp(timestamp, startingOffset);
}
}
}
}
}

// Move to the next epoch if not found with the current epoch.
maybeEpoch = leaderEpochCache.nextEpoch(epoch);
}

return Optional.empty();
}

Expand Down
94 changes: 93 additions & 1 deletion core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ void setUp() throws Exception {
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
appendRLMConfig(props);
config = KafkaConfig.fromProps(props);
brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());

remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
Expand Down Expand Up @@ -1588,6 +1588,98 @@ private void doTestFindOffsetByTimestamp(long ts, long startOffset, int targetLe
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
}

/**
 * Checks which source {@code findOffsetByTimestamp} answers from when segments are mocked both
 * in the remote metadata and in the local log.
 * <p>
 * The remote path is stubbed by overriding {@code lookupTimestamp} to return
 * {@code expectedRemoteResult}; the local path is stubbed by a mocked {@link LogSegment} whose
 * {@code findOffsetByTimestamp} returns {@code expectedLocalResult}. Each assertion therefore
 * reveals which of the two paths produced the answer. The list of local segments and the base
 * offset of the second local segment are re-stubbed between assertions.
 */
@Test
void testFetchOffsetByTimestampWithTieredStorageDoesNotFetchIndexWhenExistsLocally() throws Exception {
TopicPartition tp = new TopicPartition("sample", 0);
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
Map<String, Uuid> topicIds = Collections.singletonMap(tp.topic(), tpId.topicId());

// Leader epoch history: (epoch, start offset) pairs written to the checkpoint backing the cache.
List<EpochEntry> epochEntries = new ArrayList<>();
epochEntries.add(new EpochEntry(0, 0L));
epochEntries.add(new EpochEntry(1, 20L));
epochEntries.add(new EpochEntry(3, 50L));
epochEntries.add(new EpochEntry(4, 100L));
epochEntries.add(new EpochEntry(5, 200L));
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));

long timestamp = time.milliseconds();
// Remote segment metadata: metadata0 spans offsets 0..99, metadata1 spans offsets 100..199.
RemoteLogSegmentMetadata metadata0 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()),
0, 99, timestamp, brokerId, timestamp, 1024, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, truncateAndGetLeaderEpochs(epochEntries, 0L, 99L));
RemoteLogSegmentMetadata metadata1 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()),
100, 199, timestamp + 1, brokerId, timestamp + 1, 1024, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, truncateAndGetLeaderEpochs(epochEntries, 100L, 199L));
// Note that the metadata2 is in COPY_SEGMENT_STARTED state
// NOTE(review): metadata2 is built with start offset 100 while its leader epochs are truncated
// for the range 200..299 - confirm whether the start offset was meant to be 200.
RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()),
100, 299, timestamp + 2, brokerId, timestamp + 2, 1024, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_STARTED, truncateAndGetLeaderEpochs(epochEntries, 200L, 299L));

// Stub the segment listing per epoch: epochs below 4 list metadata0, epoch 4 lists metadata1 and
// metadata2, and any later epoch fails the test with an exception.
when(remoteLogMetadataManager.listRemoteLogSegments(eq(tpId), anyInt()))
.thenAnswer(ans -> {
int epoch = ans.getArgument(1);
if (epoch < 4) {
return Collections.singletonList(metadata0).iterator();
} else if (epoch == 4) {
return Arrays.asList(metadata1, metadata2).iterator();
} else {
throw new IllegalArgumentException("Unexpected call!");
}
});
// Different (timestamp, offset) is chosen for remote and local read result to assert the behaviour
// 9999 -> refers to read from local, 999 -> refers to read from remote
FileRecords.TimestampAndOffset expectedLocalResult = new FileRecords.TimestampAndOffset(timestamp + 9999, 9999, Optional.of(Integer.MAX_VALUE));
FileRecords.TimestampAndOffset expectedRemoteResult = new FileRecords.TimestampAndOffset(timestamp + 999, 999, Optional.of(Integer.MAX_VALUE));
Partition mockFollowerPartition = mockPartition(tpId);

// Local log: two mocked segments with base offsets 50 and 100. Only the second one is stubbed
// to answer findOffsetByTimestamp (with expectedLocalResult).
LogSegment logSegment = mockLogSegment(50L, timestamp, null);
LogSegment logSegment1 = mockLogSegment(100L, timestamp + 1, expectedLocalResult);
when(mockLog.logSegments()).thenReturn(Arrays.asList(logSegment, logSegment1));
when(mockLog.logEndOffset()).thenReturn(300L);
// Replace the manager with a subclass that uses the mocked metadata manager and whose remote
// lookup always returns expectedRemoteResult.
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
partition -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) {
@Override
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
@Override
Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) {
return Optional.of(expectedRemoteResult);
}
};
remoteLogManager.startup();
remoteLogManager.onLeadershipChange(Collections.emptySet(), Collections.singleton(mockFollowerPartition), topicIds);

// Read the offset from the remote storage, since the local-log starts from offset 50L and the message with `timestamp` does not exist in the local log
assertEquals(Optional.of(expectedRemoteResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp, 0L, cache));
// Short-circuits the read from the remote storage since the local-log starts from offset 50L and
// the message with (timestamp + 1) exists in the segment with base_offset: 100 which is available locally.
assertEquals(Optional.of(expectedLocalResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache));

// Move the local-log start offset to 100L, still the read from the remote storage should be short-circuited
// as the message with (timestamp + 1) exists in the local log
when(mockLog.logSegments()).thenReturn(Collections.singletonList(logSegment1));
assertEquals(Optional.of(expectedLocalResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache));

// Move the local log start offset to 101L, now the message with (timestamp + 1) does not exist in the local log and
// the indexes need to be fetched from the remote storage
when(logSegment1.baseOffset()).thenReturn(101L);
assertEquals(Optional.of(expectedRemoteResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache));
}

/**
 * Builds a Mockito mock of {@link LogSegment} for the timestamp-lookup tests.
 *
 * @param baseOffset         value returned by the mock's {@code baseOffset()}
 * @param largestTimestamp   value returned by the mock's {@code largestTimestamp()}
 * @param timestampAndOffset result returned (wrapped in an {@link Optional}) by the mock's
 *                           {@code findOffsetByTimestamp} for any arguments; when {@code null},
 *                           that method is left unstubbed
 * @return the configured mock segment
 * @throws IOException declared because the stubbed {@code findOffsetByTimestamp} call declares it
 */
private LogSegment mockLogSegment(long baseOffset,
                                  long largestTimestamp,
                                  FileRecords.TimestampAndOffset timestampAndOffset) throws IOException {
    LogSegment segment = mock(LogSegment.class);
    when(segment.baseOffset()).thenReturn(baseOffset);
    when(segment.largestTimestamp()).thenReturn(largestTimestamp);

    // Without a canned result there is nothing more to stub.
    if (timestampAndOffset == null) {
        return segment;
    }

    Optional<FileRecords.TimestampAndOffset> cannedLookup = Optional.of(timestampAndOffset);
    when(segment.findOffsetByTimestamp(anyLong(), anyLong())).thenReturn(cannedLookup);
    return segment;
}

@Test
void testIdempotentClose() throws IOException {
remoteLogManager.close();
Expand Down

0 comments on commit 4036081

Please sign in to comment.