KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage #16071

Merged 2 commits on Jun 5, 2024
6 changes: 5 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -249,6 +249,10 @@ RLMQuotaManager createRLMFetchQuotaManager() {
"Tracking fetch byte-rate for Remote Log Manager", time);
}

public boolean isRemoteLogFetchQuotaExceeded() {
return rlmFetchQuotaManager.isQuotaExceeded();
}

static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(),
rlmConfig.remoteLogManagerCopyNumQuotaSamples(),
@@ -1660,7 +1664,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throw
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
*/
public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats));
return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager));
}

void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
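The new isRemoteLogFetchQuotaExceeded() accessor above simply delegates to the fetch quota manager created by createRLMFetchQuotaManager(), which tracks the byte rate of reads from remote storage. As a rough illustration of the record()/isQuotaExceeded() contract — the real RLMQuotaManager is built on Kafka's metrics sensors with configurable quota samples, so this stand-in class and its fixed one-second window are assumptions for illustration only:

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for RLMQuotaManager: a fixed one-second sliding window over
// recorded byte counts. Only the record()/isQuotaExceeded() contract is mirrored here.
final class SimpleByteRateQuota {
    private final long maxBytesPerSecond;
    private final Deque<long[]> samples = new ArrayDeque<>(); // {timestampMs, bytes}

    SimpleByteRateQuota(long maxBytesPerSecond) {
        this.maxBytesPerSecond = maxBytesPerSecond;
    }

    // Called after a remote read completes, with the number of bytes actually read.
    synchronized void record(double bytes) {
        samples.addLast(new long[]{System.currentTimeMillis(), (long) bytes});
    }

    // Called before scheduling another remote read; true means back off for now.
    synchronized boolean isQuotaExceeded() {
        long cutoff = System.currentTimeMillis() - 1000L;
        while (!samples.isEmpty() && samples.peekFirst()[0] < cutoff) {
            samples.removeFirst(); // evict samples that fell out of the window
        }
        long windowBytes = 0;
        for (long[] sample : samples) {
            windowBytes += sample[1];
        }
        return windowBytes > maxBytesPerSecond;
    }
}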
7 changes: 6 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.log.remote.quota.RLMQuotaManager;
import kafka.server.BrokerTopicStats;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.utils.LogContext;
@@ -34,17 +35,20 @@ public class RemoteLogReader implements Callable<Void> {
private final RemoteLogManager rlm;
private final BrokerTopicStats brokerTopicStats;
private final Consumer<RemoteLogReadResult> callback;
private final RLMQuotaManager quotaManager;

public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
RemoteLogManager rlm,
Consumer<RemoteLogReadResult> callback,
BrokerTopicStats brokerTopicStats) {
BrokerTopicStats brokerTopicStats,
RLMQuotaManager quotaManager) {
this.fetchInfo = fetchInfo;
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
this.quotaManager = quotaManager;
logger = new LogContext() {
@Override
public String logPrefix() {
@@ -73,6 +77,7 @@ public Void call() {
}

logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);

return null;
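Note the ordering in call() above: bytes are charged to the quota only after the read has finished, so a fetch that pushes the rate over the limit still completes, and the throttling takes effect on subsequent fetches; a failed read charges zero bytes. A minimal sketch of that accounting, with hypothetical Quota and ReadResult types standing in for RLMQuotaManager and RemoteLogReadResult:

import java.util.Optional;

// Sketch of the post-read accounting in RemoteLogReader.call() (hypothetical types).
final class ReadAccounting {
    interface Quota { void record(double bytes); }
    record ReadResult(Optional<Integer> sizeInBytes) {}

    static void chargeQuota(Quota quota, ReadResult result) {
        // Successful reads charge the actual payload size; error results charge zero.
        quota.record(result.sizeInBytes().orElse(0));
    }

    public static void main(String[] args) {
        Quota quota = bytes -> System.out.println("recorded " + bytes + " bytes");
        chargeQuota(quota, new ReadResult(Optional.of(100))); // success path
        chargeQuota(quota, new ReadResult(Optional.empty())); // error path
    }
}

This matches the expectations in RemoteLogReaderTest below, which verifies record(100) for a successful read and record(0) for a failed one.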
27 changes: 21 additions & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1754,12 +1754,27 @@ class ReplicaManager(val config: KafkaConfig,
createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
new OffsetMovedToTieredStorageException("Given offset " + offset + " is moved to tiered storage"))
} else {
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
// For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
val fetchDataInfo =
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
fetchInfo, params.isolation, params.hardMaxBytesLimit())))
val fetchDataInfo = if (remoteLogManager.get.isRemoteLogFetchQuotaExceeded) {
// We do not want to send an exception in the LogReadResult response (as we do in other cases when we
// send UnknownOffsetMetadata), because it would be classified as an error in reading data, and a
// response would be sent back to the client immediately. Instead, we want to be able to serve data
// for the other topic partitions via delayed fetch if required (sending an immediate response skips
// delayed fetch). Also, an immediate response would make the consumer retry right away, which may
// run into the quota-exceeded situation again and get it into a loop.
new FetchDataInfo(
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY,
false,
Optional.empty(),
Optional.empty()
)
} else {
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
// For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
fetchInfo, params.isolation, params.hardMaxBytesLimit())))
}

LogReadResult(fetchDataInfo,
divergingEpoch = None,
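The comment in the new branch above captures the key design choice: when the quota is exhausted, the result is empty but not an error, so the request can still be parked in delayed fetch instead of returning immediately and putting the consumer into a tight retry loop. A compressed sketch of that decision, using simplified stand-in types rather than the actual FetchDataInfo/LogOffsetMetadata classes:

import java.util.Optional;

// Hedged sketch of the gating added to ReplicaManager.readFromLog (stand-in types).
final class RemoteFetchGate {
    // hasRemoteFetch=false with no error lets the request fall through to delayed fetch.
    record FetchPlan(boolean hasRemoteFetch, Optional<String> remoteFetchInfo) {}

    static FetchPlan plan(boolean quotaExceeded, String remoteFetchInfo) {
        if (quotaExceeded) {
            // Empty records, unknown offset metadata, no exception: this partition simply
            // contributes no data in this round, and the client is not bounced immediately.
            return new FetchPlan(false, Optional.empty());
        }
        // Quota available: attach the remote-storage fetch info so the first partition
        // needing remote data can be read asynchronously on the reader thread pool.
        return new FetchPlan(true, Optional.of(remoteFetchInfo));
    }

    public static void main(String[] args) {
        System.out.println(plan(true, "tp0@offset1"));  // throttled: empty plan
        System.out.println(plan(false, "tp0@offset1")); // schedules the remote read
    }
}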
16 changes: 14 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.log.remote.quota.RLMQuotaManager;
import kafka.server.BrokerTopicStats;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
@@ -45,6 +46,7 @@ public class RemoteLogReaderTest {
public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
BrokerTopicStats brokerTopicStats = null;
RLMQuotaManager mockQuotaManager = mock(RLMQuotaManager.class);
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);

@@ -62,7 +64,7 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE

Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager);
remoteLogReader.call();

// verify the callback did get invoked with the expected remoteLogReadResult
@@ -73,6 +75,11 @@
assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());

// verify the record method on quota manager was called with the expected value
ArgumentCaptor<Double> recordedArg = ArgumentCaptor.forClass(Double.class);
verify(mockQuotaManager, times(1)).record(recordedArg.capture());
assertEquals(100, recordedArg.getValue());

// Verify metrics for remote reads are updated correctly
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count());
assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count());
@@ -89,7 +96,7 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce

Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager);
remoteLogReader.call();

// verify the callback did get invoked with the expected remoteLogReadResult
@@ -99,6 +106,11 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce
assertTrue(actualRemoteLogReadResult.error.isPresent());
assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());

// verify the record method on quota manager was called with the expected value
ArgumentCaptor<Double> recordedArg = ArgumentCaptor.forClass(Double.class);
verify(mockQuotaManager, times(1)).record(recordedArg.capture());
assertEquals(0, recordedArg.getValue());

// Verify metrics for remote reads are updated correctly
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count());
73 changes: 73 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6669,6 +6669,79 @@ class ReplicaManagerTest {
))
}
}

@Test
def testRemoteReadQuotaExceeded(): Unit = {
when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)

val tp0 = new TopicPartition(topic, 0)
val tpId0 = new TopicIdPartition(topicId, tp0)
val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0)

assertEquals(1, fetch.size)
assertEquals(tpId0, fetch.head._1)
val fetchInfo = fetch.head._2.info
assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, fetchInfo.fetchOffsetMetadata)
assertFalse(fetchInfo.records.records().iterator().hasNext)
assertFalse(fetchInfo.firstEntryIncomplete)
assertFalse(fetchInfo.abortedTransactions.isPresent)
assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
}

@Test
def testRemoteReadQuotaNotExceeded(): Unit = {
when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)

val tp0 = new TopicPartition(topic, 0)
val tpId0 = new TopicIdPartition(topicId, tp0)
val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0)

assertEquals(1, fetch.size)
assertEquals(tpId0, fetch.head._1)
val fetchInfo = fetch.head._2.info
assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
}

private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp.topic -> topicId).asJava
val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())

val params = new FetchParams(ApiKeys.FETCH.latestVersion, -1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava)
replicaManager.readFromLog(
params,
Seq(new TopicIdPartition(topicId, 0, topic) -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of(leaderEpoch))),
UnboundedQuota,
readFromPurgatory = false)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

// Some threads are closed, but the state is not reflected in the JVM immediately, so add some wait time for it
private def assertNoNonDaemonThreadsWithWaiting(threadNamePrefix: String, waitTimeMs: Long = 500L): Unit = {
var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]