Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage #15820

Merged
merged 2 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
<disallow class="com.yammer.metrics.Metrics" />
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -122,6 +123,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -158,6 +161,8 @@ public class RemoteLogManager implements Closeable {

private final RemoteLogMetadataManager remoteLogMetadataManager;

private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
satishd marked this conversation as resolved.
Show resolved Hide resolved
private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition();
private final RLMQuotaManager rlmCopyQuotaManager;
private final RLMQuotaManager rlmFetchQuotaManager;

Expand Down Expand Up @@ -244,6 +249,13 @@ private void removeMetrics() {
remoteStorageReaderThreadPool.removeMetrics();
}

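/**
* Returns how long an RLM task waits for copy quota to become available before re-checking the quota.
* This is an overridable method rather than a constant so that tests can substitute a shorter timeout.
*/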
Duration quotaTimeout() {
return Duration.ofSeconds(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use the declared constant value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to override this method to have a smaller timeout for tests. That is the reason I did not use a constant value.

}

RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
"Tracking copy byte-rate for Remote Log Manager", time);
Expand Down Expand Up @@ -757,6 +769,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
isCancelled(), isLeader());
return;
}

copyQuotaManagerLock.lock();
try {
while (rlmCopyQuotaManager.isQuotaExceeded()) {
satishd marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
// If the thread gets interrupted while waiting, the InterruptedException is thrown
// back to the caller. It's important to note that the task being executed is already
// cancelled before the executing thread is interrupted. The caller is responsible
// for handling the exception gracefully by checking if the task is already cancelled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you cover shutdown while the quota is breached with a unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
// Signal waiting threads to check the quota again
copyQuotaManagerLockCondition.signalAll();
} finally {
copyQuotaManagerLock.unlock();
}
copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
}
}
Expand Down
210 changes: 210 additions & 0 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.mockito.InOrder;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.opentest4j.AssertionFailedError;
import scala.Option;
import scala.collection.JavaConverters;

Expand All @@ -100,6 +102,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -144,13 +147,15 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -185,6 +190,7 @@ public class RemoteLogManagerTest {

private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
private RemoteLogManagerConfig remoteLogManagerConfig = null;

private BrokerTopicStats brokerTopicStats = null;
Expand Down Expand Up @@ -228,6 +234,12 @@ public RemoteStorageManager createRemoteStorageManager() {
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
// Hands the manager under test the mocked copy quota manager, so tests can stub
// isQuotaExceeded() and verify record() calls.
public RLMQuotaManager createRLMCopyQuotaManager() {
return rlmCopyQuotaManager;
}
// Use a 100 ms quota wait in tests so that scenarios which block on an exceeded
// copy quota finish quickly.
public Duration quotaTimeout() {
return Duration.ofMillis(100);
}
@Override
long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
return 0L;
Expand Down Expand Up @@ -2730,6 +2742,204 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
}


// Exercises copyLogSegmentsToRemote with the copy quota either available or exceeded.
// With quota available the single eligible segment is copied and recorded against the quota;
// with quota exceeded the copy blocks and nothing beyond the initial offset update happens.
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCopyQuota(boolean quotaExceeded) throws Exception {
    final RemoteLogManager.RLMTask rlmTask = setupRLMTask(quotaExceeded);
    final ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class);

    if (!quotaExceeded) {
        // Quota is available: the copy must finish well inside the timeout because it never waits
        assertTimeoutPreemptively(Duration.ofMillis(100), () -> rlmTask.copyLogSegmentsToRemote(mockLog));

        // The quota was consulted exactly once ...
        verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
        // ... and the segment size (10 bytes) was recorded against it
        verify(rlmCopyQuotaManager, times(1)).record(10);

        // Highest remote offset is set twice: -1 before the copy, 149 once the segment is uploaded
        verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(offsetCaptor.capture());
        List<Long> recordedOffsets = offsetCaptor.getAllValues();
        assertEquals(-1L, recordedOffsets.get(0).longValue());
        assertEquals(149L, recordedOffsets.get(1).longValue());
        return;
    }

    // Quota is exceeded: the copy keeps waiting, so the preemptive timeout fires and surfaces
    // as an AssertionFailedError
    assertThrows(AssertionFailedError.class,
        () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> rlmTask.copyLogSegmentsToRemote(mockLog)));

    // Only the initial update (to -1) happened; no segment was copied
    verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(offsetCaptor.capture());
    assertEquals(-1L, offsetCaptor.getValue());
}

// Verifies that RemoteLogManager.close() returns promptly even while a copy task is
// waiting for copy quota that never becomes available.
@Test
public void testRLMShutdownDuringQuotaExceededScenario() throws Exception {
remoteLogManager.startup();
// Only the mock stubbing is needed here (quota always reported as exceeded);
// the task returned by the helper is intentionally discarded.
setupRLMTask(true);
// Becoming leader for the partition is what triggers the copy work checked below.
remoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
// Ensure the copy operation is waiting for quota to be available.
// verify(...) throws until isQuotaExceeded() has been invoked at least once.
// NOTE(review): this relies on waitForCondition retrying when the condition throws - confirm against TestUtils.
TestUtils.waitForCondition(() -> {
verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
return true;
}, "Quota exceeded check did not happen");
// Verify RLM is able to shut down: close() must complete within 100 ms or the test fails
assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close());
}

// Test fixture for the copy-quota tests. Stubs the mocked log with one copy-eligible segment
// (base offset 0, next offset 150, 10 bytes) plus an active segment at offset 150, makes the
// mocked copy quota manager report the given quota state, and returns a leader RLMTask.
private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded) throws RemoteStorageException, IOException {
    final long eligibleBaseOffset = 0L;
    final long activeBaseOffset = 150L;
    final TopicPartition tp = leaderTopicIdPartition.topicPartition();

    // --- log identity and leader epoch state ---
    when(mockLog.topicPartition()).thenReturn(tp);
    checkpoint.write(totalEpochEntries);
    LeaderEpochFileCache epochCache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
    when(mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
    when(mockLog.parentDir()).thenReturn("dir1");
    when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
        .thenReturn(Optional.of(0L));

    // --- record data shared by the eligible segment: a 10-byte file ---
    File recordsFile = TestUtils.tempFile();
    FileRecords records = mock(FileRecords.class);
    when(records.file()).thenReturn(recordsFile);
    when(records.sizeInBytes()).thenReturn(10);

    // --- the segment that is eligible for copy: offsets [0, 150) ---
    LogSegment eligibleSegment = mock(LogSegment.class);
    when(eligibleSegment.log()).thenReturn(records);
    when(eligibleSegment.baseOffset()).thenReturn(eligibleBaseOffset);
    when(eligibleSegment.readNextOffset()).thenReturn(activeBaseOffset);

    // --- the active segment, starting where the eligible one ends ---
    LogSegment currentSegment = mock(LogSegment.class);
    when(currentSegment.baseOffset()).thenReturn(activeBaseOffset);

    when(mockLog.activeSegment()).thenReturn(currentSegment);
    when(mockLog.logStartOffset()).thenReturn(eligibleBaseOffset);
    when(mockLog.logSegments(anyLong(), anyLong()))
        .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(eligibleSegment, currentSegment)));

    // --- producer state snapshot ---
    File producerSnapshot = TestUtils.tempFile();
    ProducerStateManager producerStateManager = mock(ProducerStateManager.class);
    when(producerStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(producerSnapshot));
    when(mockLog.producerStateManager()).thenReturn(producerStateManager);
    when(mockLog.lastStableOffset()).thenReturn(250L);

    // --- real index files for the eligible segment ---
    File indexDir = TestUtils.tempDirectory();
    OffsetIndex offsetIndex =
        LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, eligibleBaseOffset, ""), eligibleBaseOffset, 1000).get();
    TimeIndex timeIndex =
        LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, eligibleBaseOffset, ""), eligibleBaseOffset, 1500).get();
    File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, eligibleBaseOffset, "");
    txnIndexFile.createNewFile();
    TransactionIndex transactionIndex = new TransactionIndex(eligibleBaseOffset, txnIndexFile);
    when(eligibleSegment.timeIndex()).thenReturn(timeIndex);
    when(eligibleSegment.offsetIndex()).thenReturn(offsetIndex);
    when(eligibleSegment.txnIndex()).thenReturn(transactionIndex);

    // --- remote metadata/storage calls succeed immediately ---
    CompletableFuture<Void> completed = CompletableFuture.completedFuture(null);
    when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(completed);
    when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completed);
    when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
        .thenReturn(Optional.empty());

    // --- quota state requested by the caller ---
    when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
    doNothing().when(rlmCopyQuotaManager).record(anyInt());

    RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
    leaderTask.convertToLeader(2);
    return leaderTask;
}

// Verifies throttling part-way through a copy pass: the quota is available for the first
// eligible segment and exceeded afterwards, so only the first segment is uploaded and the
// task then blocks waiting for quota.
@Test
public void testCopyThrottling() throws Exception {
    final long firstBaseOffset = 0L;
    final TopicPartition tp = leaderTopicIdPartition.topicPartition();

    // --- log identity and leader epoch state ---
    when(mockLog.topicPartition()).thenReturn(tp);
    checkpoint.write(totalEpochEntries);
    LeaderEpochFileCache epochCache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
    when(mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
    when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
        .thenReturn(Optional.of(0L));

    // --- record data shared by all three segments: a 10-byte file ---
    File recordsFile = TestUtils.tempFile();
    FileRecords records = mock(FileRecords.class);
    when(records.file()).thenReturn(recordsFile);
    when(records.sizeInBytes()).thenReturn(10);

    // --- first segment [0, 100): copied while quota is still available ---
    LogSegment copiedSegment = mock(LogSegment.class);
    when(copiedSegment.log()).thenReturn(records);
    when(copiedSegment.baseOffset()).thenReturn(firstBaseOffset);
    when(copiedSegment.readNextOffset()).thenReturn(100L);

    // --- second segment [100, 150): held back because the quota is exceeded ---
    LogSegment throttledSegment = mock(LogSegment.class);
    when(throttledSegment.log()).thenReturn(records);
    when(throttledSegment.baseOffset()).thenReturn(100L);
    when(throttledSegment.readNextOffset()).thenReturn(150L);

    // --- active segment starting at 150 ---
    LogSegment currentSegment = mock(LogSegment.class);
    when(currentSegment.log()).thenReturn(records);
    when(currentSegment.baseOffset()).thenReturn(150L);

    when(mockLog.activeSegment()).thenReturn(currentSegment);
    when(mockLog.logStartOffset()).thenReturn(firstBaseOffset);
    when(mockLog.logSegments(anyLong(), anyLong()))
        .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(copiedSegment, throttledSegment, currentSegment)));

    // --- producer state snapshot ---
    File producerSnapshot = TestUtils.tempFile();
    ProducerStateManager producerStateManager = mock(ProducerStateManager.class);
    when(producerStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(producerSnapshot));
    when(mockLog.producerStateManager()).thenReturn(producerStateManager);
    when(mockLog.lastStableOffset()).thenReturn(250L);

    // --- real index files for the segment that gets copied ---
    File indexDir = TestUtils.tempDirectory();
    OffsetIndex offsetIndex =
        LazyIndex.forOffset(LogFileUtils.offsetIndexFile(indexDir, firstBaseOffset, ""), firstBaseOffset, 1000).get();
    TimeIndex timeIndex =
        LazyIndex.forTime(LogFileUtils.timeIndexFile(indexDir, firstBaseOffset, ""), firstBaseOffset, 1500).get();
    File txnIndexFile = UnifiedLog.transactionIndexFile(indexDir, firstBaseOffset, "");
    txnIndexFile.createNewFile();
    TransactionIndex transactionIndex = new TransactionIndex(firstBaseOffset, txnIndexFile);
    when(copiedSegment.timeIndex()).thenReturn(timeIndex);
    when(copiedSegment.offsetIndex()).thenReturn(offsetIndex);
    when(copiedSegment.txnIndex()).thenReturn(transactionIndex);

    // --- remote metadata/storage calls succeed immediately ---
    CompletableFuture<Void> completed = CompletableFuture.completedFuture(null);
    when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(completed);
    when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(completed);
    when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
        .thenReturn(Optional.empty());

    // --- quota: available on the first check, exceeded on every later check ---
    when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
    doNothing().when(rlmCopyQuotaManager).record(anyInt());

    final RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
    leaderTask.convertToLeader(2);

    // The second segment can never be copied, so the call keeps waiting and the preemptive
    // timeout fires, surfacing as an AssertionFailedError
    assertThrows(AssertionFailedError.class,
        () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> leaderTask.copyLogSegmentsToRemote(mockLog)));

    // Highest remote offset is set twice: -1 before copying, then 99 for the one copied segment
    ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class);
    verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(offsetCaptor.capture());
    List<Long> recordedOffsets = offsetCaptor.getAllValues();
    assertEquals(-1L, recordedOffsets.get(0).longValue());
    assertEquals(99L, recordedOffsets.get(1).longValue());
}

private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
Expand Down