Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetk88 committed Jun 12, 2024
1 parent 82d0413 commit 189e859
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 37 deletions.
34 changes: 16 additions & 18 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1166,16 +1166,14 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
if (reconfigurableConfigs.contains(k)) {
if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) ||
k.equals(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) ||
k.equals(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
val newValue = v.asInstanceOf[Long]
val oldValue = getValue(server.config, k)
if (newValue != oldValue && newValue <= 0) {
val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
throw new ConfigException(s"$errorMsg, value should be at least 1")
}
if (RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k) ||
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP.equals(k) ||
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP.equals(k)) {
val newValue = v.asInstanceOf[Long]
val oldValue = getValue(server.config, k)
if (newValue != oldValue && newValue <= 0) {
val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
throw new ConfigException(s"$errorMsg, value should be at least 1")
}
}
}
Expand All @@ -1195,13 +1193,15 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
remoteLogManager.get.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
} else if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
}
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
remoteLogManager.get.updateCopyQuota(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
} else if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
}
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
remoteLogManager.get.updateFetchQuota(newValue)
Expand All @@ -1213,12 +1213,10 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w

private def getValue(config: KafkaConfig, name: String): Long = {
name match {
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP |
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP |
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
config.getLong(name)
case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
}
}
Expand Down
68 changes: 49 additions & 19 deletions core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -852,39 +852,55 @@ class DynamicBrokerConfigTest {

@Test
def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
testRemoteLogManagerQuotaUpdates(
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
(remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateCopyQuota(quota)
)
}

@Test
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
testRemoteLogManagerQuotaUpdates(
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
(remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateFetchQuota(quota)
)
}

def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long, verifyMethod: (RemoteLogManager, Long) => Unit): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val config = KafkaConfig.fromProps(props)
val serverMock: KafkaServer = mock(classOf[KafkaServer])
val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager]))

Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)

config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))

// Default value is Long.MaxValue
assertEquals(Long.MaxValue, config.getLong(copyQuotaProp))
assertEquals(defaultQuota, config.getLong(quotaProp))

// Update default config
props.put(copyQuotaProp, "100")
props.put(quotaProp, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.getLong(copyQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
assertEquals(100, config.getLong(quotaProp))
verifyMethod(remoteLogManagerMockOpt.get, 100)

// Update per broker config
props.put(copyQuotaProp, "200")
props.put(quotaProp, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.getLong(copyQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
assertEquals(200, config.getLong(quotaProp))
verifyMethod(remoteLogManagerMockOpt.get, 200)

Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}

@Test
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
def testRemoteLogManagerMultipleConfigUpdates(): Unit = {
val indexFileCacheSizeProp = RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
val copyQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
val fetchQuotaProp = RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP

val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
Expand All @@ -898,20 +914,34 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))

// Default value is Long.MaxValue
assertEquals(Long.MaxValue, config.getLong(fetchQuotaProp))
// Default values
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.getLong(indexFileCacheSizeProp))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.getLong(fetchQuotaProp))

// Update default config
props.put(fetchQuotaProp, "100")
props.put(indexFileCacheSizeProp, "4")
props.put(copyQuotaProp, "100")
props.put(fetchQuotaProp, "200")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(100)
assertEquals(4, config.getLong(indexFileCacheSizeProp))
assertEquals(100, config.getLong(copyQuotaProp))
assertEquals(200, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200)

// Update per broker config
props.put(fetchQuotaProp, "200")
props.put(indexFileCacheSizeProp, "8")
props.put(copyQuotaProp, "200")
props.put(fetchQuotaProp, "400")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200)
assertEquals(8, config.getLong(indexFileCacheSizeProp))
assertEquals(200, config.getLong(copyQuotaProp))
assertEquals(400, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8)
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400)

Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}
Expand Down

0 comments on commit 189e859

Please sign in to comment.