Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16711: make sure update highestOffsetInRemoteStorage after log dir change #15947

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ boolean isLeader() {
// the task's run() method.
private volatile Optional<OffsetAndEpoch> copiedOffsetOption = Optional.empty();
private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = false;
private volatile Optional<String> logDirectory = Optional.empty();

public void convertToLeader(int leaderEpochVal) {
if (leaderEpochVal < 0) {
Expand Down Expand Up @@ -815,6 +816,13 @@ public void run() {
}

UnifiedLog log = unifiedLogOptional.get();
// On the first run after an intra-broker log dir alteration completes, make sure the cached state is reset. (KAFKA-16711)
if (!log.parentDir().equals(logDirectory.orElse(null))) {
copiedOffsetOption = Optional.empty();
isLogStartOffsetUpdatedOnBecomingLeader = false;
logDirectory = Optional.of(log.parentDir());
}

if (isLeader()) {
// Copy log segments to remote storage
copyLogSegmentsToRemote(log);
Expand Down
119 changes: 117 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public List<EpochEntry> read() {
};
private final AtomicLong currentLogStartOffset = new AtomicLong(0L);

private final UnifiedLog mockLog = mock(UnifiedLog.class);
private UnifiedLog mockLog = mock(UnifiedLog.class);

@BeforeEach
void setUp() throws Exception {
Expand Down Expand Up @@ -660,6 +660,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
long nextSegmentStartOffset = 150L;
int segmentCount = 3;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.parentDir()).thenReturn("dir1");

// leader epoch preparation
checkpoint.write(totalEpochEntries);
Expand Down Expand Up @@ -728,7 +729,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
List<RemoteLogSegmentMetadata> list = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// return 3 metadata and then return 0 to simulate all segments are deleted
// return the metadataList 3 times, then return empty list to simulate all segments are deleted
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(list.iterator()).thenReturn(Collections.emptyIterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(list.iterator()).thenReturn(list.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(list.iterator());
Expand Down Expand Up @@ -771,12 +772,126 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
safeLongYammerMetricValue("RemoteLogSizeBytes")));
}

/**
 * Checks that the highest offset in remote storage is re-applied to the log after the
 * leader's log moves to a different log directory.
 *
 * Flow: become leader with a log whose parentDir() is "dir1" and wait for
 * updateHighestOffsetInRemoteStorage(0); swap {@code mockLog} for a new mock whose
 * parentDir() is "dir2" while a segment copy is still blocked; release the copy and
 * expect updateHighestOffsetInRemoteStorage(149) on both the old and the new log.
 *
 * NOTE(review): this relies on the manager under test resolving the log through the
 * {@code mockLog} field at run time; that wiring is not visible here, so confirm in setUp.
 */
@Test
void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
int segmentCount = 3;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.parentDir()).thenReturn("dir1");

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
// The first lookup reports offset 0; every later lookup reports 149 (nextSegmentStartOffset - 1).
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
.thenReturn(Optional.of(0L))
.thenReturn(Optional.of(nextSegmentStartOffset - 1));

File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 2 mock log segments, with 0 and 150 as their base offsets
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);

// The old segment is backed by a temp file and reports a size of 10 bytes.
FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
when(mockLog.logEndOffset()).thenReturn(500L);
// Log config used by both the dir1 and the dir2 log: retention.bytes=100, retention.ms=-1.
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", 100L);
logProps.put("retention.ms", -1L);
LogConfig logConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(logConfig);

// Real offset, time and transaction index files for the old segment, created under tempDir.
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.timeIndex()).thenReturn(timeIdx);
when(oldSegment.offsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);

// Metadata add/update calls return an already-completed future.
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);

// Hold the segment copy on a latch (for at most 5 seconds) so the test can swap the log
// while the copy against the dir1 log is still in flight.
CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
doAnswer(ans -> {
// waiting for verification
copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
List<RemoteLogSegmentMetadata> metadataList = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
// Each stub below hands out an iterator over metadataList, for the unfiltered listing and for epochs 0, 1 and 2.
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 2)).thenReturn(metadataList.iterator());

// leadership change to log in dir1
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);

// Wait until the dir1 log has been told exactly once that the highest remote offset is 0.
TestUtils.waitForCondition(() -> {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
return 0L == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(0) get invoked for dir1 log");

// Keep a handle on the dir1 log and drop its recorded invocations, so the later times(1)
// check on it only counts calls made from this point on.
UnifiedLog oldMockLog = mockLog;
Mockito.clearInvocations(oldMockLog);
// simulate completion of the log dir alteration: the partition leader stays on the same broker,
// but its log now lives in a different log dir (dir2)
mockLog = mock(UnifiedLog.class);
when(mockLog.parentDir()).thenReturn("dir2");
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.config()).thenReturn(logConfig);
when(mockLog.logEndOffset()).thenReturn(500L);

remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);

// after copyLogSegment completes for the log in dir1, updateHighestOffsetInRemoteStorage is triggered
// on that old log with the new offset, even though the leader replica has changed to the log in dir2
copyLogSegmentLatch.countDown();
TestUtils.waitForCondition(() -> {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(oldMockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
return nextSegmentStartOffset - 1 == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) get invoked for dir1 log");

// On the next run of RLMTask, the log in dir2 is picked up and updateHighestOffsetInRemoteStorage
// is invoked on it with the expected offset
TestUtils.waitForCondition(() -> {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
return nextSegmentStartOffset - 1 == argument.getValue();
}, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) get invoked for dir2 log");

}

@Test
void testRemoteLogManagerRemoteMetrics() throws Exception {
long oldestSegmentStartOffset = 0L;
long olderSegmentStartOffset = 75L;
long nextSegmentStartOffset = 150L;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.parentDir()).thenReturn("dir1");

// leader epoch preparation
checkpoint.write(totalEpochEntries);
Expand Down