From 40360819bb97d6b05dfef6451888b4d908fc3bf4 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Mon, 30 Sep 2024 12:14:46 +0530 Subject: [PATCH] KAFKA-17498: Reduce the number of remote calls when serving LIST_OFFSETS request (#17132) While serving LIST_OFFSETS request, avoid fetching the remote indexes if they can be served from the local indexes. Reviewers: Satish Duggana , Luke Chen , Christo Lolov --- .../kafka/log/remote/RemoteLogManager.java | 30 ++++-- .../log/remote/RemoteLogManagerTest.java | 94 ++++++++++++++++++- 2 files changed, 114 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 3a50756a015be..0e877d7f94e8f 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -597,7 +597,7 @@ public Optional fetchRemoteLogSegmentMetadata(TopicPar return remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset); } - private Optional lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) + Optional lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) throws RemoteStorageException, IOException { int startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset); @@ -649,8 +649,13 @@ public AsyncOffsetReadFutureHolder + * Search the message offset in the remote storage for the given timestamp and starting-offset. + * Once the target segment where the search is to be performed is found: + * 1. If the target segment lies in the local storage (common segments that lie in both remote and local storage), + * then the search will be performed in the local storage. + * 2. If the target segment is found only in the remote storage, then the search will be performed in the remote storage. + * + *

* This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules: *

* - If there are no messages in the remote storage, return Empty @@ -674,12 +679,10 @@ public Optional findOffsetByTimestamp(TopicParti if (topicId == null) { throw new KafkaException("Topic id does not exist for topic partition: " + tp); } - Optional unifiedLogOptional = fetchLog.apply(tp); if (!unifiedLogOptional.isPresent()) { throw new KafkaException("UnifiedLog does not exist for topic partition: " + tp); } - UnifiedLog unifiedLog = unifiedLogOptional.get(); // Get the respective epoch in which the starting-offset exists. @@ -688,7 +691,6 @@ public Optional findOffsetByTimestamp(TopicParti NavigableMap epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets()); while (maybeEpoch.isPresent()) { int epoch = maybeEpoch.getAsInt(); - // KAFKA-15802: Add a new API for RLMM to choose how to implement the predicate. // currently, all segments are returned and then iterated, and filtered Iterator iterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); @@ -698,14 +700,24 @@ public Optional findOffsetByTimestamp(TopicParti && rlsMetadata.endOffset() >= startingOffset && isRemoteSegmentWithinLeaderEpochs(rlsMetadata, unifiedLog.logEndOffset(), epochWithOffsets) && rlsMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) { - return lookupTimestamp(rlsMetadata, timestamp, startingOffset); + // cache to avoid race conditions + List segmentsCopy = new ArrayList<>(unifiedLog.logSegments()); + if (segmentsCopy.isEmpty() || rlsMetadata.startOffset() < segmentsCopy.get(0).baseOffset()) { + // search in remote-log + return lookupTimestamp(rlsMetadata, timestamp, startingOffset); + } else { + // search in local-log + for (LogSegment segment : segmentsCopy) { + if (segment.largestTimestamp() >= timestamp) { + return segment.findOffsetByTimestamp(timestamp, startingOffset); + } + } + } } } - // Move to the next epoch if not found with the current epoch. 
maybeEpoch = leaderEpochCache.nextEpoch(epoch); } - return Optional.empty(); } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 439c732f57946..dbf0a426753e1 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -233,7 +233,7 @@ void setUp() throws Exception { props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); appendRLMConfig(props); config = KafkaConfig.fromProps(props); - brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); + brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), @@ -1588,6 +1588,98 @@ private void doTestFindOffsetByTimestamp(long ts, long startOffset, int targetLe remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); } + @Test + void testFetchOffsetByTimestampWithTieredStorageDoesNotFetchIndexWhenExistsLocally() throws Exception { + TopicPartition tp = new TopicPartition("sample", 0); + TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp); + Map topicIds = Collections.singletonMap(tp.topic(), tpId.topicId()); + + List epochEntries = new ArrayList<>(); + epochEntries.add(new EpochEntry(0, 0L)); + epochEntries.add(new EpochEntry(1, 20L)); + epochEntries.add(new EpochEntry(3, 50L)); + epochEntries.add(new EpochEntry(4, 100L)); + epochEntries.add(new EpochEntry(5, 200L)); + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + long timestamp = 
time.milliseconds(); + RemoteLogSegmentMetadata metadata0 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), + 0, 99, timestamp, brokerId, timestamp, 1024, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, truncateAndGetLeaderEpochs(epochEntries, 0L, 99L)); + RemoteLogSegmentMetadata metadata1 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), + 100, 199, timestamp + 1, brokerId, timestamp + 1, 1024, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, truncateAndGetLeaderEpochs(epochEntries, 100L, 199L)); + // Note that the metadata2 is in COPY_SEGMENT_STARTED state + RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), + 100, 299, timestamp + 2, brokerId, timestamp + 2, 1024, Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_STARTED, truncateAndGetLeaderEpochs(epochEntries, 200L, 299L)); + + when(remoteLogMetadataManager.listRemoteLogSegments(eq(tpId), anyInt())) + .thenAnswer(ans -> { + int epoch = ans.getArgument(1); + if (epoch < 4) { + return Collections.singletonList(metadata0).iterator(); + } else if (epoch == 4) { + return Arrays.asList(metadata1, metadata2).iterator(); + } else { + throw new IllegalArgumentException("Unexpected call!"); + } + }); + // Different (timestamp, offset) is chosen for remote and local read result to assert the behaviour + // 9999 -> refers to read from local, 999 -> refers to read from remote + FileRecords.TimestampAndOffset expectedLocalResult = new FileRecords.TimestampAndOffset(timestamp + 9999, 9999, Optional.of(Integer.MAX_VALUE)); + FileRecords.TimestampAndOffset expectedRemoteResult = new FileRecords.TimestampAndOffset(timestamp + 999, 999, Optional.of(Integer.MAX_VALUE)); + Partition mockFollowerPartition = mockPartition(tpId); + + LogSegment logSegment = mockLogSegment(50L, timestamp, null); + LogSegment logSegment1 = mockLogSegment(100L, timestamp + 1, expectedLocalResult); + 
when(mockLog.logSegments()).thenReturn(Arrays.asList(logSegment, logSegment1)); + when(mockLog.logEndOffset()).thenReturn(300L); + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + partition -> Optional.of(mockLog), + (topicPartition, offset) -> currentLogStartOffset.set(offset), + brokerTopicStats, metrics) { + @Override + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + @Override + Optional lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) { + return Optional.of(expectedRemoteResult); + } + }; + remoteLogManager.startup(); + remoteLogManager.onLeadershipChange(Collections.emptySet(), Collections.singleton(mockFollowerPartition), topicIds); + + // Read the offset from the remote storage, since the local-log starts from offset 50L and the message with `timestamp` does not exist in the local log + assertEquals(Optional.of(expectedRemoteResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp, 0L, cache)); + // Short-circuits the read from the remote storage since the local-log starts from offset 50L and + // the message with (timestamp + 1) exists in the segment with base_offset: 100 which is available locally. 
+ assertEquals(Optional.of(expectedLocalResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache)); + + // Move the local-log start offset to 100L, still the read from the remote storage should be short-circuited + // as the message with (timestamp + 1) exists in the local log + when(mockLog.logSegments()).thenReturn(Collections.singletonList(logSegment1)); + assertEquals(Optional.of(expectedLocalResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache)); + + // Move the local log start offset to 101L, now the message with (timestamp + 1) does not exist in the local log and + // the indexes need to be fetched from the remote storage + when(logSegment1.baseOffset()).thenReturn(101L); + assertEquals(Optional.of(expectedRemoteResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache)); + } + + private LogSegment mockLogSegment(long baseOffset, + long largestTimestamp, + FileRecords.TimestampAndOffset timestampAndOffset) throws IOException { + LogSegment logSegment = mock(LogSegment.class); + when(logSegment.baseOffset()).thenReturn(baseOffset); + when(logSegment.largestTimestamp()).thenReturn(largestTimestamp); + if (timestampAndOffset != null) { + when(logSegment.findOffsetByTimestamp(anyLong(), anyLong())) + .thenReturn(Optional.of(timestampAndOffset)); + } + return logSegment; + } + @Test void testIdempotentClose() throws IOException { remoteLogManager.close();