diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index ed6c53a322ba..a30de55e415c 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -38,6 +38,7 @@ + diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index e5bf9597ca6f..e3402557329b 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -91,6 +91,7 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.security.PrivilegedAction; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -117,6 +118,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable { private final RemoteLogMetadataManager remoteLogMetadataManager; + private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true); + private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition(); private final RLMQuotaManager rlmCopyQuotaManager; private final RLMQuotaManager rlmFetchQuotaManager; @@ -239,6 +244,13 @@ private void removeMetrics() { remoteStorageReaderThreadPool.removeMetrics(); } + /** + * Returns the timeout for the RLM Tasks to wait for the quota to be available + */ + Duration quotaTimeout() { + return Duration.ofSeconds(1); + } + RLMQuotaManager createRLMCopyQuotaManager() { return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); @@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException isCancelled(), isLeader()); return; } + + copyQuotaManagerLock.lock(); + try { + while (rlmCopyQuotaManager.isQuotaExceeded()) { + logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available."); + try { + copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for the quota to be available for copying log segments."); + } + } + rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes()); + // Signal waiting threads to check the quota again + copyQuotaManagerLockCondition.signalAll(); + } finally { + copyQuotaManagerLock.unlock(); + } copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset); } } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 7bc199c8cd8a..fb3381023552 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -20,6 +20,7 @@ import kafka.cluster.EndPoint; import kafka.cluster.Partition; import kafka.log.UnifiedLog; +import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; @@ -86,6 +87,7 @@ import org.mockito.InOrder; import org.mockito.MockedConstruction; import org.mockito.Mockito; +import org.opentest4j.AssertionFailedError; import scala.Option; import scala.collection.JavaConverters; @@ -96,6 +98,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -135,6 +138,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; @@ -173,9 +177,11 @@ public class RemoteLogManagerTest { private final String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test"; private final String remoteLogMetadataConsumerTestVal = "consumer.test"; private final String remoteLogMetadataTopicPartitionsNum = "1"; + private static final String EXPECTED_THE_OPERATION_TO_TIME_OUT = "Expected the operation to time out"; private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); + private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); private RemoteLogManagerConfig remoteLogManagerConfig = null; private BrokerTopicStats brokerTopicStats = null; @@ -228,6 +234,12 @@ public RemoteStorageManager createRemoteStorageManager() { public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } @Override long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { return 0L; @@ -2679,7 +2691,7 @@ public void testCopyQuotaManagerConfig() { RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); - assertEquals(61, defaultConfig.numQuotaSamples()); + assertEquals(11, defaultConfig.numQuotaSamples()); assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); @@ -2713,6 +2725,198 @@ public void testFetchQuotaManagerConfig() { assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testCopyQuota(boolean quotaExceeded) throws Exception { + long oldSegmentStartOffset = 0L; + long nextSegmentStartOffset = 150L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + + // create 2 log segments, with 0 and 150 as log start offset + LogSegment oldSegment = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + File tempFile = TestUtils.tempFile(); + FileRecords fileRecords = mock(FileRecords.class); + when(fileRecords.file()).thenReturn(tempFile); + when(fileRecords.sizeInBytes()).thenReturn(10); + + // Set up the segment that is eligible for copy + when(oldSegment.log()).thenReturn(fileRecords); + when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); + when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset); + + // set up the active segment + when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset); + when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment))); + + File mockProducerSnapshotIndex = TestUtils.tempFile(); + ProducerStateManager mockStateManager = mock(ProducerStateManager.class); + when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); + + when(mockLog.producerStateManager()).thenReturn(mockStateManager); + when(mockLog.lastStableOffset()).thenReturn(250L); + + File tempDir = TestUtils.tempDirectory(); + OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get(); + TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get(); + File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, ""); + txnFile.createNewFile(); + TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile); + when(oldSegment.timeIndex()).thenReturn(timeIdx); + when(oldSegment.offsetIndex()).thenReturn(idx); + when(oldSegment.txnIndex()).thenReturn(txnIndex); + + CompletableFuture dummyFuture = new CompletableFuture<>(); + dummyFuture.complete(null); + when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); + when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty()); + + when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded); + doNothing().when(rlmCopyQuotaManager).record(anyInt()); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(2); + + if (quotaExceeded) { + // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded + try { + assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog)); + fail(EXPECTED_THE_OPERATION_TO_TIME_OUT); + } catch (AssertionFailedError e) { + // Fail the test if the operation completed within the timeout + if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) { + fail(e.getMessage()); + } + } + + // Verify the highest offset in remote storage is updated only once + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); + // Verify the highest offset in remote storage was -1L before the copy started + assertEquals(-1L, capture.getValue()); + } else { + // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability + assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog)); + + // Verify quota check was performed + verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded(); + // Verify bytes to copy was recorded with the quota manager + verify(rlmCopyQuotaManager, times(1)).record(10); + + // Verify the highest offset in remote storage is updated + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); + List capturedValues = capture.getAllValues(); + // Verify the highest offset in remote storage was -1L before the copy + assertEquals(-1L, capturedValues.get(0).longValue()); + // Verify it was updated to 149L after the copy + assertEquals(149L, capturedValues.get(1).longValue()); + } + } + + @Test + public void testCopyThrottling() throws Exception { + long oldestSegmentStartOffset = 0L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + + // create 3 log segments + LogSegment segmentToCopy = mock(LogSegment.class); + LogSegment segmentToThrottle = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + File tempFile = TestUtils.tempFile(); + FileRecords fileRecords = mock(FileRecords.class); + when(fileRecords.file()).thenReturn(tempFile); + when(fileRecords.sizeInBytes()).thenReturn(10); + + // set up the segment that will be copied + when(segmentToCopy.log()).thenReturn(fileRecords); + when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset); + when(segmentToCopy.readNextOffset()).thenReturn(100L); + + // set up the segment that will not be copied because of hitting quota + when(segmentToThrottle.log()).thenReturn(fileRecords); + when(segmentToThrottle.baseOffset()).thenReturn(100L); + when(segmentToThrottle.readNextOffset()).thenReturn(150L); + + // set up the active segment + when(activeSegment.log()).thenReturn(fileRecords); + when(activeSegment.baseOffset()).thenReturn(150L); + + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset); + when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy, segmentToThrottle, activeSegment))); + + File mockProducerSnapshotIndex = TestUtils.tempFile(); + ProducerStateManager mockStateManager = mock(ProducerStateManager.class); + when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); + + when(mockLog.producerStateManager()).thenReturn(mockStateManager); + when(mockLog.lastStableOffset()).thenReturn(250L); + + File tempDir = TestUtils.tempDirectory(); + OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get(); + TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get(); + File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, ""); + txnFile.createNewFile(); + TransactionIndex txnIndex = new TransactionIndex(oldestSegmentStartOffset, txnFile); + when(segmentToCopy.timeIndex()).thenReturn(timeIdx); + when(segmentToCopy.offsetIndex()).thenReturn(idx); + when(segmentToCopy.txnIndex()).thenReturn(txnIndex); + + CompletableFuture dummyFuture = new CompletableFuture<>(); + dummyFuture.complete(null); + when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); + when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty()); + + // After the first call, isQuotaExceeded should return true + when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true); + doNothing().when(rlmCopyQuotaManager).record(anyInt()); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(2); + + // Verify that the copy operation times out, since the second segment cannot be copied due to quota being exceeded + try { + assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog)); + fail(EXPECTED_THE_OPERATION_TO_TIME_OUT); + } catch (AssertionFailedError e) { + // Fail the test if the operation completed within the timeout + if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) { + fail(e.getMessage()); + } + } + + // Verify the highest offset in remote storage is updated corresponding to the only segment that was copied + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); + List capturedValues = capture.getAllValues(); + // Verify the highest offset in remote storage was -1L before the copy + assertEquals(-1L, capturedValues.get(0).longValue()); + // Verify it was updated to 99L after the copy + assertEquals(99L, capturedValues.get(1).longValue()); + } + private Partition mockPartition(TopicIdPartition topicIdPartition) { TopicPartition tp = topicIdPartition.topicPartition(); Partition partition = mock(Partition.class);