Skip to content

Commit

Permalink
KAFKA-16711: Make sure to update highestOffsetInRemoteStorage after l…
Browse files Browse the repository at this point in the history
…og dir change (apache#15947)

Reviewers: Kamal Chandraprakash<[email protected]>, Satish Duggana <[email protected]>
  • Loading branch information
showuon authored May 28, 2024
1 parent 64f699a commit a649bc4
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 2 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ boolean isLeader() {
// the task's run() method.
private volatile Optional<OffsetAndEpoch> copiedOffsetOption = Optional.empty();
private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = false;
private volatile Optional<String> logDirectory = Optional.empty();

public void convertToLeader(int leaderEpochVal) {
if (leaderEpochVal < 0) {
Expand Down Expand Up @@ -815,6 +816,13 @@ public void run() {
}

UnifiedLog log = unifiedLogOptional.get();
// In the first run after completing altering logDir within broker, we should make sure the state is reset. (KAFKA-16711)
if (!log.parentDir().equals(logDirectory.orElse(null))) {
copiedOffsetOption = Optional.empty();
isLogStartOffsetUpdatedOnBecomingLeader = false;
logDirectory = Optional.of(log.parentDir());
}

if (isLeader()) {
// Copy log segments to remote storage
copyLogSegmentsToRemote(log);
Expand Down
119 changes: 117 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public List<EpochEntry> read() {
};
private final AtomicLong currentLogStartOffset = new AtomicLong(0L);

private final UnifiedLog mockLog = mock(UnifiedLog.class);
private UnifiedLog mockLog = mock(UnifiedLog.class);

@BeforeEach
void setUp() throws Exception {
Expand Down Expand Up @@ -661,6 +661,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
long nextSegmentStartOffset = 150L;
int segmentCount = 3;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.parentDir()).thenReturn("dir1");

// leader epoch preparation
checkpoint.write(totalEpochEntries);
Expand Down Expand Up @@ -729,7 +730,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
List<RemoteLogSegmentMetadata> list = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// return 3 metadata and then return 0 to simulate all segments are deleted
// return the metadataList 3 times, then return empty list to simulate all segments are deleted
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(list.iterator()).thenReturn(Collections.emptyIterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(list.iterator()).thenReturn(list.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(list.iterator());
Expand Down Expand Up @@ -772,12 +773,126 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
safeLongYammerMetricValue("RemoteLogSizeBytes")));
}

@Test
void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
int segmentCount = 3;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.parentDir()).thenReturn("dir1");

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
.thenReturn(Optional.of(0L))
.thenReturn(Optional.of(nextSegmentStartOffset - 1));

File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 2 log segments, with 0 and 150 as log start offset
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);

FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
when(mockLog.logEndOffset()).thenReturn(500L);
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", 100L);
logProps.put("retention.ms", -1L);
LogConfig logConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(logConfig);

OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.timeIndex()).thenReturn(timeIdx);
when(oldSegment.offsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);

CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);

CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
doAnswer(ans -> {
// waiting for verification
copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
List<RemoteLogSegmentMetadata> metadataList = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 2)).thenReturn(metadataList.iterator());

// leadership change to log in dir1
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);

TestUtils.waitForCondition(() -> {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
return 0L == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(0) get invoked for dir1 log");

UnifiedLog oldMockLog = mockLog;
Mockito.clearInvocations(oldMockLog);
// simulate altering log dir completes, and the new partition leader changes to the same broker in different log dir (dir2)
mockLog = mock(UnifiedLog.class);
when(mockLog.parentDir()).thenReturn("dir2");
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.config()).thenReturn(logConfig);
when(mockLog.logEndOffset()).thenReturn(500L);

remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);

// after copyLogSegment completes for log (in dir1), updateHighestOffsetInRemoteStorage will be triggered with new offset
// even though the leader replica has changed to log in dir2
copyLogSegmentLatch.countDown();
TestUtils.waitForCondition(() -> {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(oldMockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
return nextSegmentStartOffset - 1 == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) get invoked for dir1 log");

// On the next run of RLMTask, the log in dir2 will be picked and start by updateHighestOffsetInRemoteStorage to the expected offset
TestUtils.waitForCondition(() -> {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
return nextSegmentStartOffset - 1 == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) get invoked for dir2 log");

}

@Test
void testRemoteLogManagerRemoteMetrics() throws Exception {
long oldestSegmentStartOffset = 0L;
long olderSegmentStartOffset = 75L;
long nextSegmentStartOffset = 150L;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.parentDir()).thenReturn("dir1");

// leader epoch preparation
checkpoint.write(totalEpochEntries);
Expand Down

0 comments on commit a649bc4

Please sign in to comment.