From d948db22a5f5a2fd691920d43086de3fda7d4d4e Mon Sep 17 00:00:00 2001 From: Murali Basani Date: Thu, 6 Jun 2024 18:06:25 +0200 Subject: [PATCH] KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199) Reviewers: Greg Harris , Kamal Chandraprakash , Chia-Ping Tsai --- .../main/scala/kafka/server/KafkaConfig.scala | 4 +- .../log/remote/RemoteLogManagerTest.java | 4 +- .../kafka/server/ReplicaManagerTest.scala | 8 +- .../storage/RemoteLogManagerConfig.java | 245 +++--------------- .../storage/RemoteLogManagerConfigTest.java | 190 ++++---------- 5 files changed, 97 insertions(+), 354 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 45ec15b1008f8..d1f5bf43c0f3d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -455,7 +455,7 @@ object KafkaConfig { } /** ********* Remote Log Management Configuration *********/ - RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) + RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => configDef.define(key)) def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala @@ -590,7 +590,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props) def remoteLogManagerConfig = _remoteLogManagerConfig private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 19fa4a8a4431d..667e213b81d08 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; @@ -2754,8 +2753,7 @@ private RemoteLogManagerConfig createRLMConfig(Properties props) { props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal); - AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props); - return new RemoteLogManagerConfig(config); + return new RemoteLogManagerConfig(props); } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index bdb4e6bf7e07c..b20a520561e73 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} +import org.apache.kafka.common.config.{TopicConfig} import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -4093,8 +4093,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) - val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) - val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val mockLog = mock(classOf[UnifiedLog]) val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) val remoteLogManager = new RemoteLogManager( @@ -4193,8 +4192,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) - val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) - val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val dummyLog = mock(classOf[UnifiedLog]) val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) val remoteLogManager = new RemoteLogManager( diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index e41cc011c313f..8224ab2382549 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -20,9 +20,7 @@ import org.apache.kafka.common.config.ConfigDef; import java.util.Collections; -import java.util.HashMap; import java.util.Map; -import java.util.Objects; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; @@ -34,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LONG; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; -public final class RemoteLogManagerConfig { +public final class RemoteLogManagerConfig extends AbstractConfig { /** * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having @@ -188,10 +186,8 @@ public final class RemoteLogManagerConfig { public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; - public static final ConfigDef CONFIG_DEF = new ConfigDef(); - - static { - CONFIG_DEF + public static ConfigDef configDef() { + return new ConfigDef() .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, @@ -358,291 +354,128 @@ public final class RemoteLogManagerConfig { REMOTE_FETCH_MAX_WAIT_MS_DOC); } - private final boolean enableRemoteStorageSystem; - private final String remoteStorageManagerClassName; - private final String remoteStorageManagerClassPath; - private final String remoteLogMetadataManagerClassName; - private final String remoteLogMetadataManagerClassPath; - private final long remoteLogIndexFileCacheTotalSizeBytes; - private final int remoteLogManagerThreadPoolSize; - private final int remoteLogManagerCopierThreadPoolSize; - private final int remoteLogManagerExpirationThreadPoolSize; - private final long remoteLogManagerTaskIntervalMs; - private final long remoteLogManagerTaskRetryBackoffMs; - private final long remoteLogManagerTaskRetryBackoffMaxMs; - private final double remoteLogManagerTaskRetryJitter; - private final int remoteLogReaderThreads; - private final int remoteLogReaderMaxPendingTasks; - private final String remoteStorageManagerPrefix; - private final HashMap remoteStorageManagerProps; - private final String remoteLogMetadataManagerPrefix; - private final HashMap remoteLogMetadataManagerProps; - private final String remoteLogMetadataManagerListenerName; - private final int remoteLogMetadataCustomMetadataMaxBytes; - private final long remoteLogManagerCopyMaxBytesPerSecond; - private final int remoteLogManagerCopyNumQuotaSamples; - private final int remoteLogManagerCopyQuotaWindowSizeSeconds; - private final long remoteLogManagerFetchMaxBytesPerSecond; - private final int remoteLogManagerFetchNumQuotaSamples; - private final int remoteLogManagerFetchQuotaWindowSizeSeconds; - private final int remoteFetchMaxWaitMs; - - public RemoteLogManagerConfig(AbstractConfig config) { - this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP), - config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP), - config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP), - config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP), - config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP), - config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP), - config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP), - config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP), - config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP), - config.getInt(REMOTE_LOG_READER_THREADS_PROP), - config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), - config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null - ? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap(), - config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null - ? config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap(), - config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP), - config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), - config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), - config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); - } - - // Visible for testing - public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, - String remoteStorageManagerClassName, - String remoteStorageManagerClassPath, - String remoteLogMetadataManagerClassName, - String remoteLogMetadataManagerClassPath, - String remoteLogMetadataManagerListenerName, - long remoteLogIndexFileCacheTotalSizeBytes, - int remoteLogManagerThreadPoolSize, - int remoteLogManagerCopierThreadPoolSize, - int remoteLogManagerExpirationThreadPoolSize, - long remoteLogManagerTaskIntervalMs, - long remoteLogManagerTaskRetryBackoffMs, - long remoteLogManagerTaskRetryBackoffMaxMs, - double remoteLogManagerTaskRetryJitter, - int remoteLogReaderThreads, - int remoteLogReaderMaxPendingTasks, - int remoteLogMetadataCustomMetadataMaxBytes, - String remoteStorageManagerPrefix, - Map remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */ - String remoteLogMetadataManagerPrefix, - Map remoteLogMetadataManagerProps, /* properties having keys stripped out with remoteLogMetadataManagerPrefix */ - long remoteLogManagerCopyMaxBytesPerSecond, - int remoteLogManagerCopyNumQuotaSamples, - int remoteLogManagerCopyQuotaWindowSizeSeconds, - long remoteLogManagerFetchMaxBytesPerSecond, - int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds, - int remoteFetchMaxWaitMs) { - this.enableRemoteStorageSystem = enableRemoteStorageSystem; - this.remoteStorageManagerClassName = remoteStorageManagerClassName; - this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; - this.remoteLogMetadataManagerClassName = remoteLogMetadataManagerClassName; - this.remoteLogMetadataManagerClassPath = remoteLogMetadataManagerClassPath; - this.remoteLogIndexFileCacheTotalSizeBytes = remoteLogIndexFileCacheTotalSizeBytes; - this.remoteLogManagerThreadPoolSize = remoteLogManagerThreadPoolSize; - this.remoteLogManagerCopierThreadPoolSize = remoteLogManagerCopierThreadPoolSize; - this.remoteLogManagerExpirationThreadPoolSize = remoteLogManagerExpirationThreadPoolSize; - this.remoteLogManagerTaskIntervalMs = remoteLogManagerTaskIntervalMs; - this.remoteLogManagerTaskRetryBackoffMs = remoteLogManagerTaskRetryBackoffMs; - this.remoteLogManagerTaskRetryBackoffMaxMs = remoteLogManagerTaskRetryBackoffMaxMs; - this.remoteLogManagerTaskRetryJitter = remoteLogManagerTaskRetryJitter; - this.remoteLogReaderThreads = remoteLogReaderThreads; - this.remoteLogReaderMaxPendingTasks = remoteLogReaderMaxPendingTasks; - this.remoteStorageManagerPrefix = remoteStorageManagerPrefix; - this.remoteStorageManagerProps = new HashMap<>(remoteStorageManagerProps); - this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix; - this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps); - this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName; - this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes; - this.remoteLogManagerCopyMaxBytesPerSecond = remoteLogManagerCopyMaxBytesPerSecond; - this.remoteLogManagerCopyNumQuotaSamples = remoteLogManagerCopyNumQuotaSamples; - this.remoteLogManagerCopyQuotaWindowSizeSeconds = remoteLogManagerCopyQuotaWindowSizeSeconds; - this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; - this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; - this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; - this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; + public RemoteLogManagerConfig(Map props) { + super(configDef(), props); } public boolean enableRemoteStorageSystem() { - return enableRemoteStorageSystem; + return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); } public String remoteStorageManagerClassName() { - return remoteStorageManagerClassName; + return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); } public String remoteStorageManagerClassPath() { - return remoteStorageManagerClassPath; + return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); } public String remoteLogMetadataManagerClassName() { - return remoteLogMetadataManagerClassName; + return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); } public String remoteLogMetadataManagerClassPath() { - return remoteLogMetadataManagerClassPath; + return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); } public long remoteLogIndexFileCacheTotalSizeBytes() { - return remoteLogIndexFileCacheTotalSizeBytes; + return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); } public int remoteLogManagerThreadPoolSize() { - return remoteLogManagerThreadPoolSize; + return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerCopierThreadPoolSize() { - return remoteLogManagerCopierThreadPoolSize; + return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { - return remoteLogManagerExpirationThreadPoolSize; + return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); } public long remoteLogManagerTaskIntervalMs() { - return remoteLogManagerTaskIntervalMs; + return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMs() { - return remoteLogManagerTaskRetryBackoffMs; + return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMaxMs() { - return remoteLogManagerTaskRetryBackoffMaxMs; + return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); } public double remoteLogManagerTaskRetryJitter() { - return remoteLogManagerTaskRetryJitter; + return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); } public int remoteLogReaderThreads() { - return remoteLogReaderThreads; + return getInt(REMOTE_LOG_READER_THREADS_PROP); } public int remoteLogReaderMaxPendingTasks() { - return remoteLogReaderMaxPendingTasks; + return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); } public String remoteLogMetadataManagerListenerName() { - return remoteLogMetadataManagerListenerName; + return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); } public int remoteLogMetadataCustomMetadataMaxBytes() { - return remoteLogMetadataCustomMetadataMaxBytes; + return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); } public String remoteStorageManagerPrefix() { - return remoteStorageManagerPrefix; + return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } public String remoteLogMetadataManagerPrefix() { - return remoteLogMetadataManagerPrefix; + return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); } public Map remoteStorageManagerProps() { - return Collections.unmodifiableMap(remoteStorageManagerProps); + return getConfigProps(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } public Map remoteLogMetadataManagerProps() { - return Collections.unmodifiableMap(remoteLogMetadataManagerProps); + return getConfigProps(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); + } + + public Map getConfigProps(String configPrefixProp) { + String prefixProp = getString(configPrefixProp); + return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp)); } public long remoteLogManagerCopyMaxBytesPerSecond() { - return remoteLogManagerCopyMaxBytesPerSecond; + return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); } public int remoteLogManagerCopyNumQuotaSamples() { - return remoteLogManagerCopyNumQuotaSamples; + return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerCopyQuotaWindowSizeSeconds() { - return remoteLogManagerCopyQuotaWindowSizeSeconds; + return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); } public long remoteLogManagerFetchMaxBytesPerSecond() { - return remoteLogManagerFetchMaxBytesPerSecond; + return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); } public int remoteLogManagerFetchNumQuotaSamples() { - return remoteLogManagerFetchNumQuotaSamples; + return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerFetchQuotaWindowSizeSeconds() { - return remoteLogManagerFetchQuotaWindowSizeSeconds; + return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } public int remoteFetchMaxWaitMs() { - return remoteFetchMaxWaitMs; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof RemoteLogManagerConfig)) return false; - RemoteLogManagerConfig that = (RemoteLogManagerConfig) o; - return enableRemoteStorageSystem == that.enableRemoteStorageSystem - && remoteLogIndexFileCacheTotalSizeBytes == that.remoteLogIndexFileCacheTotalSizeBytes - && remoteLogManagerThreadPoolSize == that.remoteLogManagerThreadPoolSize - && remoteLogManagerCopierThreadPoolSize == that.remoteLogManagerCopierThreadPoolSize - && remoteLogManagerExpirationThreadPoolSize == that.remoteLogManagerExpirationThreadPoolSize - && remoteLogManagerTaskIntervalMs == that.remoteLogManagerTaskIntervalMs - && remoteLogManagerTaskRetryBackoffMs == that.remoteLogManagerTaskRetryBackoffMs - && remoteLogManagerTaskRetryBackoffMaxMs == that.remoteLogManagerTaskRetryBackoffMaxMs - && remoteLogManagerTaskRetryJitter == that.remoteLogManagerTaskRetryJitter - && remoteLogReaderThreads == that.remoteLogReaderThreads - && remoteLogReaderMaxPendingTasks == that.remoteLogReaderMaxPendingTasks - && remoteLogMetadataCustomMetadataMaxBytes == that.remoteLogMetadataCustomMetadataMaxBytes - && Objects.equals(remoteStorageManagerClassName, that.remoteStorageManagerClassName) - && Objects.equals(remoteStorageManagerClassPath, that.remoteStorageManagerClassPath) - && Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName) - && Objects.equals(remoteLogMetadataManagerClassPath, that.remoteLogMetadataManagerClassPath) - && Objects.equals(remoteLogMetadataManagerListenerName, that.remoteLogMetadataManagerListenerName) - && Objects.equals(remoteStorageManagerProps, that.remoteStorageManagerProps) - && Objects.equals(remoteLogMetadataManagerProps, that.remoteLogMetadataManagerProps) - && Objects.equals(remoteStorageManagerPrefix, that.remoteStorageManagerPrefix) - && Objects.equals(remoteLogMetadataManagerPrefix, that.remoteLogMetadataManagerPrefix) - && remoteLogManagerCopyMaxBytesPerSecond == that.remoteLogManagerCopyMaxBytesPerSecond - && remoteLogManagerCopyNumQuotaSamples == that.remoteLogManagerCopyNumQuotaSamples - && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds - && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond - && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds - && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; - } - - @Override - public int hashCode() { - return Objects.hash( - enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath, - remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName, - remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, - remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs, - remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter, - remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, - remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, - remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); + return getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); } public static void main(String[] args) { - System.out.println(CONFIG_DEF.toHtml(4, config -> "remote_log_manager_" + config)); + System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config)); } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 058f81626d158..3e6702f84c063 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -16,184 +16,98 @@ */ package org.apache.kafka.server.log.remote.storage; -import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class RemoteLogManagerConfigTest { - private static class TestConfig extends AbstractConfig { - public TestConfig(Map originals) { - super(RemoteLogManagerConfig.CONFIG_DEF, originals, true); - } - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { + @Test + public void testValidConfigs() { String rsmPrefix = "__custom.rsm."; String rlmmPrefix = "__custom.rlmm."; Map rsmProps = Collections.singletonMap("rsm.prop", "val"); Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); - RemoteLogManagerConfig expectedRemoteLogManagerConfig = getRemoteLogManagerConfig(useDefaultRemoteLogMetadataManagerClass, - rsmPrefix, - rlmmPrefix, - rsmProps, - rlmmProps); - Map props = extractProps(expectedRemoteLogManagerConfig); + Map props = getRLMProps(rsmPrefix, rlmmPrefix); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v)); rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v)); + + RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props); + // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. - if (useDefaultRemoteLogMetadataManagerClass) { - props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); - } - TestConfig config = new TestConfig(props); - RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(config); - assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig); + props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + + RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props); + assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values()); + + assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps()); + assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps()); } - private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefaultRemoteLogMetadataManagerClass, - String rsmPrefix, - String rlmmPrefix, - Map rsmProps, - Map rlmmProps) { - String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class"; - return new RemoteLogManagerConfig(true, - "dummy.remote.storage.class", - "dummy.remote.storage.class.path", - remoteLogMetadataManagerClass, - "dummy.remote.log.metadata.class.path", - "listener.name", - 1024 * 1024L, - 1, - 1, - 1, - 60000L, - 100L, - 60000L, - 0.3, - 10, - 100, - 100, - rsmPrefix, - rsmProps, - rlmmPrefix, - rlmmProps, - Long.MAX_VALUE, - 11, - 1, - Long.MAX_VALUE, - 11, - 1, - 500); + @Test + public void testDefaultConfigs() { + // Even with empty properties, RemoteLogManagerConfig has default values + Map emptyProps = new HashMap<>(); + RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); + } + + @Test + public void testValidateEmptyStringConfig() { + // Test with a empty string props should throw ConfigException + Map emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); + assertThrows(ConfigException.class, () -> + new RemoteLogManagerConfig(emptyStringProps)); } - private Map extractProps(RemoteLogManagerConfig remoteLogManagerConfig) { + private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { + Map props = new HashMap<>(); - props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, - remoteLogManagerConfig.enableRemoteStorageSystem()); + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, - remoteLogManagerConfig.remoteStorageManagerClassName()); + "dummy.remote.storage.class"); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, - remoteLogManagerConfig.remoteStorageManagerClassPath()); + "dummy.remote.storage.class.path"); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerClassName()); + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerClassPath()); + "dummy.remote.log.metadata.class.path"); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerListenerName()); + "listener.name"); props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, - remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()); + 1024 * 1024L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, - remoteLogManagerConfig.remoteLogManagerThreadPoolSize()); + 1); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, - remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize()); + 1); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, - remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize()); + 1); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, - remoteLogManagerConfig.remoteLogManagerTaskIntervalMs()); + 60000L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP, - remoteLogManagerConfig.remoteLogManagerTaskRetryBackoffMs()); + 100L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP, - remoteLogManagerConfig.remoteLogManagerTaskRetryBackoffMaxMs()); + 60000L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP, - remoteLogManagerConfig.remoteLogManagerTaskRetryJitter()); + 0.3); props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, - remoteLogManagerConfig.remoteLogReaderThreads()); + 10); props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP, - remoteLogManagerConfig.remoteLogReaderMaxPendingTasks()); + 100); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP, - remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes()); + 100); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, - remoteLogManagerConfig.remoteStorageManagerPrefix()); + rsmPrefix); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerPrefix()); + rlmmPrefix); return props; } - - @Test - public void testHashCodeAndEquals_ForAllAndTwoFields() { - String rsmPrefix = "__custom.rsm."; - String rlmmPrefix = "__custom.rlmm."; - Map rsmProps = Collections.singletonMap("rsm.prop", "val"); - Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); - RemoteLogManagerConfig config1 = getRemoteLogManagerConfig(false, - rsmPrefix, - rlmmPrefix, - rsmProps, - rlmmProps); - RemoteLogManagerConfig config2 = getRemoteLogManagerConfig(false, - rsmPrefix, - rlmmPrefix, - rsmProps, - rlmmProps); - - // Initially, hash codes should be equal for default objects - assertEquals(config1.hashCode(), config2.hashCode()); - - // Initially, objects should be equal - assertEquals(config1, config2); - - // Test for specific field remoteLogManagerCopierThreadPoolSize - RemoteLogManagerConfig config3 = new RemoteLogManagerConfig(true, "dummy.remote.storage.class", - "dummy.remote.storage.class.path", - "dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path", - "listener.name", - 1024 * 1024L, - 1, - 2, // Change here - 2, // Change here - 60000L, - 100L, - 60000L, - 0.3, - 10, - 100, - 100, - rsmPrefix, - rsmProps, - rlmmPrefix, - rlmmProps, - Long.MAX_VALUE, - 11, - 1, - Long.MAX_VALUE, - 11, - 1, - 500); - - assertNotEquals(config1.hashCode(), config3.hashCode()); - assertNotEquals(config1, config3); - } -} +} \ No newline at end of file