diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index b920a962afca..c1c87d579ef4 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; @@ -249,6 +250,16 @@ public void resizeCacheSize(long remoteLogIndexFileCacheSize) { indexCache.resizeCacheSize(remoteLogIndexFileCacheSize); } + public void updateCopyQuota(long quota) { + LOGGER.info("Updating remote copy quota to {} bytes per second", quota); + rlmCopyQuotaManager.updateQuota(new Quota(quota, true)); + } + + public void updateFetchQuota(long quota) { + LOGGER.info("Updating remote fetch quota to {} bytes per second", quota); + rlmFetchQuotaManager.updateQuota(new Quota(quota, true)); + } + private void removeMetrics() { metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC); metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC); diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 22576bdceb6f..e37a615fa7f4 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1166,36 +1166,57 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w override def validateReconfiguration(newConfig: KafkaConfig): Unit = { newConfig.values.forEach { (k, v) => - if (reconfigurableConfigs.contains(k)) { - if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) { - val newValue = v.asInstanceOf[Long] - val oldValue = getValue(server.config, k) - if (newValue != oldValue && newValue <= 0) { - val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v" - throw new ConfigException(s"$errorMsg, value should be at least 1") - } + if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k) || + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP.equals(k) || + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP.equals(k)) { + val newValue = v.asInstanceOf[Long] + val oldValue = getValue(server.config, k) + if (newValue != oldValue && newValue <= 0) { + val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v" + throw new ConfigException(s"$errorMsg, value should be at least 1") } } } } override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { - val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) - val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) - if (oldValue != newValue) { - val remoteLogManager = server.remoteLogManagerOpt - if (remoteLogManager.nonEmpty) { + def oldLongValue(k: String): Long = oldConfig.getLong(k) + def newLongValue(k: String): Long = newConfig.getLong(k) + + def isChangedLongValue(k : String): Boolean = oldLongValue(k) != newLongValue(k) + + val remoteLogManager = server.remoteLogManagerOpt + if (remoteLogManager.nonEmpty) { + if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) { + val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) remoteLogManager.get.resizeCacheSize(newValue) info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " + s"old value: $oldValue, new value: $newValue") } + if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) { + val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) + val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) + remoteLogManager.get.updateCopyQuota(newValue) + info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} updated, " + + s"old value: $oldValue, new value: $newValue") + } + if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) { + val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP) + val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP) + remoteLogManager.get.updateFetchQuota(newValue) + info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} updated, " + + s"old value: $oldValue, new value: $newValue") + } } } private def getValue(config: KafkaConfig, name: String): Long = { name match { - case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => - config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP | + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP | + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP => + config.getLong(name) case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n") } } @@ -1204,6 +1225,8 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, - RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP + RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP ) } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 2e5d77cdc6fa..3ba158891574 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -850,6 +850,102 @@ class DynamicBrokerConfigTest { Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) } + @Test + def testRemoteLogManagerCopyQuotaUpdates(): Unit = { + testRemoteLogManagerQuotaUpdates( + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + (remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateCopyQuota(quota) + ) + } + + @Test + def testRemoteLogManagerFetchQuotaUpdates(): Unit = { + testRemoteLogManagerQuotaUpdates( + RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + (remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateFetchQuota(quota) + ) + } + + def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long, verifyMethod: (RemoteLogManager, Long) => Unit): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) + val config = KafkaConfig.fromProps(props) + val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager])) + + Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) + + assertEquals(defaultQuota, config.getLong(quotaProp)) + + // Update default config + props.put(quotaProp, "100") + config.dynamicConfig.updateDefaultConfig(props) + assertEquals(100, config.getLong(quotaProp)) + verifyMethod(remoteLogManagerMockOpt.get, 100) + + // Update per broker config + props.put(quotaProp, "200") + config.dynamicConfig.updateBrokerConfig(0, props) + assertEquals(200, config.getLong(quotaProp)) + verifyMethod(remoteLogManagerMockOpt.get, 200) + + Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + } + + @Test + def testRemoteLogManagerMultipleConfigUpdates(): Unit = { + val indexFileCacheSizeProp = RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP + val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP + val fetchQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP + + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) + val config = KafkaConfig.fromProps(props) + val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + + Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) + + // Default values + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.getLong(indexFileCacheSizeProp)) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp)) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.getLong(fetchQuotaProp)) + + // Update default config + props.put(indexFileCacheSizeProp, "4") + props.put(copyQuotaProp, "100") + props.put(fetchQuotaProp, "200") + config.dynamicConfig.updateDefaultConfig(props) + assertEquals(4, config.getLong(indexFileCacheSizeProp)) + assertEquals(100, config.getLong(copyQuotaProp)) + assertEquals(200, config.getLong(fetchQuotaProp)) + Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4) + Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100) + Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200) + + // Update per broker config + props.put(indexFileCacheSizeProp, "8") + props.put(copyQuotaProp, "200") + props.put(fetchQuotaProp, "400") + config.dynamicConfig.updateBrokerConfig(0, props) + assertEquals(8, config.getLong(indexFileCacheSizeProp)) + assertEquals(200, config.getLong(copyQuotaProp)) + assertEquals(400, config.getLong(fetchQuotaProp)) + Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8) + Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200) + Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400) + + Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + } + def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long, retentionMs: Long, logLocalRetentionBytes: Long,