From 0c296a1bf2adf13810695ffbaaa375d376d204b7 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Tue, 4 Jun 2024 21:29:40 +0200 Subject: [PATCH 01/13] Refactor RemoteLogManagerConfig with AbstractConfig --- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../log/remote/RemoteLogManagerTest.java | 4 +- .../kafka/server/ReplicaManagerTest.scala | 8 +- .../storage/RemoteLogManagerConfig.java | 238 +++--------------- .../storage/RemoteLogManagerConfigTest.java | 163 +++--------- 5 files changed, 73 insertions(+), 342 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 45ec15b1008f8..da923b84a36a8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -590,7 +590,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props) def remoteLogManagerConfig = _remoteLogManagerConfig private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 50b581fdf4ee5..9badaabf41907 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; @@ -2742,8 +2741,7 @@ private RemoteLogManagerConfig createRLMConfig(Properties props) { props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal); - AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props); - return new RemoteLogManagerConfig(config); + return new RemoteLogManagerConfig(props); } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index bd78d967801fc..b36d3b262942f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -4092,8 +4092,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) - val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) - val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val mockLog = mock(classOf[UnifiedLog]) val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) val remoteLogManager = new RemoteLogManager( @@ -4192,8 +4191,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) - val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) - val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val dummyLog = mock(classOf[UnifiedLog]) val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) val remoteLogManager = new RemoteLogManager( diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index e41cc011c313f..d14716e61d7e3 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -20,9 +20,7 @@ import org.apache.kafka.common.config.ConfigDef; import java.util.Collections; -import java.util.HashMap; import java.util.Map; -import java.util.Objects; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; @@ -34,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LONG; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; -public final class RemoteLogManagerConfig { +public final class RemoteLogManagerConfig extends AbstractConfig { /** * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having @@ -358,288 +356,124 @@ public final class RemoteLogManagerConfig { REMOTE_FETCH_MAX_WAIT_MS_DOC); } - private final boolean enableRemoteStorageSystem; - private final String remoteStorageManagerClassName; - private final String remoteStorageManagerClassPath; - private final String remoteLogMetadataManagerClassName; - private final String remoteLogMetadataManagerClassPath; - private final long remoteLogIndexFileCacheTotalSizeBytes; - private final int remoteLogManagerThreadPoolSize; - private final int remoteLogManagerCopierThreadPoolSize; - private final int remoteLogManagerExpirationThreadPoolSize; - private final long remoteLogManagerTaskIntervalMs; - private final long remoteLogManagerTaskRetryBackoffMs; - private final long remoteLogManagerTaskRetryBackoffMaxMs; - private final double remoteLogManagerTaskRetryJitter; - private final int remoteLogReaderThreads; - private final int remoteLogReaderMaxPendingTasks; - private final String remoteStorageManagerPrefix; - private final HashMap remoteStorageManagerProps; - private final String remoteLogMetadataManagerPrefix; - private final HashMap remoteLogMetadataManagerProps; - private final String remoteLogMetadataManagerListenerName; - private final int remoteLogMetadataCustomMetadataMaxBytes; - private final long remoteLogManagerCopyMaxBytesPerSecond; - private final int remoteLogManagerCopyNumQuotaSamples; - private final int remoteLogManagerCopyQuotaWindowSizeSeconds; - private final long remoteLogManagerFetchMaxBytesPerSecond; - private final int remoteLogManagerFetchNumQuotaSamples; - private final int remoteLogManagerFetchQuotaWindowSizeSeconds; - private final int remoteFetchMaxWaitMs; - - public RemoteLogManagerConfig(AbstractConfig config) { - this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP), - config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP), - config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP), - config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP), - config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP), - config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP), - config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP), - config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP), - config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP), - config.getInt(REMOTE_LOG_READER_THREADS_PROP), - config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), - config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null - ? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap(), - config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP), - config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null - ? config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap(), - config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP), - config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), - config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), - config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); + public RemoteLogManagerConfig(Map props) { + this(props, false); } - // Visible for testing - public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, - String remoteStorageManagerClassName, - String remoteStorageManagerClassPath, - String remoteLogMetadataManagerClassName, - String remoteLogMetadataManagerClassPath, - String remoteLogMetadataManagerListenerName, - long remoteLogIndexFileCacheTotalSizeBytes, - int remoteLogManagerThreadPoolSize, - int remoteLogManagerCopierThreadPoolSize, - int remoteLogManagerExpirationThreadPoolSize, - long remoteLogManagerTaskIntervalMs, - long remoteLogManagerTaskRetryBackoffMs, - long remoteLogManagerTaskRetryBackoffMaxMs, - double remoteLogManagerTaskRetryJitter, - int remoteLogReaderThreads, - int remoteLogReaderMaxPendingTasks, - int remoteLogMetadataCustomMetadataMaxBytes, - String remoteStorageManagerPrefix, - Map remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */ - String remoteLogMetadataManagerPrefix, - Map remoteLogMetadataManagerProps, /* properties having keys stripped out with remoteLogMetadataManagerPrefix */ - long remoteLogManagerCopyMaxBytesPerSecond, - int remoteLogManagerCopyNumQuotaSamples, - int remoteLogManagerCopyQuotaWindowSizeSeconds, - long remoteLogManagerFetchMaxBytesPerSecond, - int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds, - int remoteFetchMaxWaitMs) { - this.enableRemoteStorageSystem = enableRemoteStorageSystem; - this.remoteStorageManagerClassName = remoteStorageManagerClassName; - this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; - this.remoteLogMetadataManagerClassName = remoteLogMetadataManagerClassName; - this.remoteLogMetadataManagerClassPath = remoteLogMetadataManagerClassPath; - this.remoteLogIndexFileCacheTotalSizeBytes = remoteLogIndexFileCacheTotalSizeBytes; - this.remoteLogManagerThreadPoolSize = remoteLogManagerThreadPoolSize; - this.remoteLogManagerCopierThreadPoolSize = remoteLogManagerCopierThreadPoolSize; - this.remoteLogManagerExpirationThreadPoolSize = remoteLogManagerExpirationThreadPoolSize; - this.remoteLogManagerTaskIntervalMs = remoteLogManagerTaskIntervalMs; - this.remoteLogManagerTaskRetryBackoffMs = remoteLogManagerTaskRetryBackoffMs; - this.remoteLogManagerTaskRetryBackoffMaxMs = remoteLogManagerTaskRetryBackoffMaxMs; - this.remoteLogManagerTaskRetryJitter = remoteLogManagerTaskRetryJitter; - this.remoteLogReaderThreads = remoteLogReaderThreads; - this.remoteLogReaderMaxPendingTasks = remoteLogReaderMaxPendingTasks; - this.remoteStorageManagerPrefix = remoteStorageManagerPrefix; - this.remoteStorageManagerProps = new HashMap<>(remoteStorageManagerProps); - this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix; - this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps); - this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName; - this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes; - this.remoteLogManagerCopyMaxBytesPerSecond = remoteLogManagerCopyMaxBytesPerSecond; - this.remoteLogManagerCopyNumQuotaSamples = remoteLogManagerCopyNumQuotaSamples; - this.remoteLogManagerCopyQuotaWindowSizeSeconds = remoteLogManagerCopyQuotaWindowSizeSeconds; - this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; - this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; - this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; - this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; + protected RemoteLogManagerConfig(Map props, boolean doLog) { + super(CONFIG_DEF, props, doLog); } public boolean enableRemoteStorageSystem() { - return enableRemoteStorageSystem; - } - - public String remoteStorageManagerClassName() { - return remoteStorageManagerClassName; + return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); } public String remoteStorageManagerClassPath() { - return remoteStorageManagerClassPath; + return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); } public String remoteLogMetadataManagerClassName() { - return remoteLogMetadataManagerClassName; + return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); } public String remoteLogMetadataManagerClassPath() { - return remoteLogMetadataManagerClassPath; + return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); } public long remoteLogIndexFileCacheTotalSizeBytes() { - return remoteLogIndexFileCacheTotalSizeBytes; + return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); } public int remoteLogManagerThreadPoolSize() { - return remoteLogManagerThreadPoolSize; + return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerCopierThreadPoolSize() { - return remoteLogManagerCopierThreadPoolSize; + return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { - return remoteLogManagerExpirationThreadPoolSize; + return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); } public long remoteLogManagerTaskIntervalMs() { - return remoteLogManagerTaskIntervalMs; + return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMs() { - return remoteLogManagerTaskRetryBackoffMs; + return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMaxMs() { - return remoteLogManagerTaskRetryBackoffMaxMs; + return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); } public double remoteLogManagerTaskRetryJitter() { - return remoteLogManagerTaskRetryJitter; + return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); } public int remoteLogReaderThreads() { - return remoteLogReaderThreads; + return getInt(REMOTE_LOG_READER_THREADS_PROP); } public int remoteLogReaderMaxPendingTasks() { - return remoteLogReaderMaxPendingTasks; + return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); } public String remoteLogMetadataManagerListenerName() { - return remoteLogMetadataManagerListenerName; + return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); } public int remoteLogMetadataCustomMetadataMaxBytes() { - return remoteLogMetadataCustomMetadataMaxBytes; + return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); } public String remoteStorageManagerPrefix() { - return remoteStorageManagerPrefix; + return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } public String remoteLogMetadataManagerPrefix() { - return remoteLogMetadataManagerPrefix; + return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); } public Map remoteStorageManagerProps() { - return Collections.unmodifiableMap(remoteStorageManagerProps); + return Collections.unmodifiableMap(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null + ? originalsWithPrefix(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) + : Collections.emptyMap()); } public Map remoteLogMetadataManagerProps() { - return Collections.unmodifiableMap(remoteLogMetadataManagerProps); + return Collections.unmodifiableMap(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null + ? originalsWithPrefix(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) + : Collections.emptyMap()); } public long remoteLogManagerCopyMaxBytesPerSecond() { - return remoteLogManagerCopyMaxBytesPerSecond; + return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); } public int remoteLogManagerCopyNumQuotaSamples() { - return remoteLogManagerCopyNumQuotaSamples; + return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerCopyQuotaWindowSizeSeconds() { - return remoteLogManagerCopyQuotaWindowSizeSeconds; + return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); } public long remoteLogManagerFetchMaxBytesPerSecond() { - return remoteLogManagerFetchMaxBytesPerSecond; + return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); } public int remoteLogManagerFetchNumQuotaSamples() { - return remoteLogManagerFetchNumQuotaSamples; + return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerFetchQuotaWindowSizeSeconds() { - return remoteLogManagerFetchQuotaWindowSizeSeconds; + return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } public int remoteFetchMaxWaitMs() { - return remoteFetchMaxWaitMs; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof RemoteLogManagerConfig)) return false; - RemoteLogManagerConfig that = (RemoteLogManagerConfig) o; - return enableRemoteStorageSystem == that.enableRemoteStorageSystem - && remoteLogIndexFileCacheTotalSizeBytes == that.remoteLogIndexFileCacheTotalSizeBytes - && remoteLogManagerThreadPoolSize == that.remoteLogManagerThreadPoolSize - && remoteLogManagerCopierThreadPoolSize == that.remoteLogManagerCopierThreadPoolSize - && remoteLogManagerExpirationThreadPoolSize == that.remoteLogManagerExpirationThreadPoolSize - && remoteLogManagerTaskIntervalMs == that.remoteLogManagerTaskIntervalMs - && remoteLogManagerTaskRetryBackoffMs == that.remoteLogManagerTaskRetryBackoffMs - && remoteLogManagerTaskRetryBackoffMaxMs == that.remoteLogManagerTaskRetryBackoffMaxMs - && remoteLogManagerTaskRetryJitter == that.remoteLogManagerTaskRetryJitter - && remoteLogReaderThreads == that.remoteLogReaderThreads - && remoteLogReaderMaxPendingTasks == that.remoteLogReaderMaxPendingTasks - && remoteLogMetadataCustomMetadataMaxBytes == that.remoteLogMetadataCustomMetadataMaxBytes - && Objects.equals(remoteStorageManagerClassName, that.remoteStorageManagerClassName) - && Objects.equals(remoteStorageManagerClassPath, that.remoteStorageManagerClassPath) - && Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName) - && Objects.equals(remoteLogMetadataManagerClassPath, that.remoteLogMetadataManagerClassPath) - && Objects.equals(remoteLogMetadataManagerListenerName, that.remoteLogMetadataManagerListenerName) - && Objects.equals(remoteStorageManagerProps, that.remoteStorageManagerProps) - && Objects.equals(remoteLogMetadataManagerProps, that.remoteLogMetadataManagerProps) - && Objects.equals(remoteStorageManagerPrefix, that.remoteStorageManagerPrefix) - && Objects.equals(remoteLogMetadataManagerPrefix, that.remoteLogMetadataManagerPrefix) - && remoteLogManagerCopyMaxBytesPerSecond == that.remoteLogManagerCopyMaxBytesPerSecond - && remoteLogManagerCopyNumQuotaSamples == that.remoteLogManagerCopyNumQuotaSamples - && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds - && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond - && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds - && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; - } - - @Override - public int hashCode() { - return Objects.hash( - enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath, - remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName, - remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, - remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs, - remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter, - remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, - remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, - remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); + return getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 058f81626d158..1c3eb46895a27 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.server.log.remote.storage; -import org.apache.kafka.common.config.AbstractConfig; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -27,17 +25,13 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; public class RemoteLogManagerConfigTest { - private static class TestConfig extends AbstractConfig { - public TestConfig(Map originals) { - super(RemoteLogManagerConfig.CONFIG_DEF, originals, true); - } - } - @ParameterizedTest @ValueSource(booleans = {true, false}) public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { @@ -45,155 +39,62 @@ public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { String rlmmPrefix = "__custom.rlmm."; Map rsmProps = Collections.singletonMap("rsm.prop", "val"); Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); - RemoteLogManagerConfig expectedRemoteLogManagerConfig = getRemoteLogManagerConfig(useDefaultRemoteLogMetadataManagerClass, - rsmPrefix, - rlmmPrefix, - rsmProps, - rlmmProps); - Map props = extractProps(expectedRemoteLogManagerConfig); + Map props = getRLMProps(useDefaultRemoteLogMetadataManagerClass, rsmProps, rlmmProps); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v)); rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v)); + + RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props); + // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. if (useDefaultRemoteLogMetadataManagerClass) { props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); } - TestConfig config = new TestConfig(props); - RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(config); - assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig); + RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props); + assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values()); } - private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefaultRemoteLogMetadataManagerClass, - String rsmPrefix, - String rlmmPrefix, - Map rsmProps, - Map rlmmProps) { + private Map getRLMProps(boolean useDefaultRemoteLogMetadataManagerClass, Map rsmProps, Map rlmmProps) { String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class"; - return new RemoteLogManagerConfig(true, - "dummy.remote.storage.class", - "dummy.remote.storage.class.path", - remoteLogMetadataManagerClass, - "dummy.remote.log.metadata.class.path", - "listener.name", - 1024 * 1024L, - 1, - 1, - 1, - 60000L, - 100L, - 60000L, - 0.3, - 10, - 100, - 100, - rsmPrefix, - rsmProps, - rlmmPrefix, - rlmmProps, - Long.MAX_VALUE, - 11, - 1, - Long.MAX_VALUE, - 11, - 1, - 500); - } - private Map extractProps(RemoteLogManagerConfig remoteLogManagerConfig) { Map props = new HashMap<>(); - props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, - remoteLogManagerConfig.enableRemoteStorageSystem()); + props.put(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, - remoteLogManagerConfig.remoteStorageManagerClassName()); - props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, - remoteLogManagerConfig.remoteStorageManagerClassPath()); + "dummy.remote.storage.class"); + props.put(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, + "dummy.remote.storage.class.path"); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerClassName()); + remoteLogMetadataManagerClass); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerClassPath()); - props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerListenerName()); + "dummy.remote.log.metadata.class.path"); + props.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + "listener.name"); props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, - remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()); + 1024 * 1024L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, - remoteLogManagerConfig.remoteLogManagerThreadPoolSize()); + 1); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, - remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize()); + 1); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, - remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize()); + 1); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, - remoteLogManagerConfig.remoteLogManagerTaskIntervalMs()); + 60000L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP, - remoteLogManagerConfig.remoteLogManagerTaskRetryBackoffMs()); + 100L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP, - remoteLogManagerConfig.remoteLogManagerTaskRetryBackoffMaxMs()); + 60000L); props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP, - remoteLogManagerConfig.remoteLogManagerTaskRetryJitter()); + 0.3); props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, - remoteLogManagerConfig.remoteLogReaderThreads()); + 10); props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP, - remoteLogManagerConfig.remoteLogReaderMaxPendingTasks()); + 100); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP, - remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes()); + 100); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, - remoteLogManagerConfig.remoteStorageManagerPrefix()); + rsmProps); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, - remoteLogManagerConfig.remoteLogMetadataManagerPrefix()); - return props; - } - - @Test - public void testHashCodeAndEquals_ForAllAndTwoFields() { - String rsmPrefix = "__custom.rsm."; - String rlmmPrefix = "__custom.rlmm."; - Map rsmProps = Collections.singletonMap("rsm.prop", "val"); - Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); - RemoteLogManagerConfig config1 = getRemoteLogManagerConfig(false, - rsmPrefix, - rlmmPrefix, - rsmProps, - rlmmProps); - RemoteLogManagerConfig config2 = getRemoteLogManagerConfig(false, - rsmPrefix, - rlmmPrefix, - rsmProps, rlmmProps); - - // Initially, hash codes should be equal for default objects - assertEquals(config1.hashCode(), config2.hashCode()); - - // Initially, objects should be equal - assertEquals(config1, config2); - - // Test for specific field remoteLogManagerCopierThreadPoolSize - RemoteLogManagerConfig config3 = new RemoteLogManagerConfig(true, "dummy.remote.storage.class", - "dummy.remote.storage.class.path", - "dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path", - "listener.name", - 1024 * 1024L, - 1, - 2, // Change here - 2, // Change here - 60000L, - 100L, - 60000L, - 0.3, - 10, - 100, - 100, - rsmPrefix, - rsmProps, - rlmmPrefix, - rlmmProps, - Long.MAX_VALUE, - 11, - 1, - Long.MAX_VALUE, - 11, - 1, - 500); - - assertNotEquals(config1.hashCode(), config3.hashCode()); - assertNotEquals(config1, config3); + return props; } -} +} \ No newline at end of file From 1d0d83150861c4257ff612dc0bf91b852b83877a Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Tue, 4 Jun 2024 22:03:24 +0200 Subject: [PATCH 02/13] Refactor RemoteLogManagerConfig with AbstractConfig --- .../server/log/remote/storage/RemoteLogManagerConfig.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index d14716e61d7e3..07bacff42271a 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -368,6 +368,10 @@ public boolean enableRemoteStorageSystem() { return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); } + public String remoteStorageManagerClassName() { + return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); + } + public String remoteStorageManagerClassPath() { return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); } From 4344ed625870fbd63a3846df93237bb357e47c38 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Tue, 4 Jun 2024 23:00:57 +0200 Subject: [PATCH 03/13] Refactor RemoteLogManagerConfig with AbstractConfig --- .../kafka/server/ReplicaManagerTest.scala | 2 +- .../storage/RemoteLogManagerConfig.java | 112 ++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b36d3b262942f..63eb6e2dd6c58 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.config.{TopicConfig} import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 07bacff42271a..a81a691a2a49b 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -356,6 +356,118 @@ public final class RemoteLogManagerConfig extends AbstractConfig { REMOTE_FETCH_MAX_WAIT_MS_DOC); } + public boolean enableRemoteStorageSystem() { + return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + } + + public String remoteStorageManagerClassName() { + return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); + } + + public String remoteStorageManagerClassPath() { + return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); + } + + public String remoteLogMetadataManagerClassName() { + return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + } + + public String remoteLogMetadataManagerClassPath() { + return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); + } + + public long remoteLogIndexFileCacheTotalSizeBytes() { + return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); + } + + public int remoteLogManagerThreadPoolSize() { + return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); + } + + public int remoteLogManagerCopierThreadPoolSize() { + return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + } + + public int remoteLogManagerExpirationThreadPoolSize() { + return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + } + + public long remoteLogManagerTaskIntervalMs() { + return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); + } + + public long remoteLogManagerTaskRetryBackoffMs() { + return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); + } + + public long remoteLogManagerTaskRetryBackoffMaxMs() { + return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); + } + + public double remoteLogManagerTaskRetryJitter() { + return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); + } + + public int remoteLogReaderThreads() { + return getInt(REMOTE_LOG_READER_THREADS_PROP); + } + + public int remoteLogReaderMaxPendingTasks() { + return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); + } + + public String remoteLogMetadataManagerListenerName() { + return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); + } + + public int remoteLogMetadataCustomMetadataMaxBytes() { + return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); + } + + public String remoteStorageManagerPrefix() { + return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); + } + + public String remoteLogMetadataManagerPrefix() { + return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); + } + + public Map remoteStorageManagerProps() { + return Collections.unmodifiableMap(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null + ? originalsWithPrefix(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) + : Collections.emptyMap()); + } + + public Map remoteLogMetadataManagerProps() { + return Collections.unmodifiableMap(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null + ? originalsWithPrefix(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) + : Collections.emptyMap()); + } + + public long remoteLogManagerCopyMaxBytesPerSecond() { + return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); + } + + public int remoteLogManagerCopyNumQuotaSamples() { + return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); + } + + public int remoteLogManagerCopyQuotaWindowSizeSeconds() { + return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); + } + + public long remoteLogManagerFetchMaxBytesPerSecond() { + return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); + } + + public int remoteLogManagerFetchNumQuotaSamples() { + return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); + } + + public int remoteLogManagerFetchQuotaWindowSizeSeconds() { + return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); + } + public RemoteLogManagerConfig(Map props) { this(props, false); } From 9e69e8bdb622379b30e1b658426c799c680070ff Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 17:06:21 +0200 Subject: [PATCH 04/13] Fix from review --- .../storage/RemoteLogManagerConfig.java | 120 +----------------- 1 file changed, 6 insertions(+), 114 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index a81a691a2a49b..6dbe5f0f399a5 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -186,10 +186,14 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; - public static final ConfigDef CONFIG_DEF = new ConfigDef(); + public static final ConfigDef CONFIG_DEF; + + public static ConfigDef configDef() { + return CONFIG_DEF; + } static { - CONFIG_DEF + CONFIG_DEF = new ConfigDef() .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, @@ -356,118 +360,6 @@ public final class RemoteLogManagerConfig extends AbstractConfig { REMOTE_FETCH_MAX_WAIT_MS_DOC); } - public boolean enableRemoteStorageSystem() { - return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); - } - - public String remoteStorageManagerClassName() { - return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); - } - - public String remoteStorageManagerClassPath() { - return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); - } - - public String remoteLogMetadataManagerClassName() { - return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); - } - - public String remoteLogMetadataManagerClassPath() { - return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); - } - - public long remoteLogIndexFileCacheTotalSizeBytes() { - return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); - } - - public int remoteLogManagerThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); - } - - public int remoteLogManagerCopierThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); - } - - public int remoteLogManagerExpirationThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); - } - - public long remoteLogManagerTaskIntervalMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); - } - - public long remoteLogManagerTaskRetryBackoffMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); - } - - public long remoteLogManagerTaskRetryBackoffMaxMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); - } - - public double remoteLogManagerTaskRetryJitter() { - return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); - } - - public int remoteLogReaderThreads() { - return getInt(REMOTE_LOG_READER_THREADS_PROP); - } - - public int remoteLogReaderMaxPendingTasks() { - return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); - } - - public String remoteLogMetadataManagerListenerName() { - return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); - } - - public int remoteLogMetadataCustomMetadataMaxBytes() { - return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); - } - - public String remoteStorageManagerPrefix() { - return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); - } - - public String remoteLogMetadataManagerPrefix() { - return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); - } - - public Map remoteStorageManagerProps() { - return Collections.unmodifiableMap(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null - ? originalsWithPrefix(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap()); - } - - public Map remoteLogMetadataManagerProps() { - return Collections.unmodifiableMap(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null - ? originalsWithPrefix(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap()); - } - - public long remoteLogManagerCopyMaxBytesPerSecond() { - return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); - } - - public int remoteLogManagerCopyNumQuotaSamples() { - return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); - } - - public int remoteLogManagerCopyQuotaWindowSizeSeconds() { - return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); - } - - public long remoteLogManagerFetchMaxBytesPerSecond() { - return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); - } - - public int remoteLogManagerFetchNumQuotaSamples() { - return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); - } - - public int remoteLogManagerFetchQuotaWindowSizeSeconds() { - return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); - } - public RemoteLogManagerConfig(Map props) { this(props, false); } From 208a254e7e71db2f31d117b81d2998a1184b52c8 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 17:40:30 +0200 Subject: [PATCH 05/13] Fix from review --- .../storage/RemoteLogManagerConfig.java | 6 +---- .../storage/RemoteLogManagerConfigTest.java | 27 +++++++++---------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 6dbe5f0f399a5..6055611a07309 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -361,11 +361,7 @@ public static ConfigDef configDef() { } public RemoteLogManagerConfig(Map props) { - this(props, false); - } - - protected RemoteLogManagerConfig(Map props, boolean doLog) { - super(CONFIG_DEF, props, doLog); + super(CONFIG_DEF, props); } public boolean enableRemoteStorageSystem() { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 1c3eb46895a27..e9a7abfbe1f3e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -16,8 +16,7 @@ */ package org.apache.kafka.server.log.remote.storage; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; @@ -32,30 +31,30 @@ public class RemoteLogManagerConfigTest { - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { + @Test + public void testValidConfigs() { String rsmPrefix = "__custom.rsm."; String rlmmPrefix = "__custom.rlmm."; Map rsmProps = Collections.singletonMap("rsm.prop", "val"); Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); - Map props = getRLMProps(useDefaultRemoteLogMetadataManagerClass, rsmProps, rlmmProps); + Map props = getRLMProps(rsmPrefix, rlmmPrefix); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v)); rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v)); RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props); // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. - if (useDefaultRemoteLogMetadataManagerClass) { - props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); - } + props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props); assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values()); + + assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps()); + assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps()); } - private Map getRLMProps(boolean useDefaultRemoteLogMetadataManagerClass, Map rsmProps, Map rlmmProps) { - String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class"; + private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { Map props = new HashMap<>(); props.put(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); @@ -64,7 +63,7 @@ private Map getRLMProps(boolean useDefaultRemoteLogMetadataManag props.put(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, "dummy.remote.storage.class.path"); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, - remoteLogMetadataManagerClass); + DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, "dummy.remote.log.metadata.class.path"); props.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, @@ -92,9 +91,9 @@ private Map getRLMProps(boolean useDefaultRemoteLogMetadataManag props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP, 100); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, - rsmProps); + rsmPrefix); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, - rlmmProps); + rlmmPrefix); return props; } } \ No newline at end of file From 75fe88877bd9b618e8c2c3bd4d603202a5724822 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 18:25:44 +0200 Subject: [PATCH 06/13] Add empty string test --- .../remote/storage/RemoteLogManagerConfigTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index e9a7abfbe1f3e..a14eae2df17e3 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.server.log.remote.storage; +import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -28,6 +29,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class RemoteLogManagerConfigTest { @@ -52,6 +54,17 @@ public void testValidConfigs() { assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps()); assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps()); + + // Even with empty properties, RemoteLogManagerConfig has default values + Map emptyProps = new HashMap<>(); + RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); + assertEquals(remoteLogManagerConfigEmptyConfig.values().size(), 28); + + // Test with a empty string props should throw ConfigException + Map emptyStringProps = new HashMap<>(); + emptyStringProps.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); + assertThrows(ConfigException.class, () -> + new RemoteLogManagerConfig(emptyStringProps)); } private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { From f0359787aa314eb2a8dabff8d6b055ae665f6f44 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 18:31:38 +0200 Subject: [PATCH 07/13] Updating configdef --- .../server/log/remote/storage/RemoteLogManagerConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 6055611a07309..052f41d8cf143 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -189,11 +189,11 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final ConfigDef CONFIG_DEF; public static ConfigDef configDef() { - return CONFIG_DEF; + return new ConfigDef(CONFIG_DEF); } static { - CONFIG_DEF = new ConfigDef() + CONFIG_DEF = configDef() .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, From 7b9809a228a2a6b45c4f28a6c65aba950353068c Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 19:34:55 +0200 Subject: [PATCH 08/13] Removing CONFIG_DEF --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- .../log/remote/storage/RemoteLogManagerConfig.java | 12 +++--------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index da923b84a36a8..d1f5bf43c0f3d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -455,7 +455,7 @@ object KafkaConfig { } /** ********* Remote Log Management Configuration *********/ - RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) + RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => configDef.define(key)) def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 052f41d8cf143..1f825a6347d99 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -186,14 +186,8 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; - public static final ConfigDef CONFIG_DEF; - public static ConfigDef configDef() { - return new ConfigDef(CONFIG_DEF); - } - - static { - CONFIG_DEF = configDef() + return new ConfigDef() .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, @@ -361,7 +355,7 @@ public static ConfigDef configDef() { } public RemoteLogManagerConfig(Map props) { - super(CONFIG_DEF, props); + super(configDef(), props); } public boolean enableRemoteStorageSystem() { @@ -481,6 +475,6 @@ public int remoteFetchMaxWaitMs() { } public static void main(String[] args) { - System.out.println(CONFIG_DEF.toHtml(4, config -> "remote_log_manager_" + config)); + System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config)); } } From 4f36cddc1c07bce9d5b80576c8e909c26b53569e Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 20:29:41 +0200 Subject: [PATCH 09/13] Splitting tests --- .../remote/storage/RemoteLogManagerConfigTest.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index a14eae2df17e3..ee2f2b4aade2c 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP; @@ -54,17 +55,22 @@ public void testValidConfigs() { assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps()); assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps()); + } + @Test + public void testDefaultConfigs() { // Even with empty properties, RemoteLogManagerConfig has default values Map emptyProps = new HashMap<>(); RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); - assertEquals(remoteLogManagerConfigEmptyConfig.values().size(), 28); + assertEquals(remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize(), DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE); + } + @Test + public void testValidateEmptyStringConfig() { // Test with a empty string props should throw ConfigException - Map emptyStringProps = new HashMap<>(); - emptyStringProps.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); + Map emptyStringProps = Collections.singletonMap(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); assertThrows(ConfigException.class, () -> - new RemoteLogManagerConfig(emptyStringProps)); + new RemoteLogManagerConfig(emptyStringProps)); } private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { From f89c240bc0b9185c392937faa00c6775c3fa2f00 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 20:34:50 +0200 Subject: [PATCH 10/13] Adding another assertion --- .../server/log/remote/storage/RemoteLogManagerConfigTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index ee2f2b4aade2c..57339be10a5b6 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; @@ -63,6 +64,7 @@ public void testDefaultConfigs() { Map emptyProps = new HashMap<>(); RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); assertEquals(remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize(), DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE); + assertEquals(remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples(), DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM); } @Test From f67a73470cee89d787e5b391f20019c004de6a18 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 22:11:15 +0200 Subject: [PATCH 11/13] Removing unnecessary unmodifiableMap call --- .../remote/storage/RemoteLogManagerConfig.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 1f825a6347d99..3722bd72edfd6 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -435,15 +435,19 @@ public String remoteLogMetadataManagerPrefix() { } public Map remoteStorageManagerProps() { - return Collections.unmodifiableMap(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null - ? originalsWithPrefix(getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap()); + return getConfigProps(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } public Map remoteLogMetadataManagerProps() { - return Collections.unmodifiableMap(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null - ? originalsWithPrefix(getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) - : Collections.emptyMap()); + return getConfigProps(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); + } + + public Map getConfigProps(String configPrefixProp) { + String prefixProp = getString(configPrefixProp); + Map configProps = (prefixProp != null) + ? originalsWithPrefix(prefixProp) + : Collections.emptyMap(); + return configProps.isEmpty() ? configProps : Collections.unmodifiableMap(configProps); } public long remoteLogManagerCopyMaxBytesPerSecond() { From 81a69663f4d7d854e72ea6dd13a084d9c90a7694 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Wed, 5 Jun 2024 22:51:40 +0200 Subject: [PATCH 12/13] update condition for empty map --- .../server/log/remote/storage/RemoteLogManagerConfig.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 3722bd72edfd6..8224ab2382549 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -444,10 +444,7 @@ public Map remoteLogMetadataManagerProps() { public Map getConfigProps(String configPrefixProp) { String prefixProp = getString(configPrefixProp); - Map configProps = (prefixProp != null) - ? originalsWithPrefix(prefixProp) - : Collections.emptyMap(); - return configProps.isEmpty() ? configProps : Collections.unmodifiableMap(configProps); + return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp)); } public long remoteLogManagerCopyMaxBytesPerSecond() { From a70391f9e9ada941438c6913b50191b593bf7f33 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Thu, 6 Jun 2024 09:49:11 +0200 Subject: [PATCH 13/13] Fix tests from review --- .../storage/RemoteLogManagerConfigTest.java | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 57339be10a5b6..3e6702f84c063 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -23,13 +23,6 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP; -import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -49,7 +42,7 @@ public void testValidConfigs() { RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props); // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. - props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props); assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values()); @@ -63,14 +56,14 @@ public void testDefaultConfigs() { // Even with empty properties, RemoteLogManagerConfig has default values Map emptyProps = new HashMap<>(); RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); - assertEquals(remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize(), DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE); - assertEquals(remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples(), DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); } @Test public void testValidateEmptyStringConfig() { // Test with a empty string props should throw ConfigException - Map emptyStringProps = Collections.singletonMap(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); + Map emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); assertThrows(ConfigException.class, () -> new RemoteLogManagerConfig(emptyStringProps)); } @@ -78,16 +71,16 @@ public void testValidateEmptyStringConfig() { private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { Map props = new HashMap<>(); - props.put(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "dummy.remote.storage.class"); - props.put(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, "dummy.remote.storage.class.path"); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, - DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME); + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, "dummy.remote.log.metadata.class.path"); - props.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "listener.name"); props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, 1024 * 1024L);