Skip to content

Commit

Permalink
KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetk88 committed May 31, 2024
1 parent 0971924 commit f226aae
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 7 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
Expand Down Expand Up @@ -234,6 +235,16 @@ public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
}

public void updateCopyQuota(long quota) {
LOGGER.info("Updating remote copy quota to {} bytes per second", quota);
rlmCopyQuotaManager.updateQuota(new Quota(quota, true));
}

public void updateFetchQuota(long quota) {
LOGGER.info("Updating remote fetch quota to {} bytes per second", quota);
rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
}

private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
Expand Down
39 changes: 32 additions & 7 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,9 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
if (reconfigurableConfigs.contains(k)) {
if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) ||
k.equals(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) ||
k.equals(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
val newValue = v.asInstanceOf[Long]
val oldValue = getValue(server.config, k)
if (newValue != oldValue && newValue <= 0) {
Expand All @@ -1179,14 +1181,31 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
if (oldValue != newValue) {
val remoteLogManager = server.remoteLogManagerOpt
if (remoteLogManager.nonEmpty) {
def oldLongValue(k: String): Long = oldConfig.getLong(k)
def newLongValue(k: String): Long = newConfig.getLong(k)

def isChangedLongValue(k : String): Boolean = oldLongValue(k) != newLongValue(k)

val remoteLogManager = server.remoteLogManagerOpt
if (remoteLogManager.nonEmpty) {
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
remoteLogManager.get.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
} else if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
remoteLogManager.get.updateCopyQuota(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
} else if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
remoteLogManager.get.updateFetchQuota(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
}
}
}
Expand All @@ -1195,13 +1214,19 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
name match {
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
}
}
}

object DynamicRemoteLogConfig {
val ReconfigurableConfigs = Set(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,72 @@ class DynamicBrokerConfigTest {
Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}

@Test
def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP

val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val config = KafkaConfig.fromProps(props)
val serverMock: KafkaServer = mock(classOf[KafkaServer])
val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))

Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)

config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))

// Default value is Long.MaxValue
assertEquals(Long.MaxValue, config.getLong(copyQuotaProp))

// Update default config
props.put(copyQuotaProp, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.getLong(copyQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)

// Update per broker config
props.put(copyQuotaProp, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.getLong(copyQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)

Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}

@Test
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
val fetchQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP

val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val config = KafkaConfig.fromProps(props)
val serverMock: KafkaServer = mock(classOf[KafkaServer])
val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))

Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)

config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))

// Default value is Long.MaxValue
assertEquals(Long.MaxValue, config.getLong(fetchQuotaProp))

// Update default config
props.put(fetchQuotaProp, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(100)

// Update per broker config
props.put(fetchQuotaProp, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200)

Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}

def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
retentionMs: Long,
logLocalRetentionBytes: Long,
Expand Down

0 comments on commit f226aae

Please sign in to comment.