From 48f6b513c3550fa1e79f5e2886d23f7cc21a756d Mon Sep 17 00:00:00 2001 From: abhijeetk88 Date: Fri, 24 May 2024 12:28:47 +0530 Subject: [PATCH] Address review comments --- .../kafka/log/remote/RemoteLogManager.java | 1 + .../log/remote/quota/RLMQuotaManager.java | 8 +++---- .../remote/quota/RLMQuotaManagerConfig.java | 6 ++--- .../log/remote/RemoteLogManagerTest.java | 24 +++++++++---------- .../storage/RemoteLogManagerConfig.java | 2 +- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 689ccb9c114c8..aada1bf59f907 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -184,6 +184,7 @@ public class RemoteLogManager implements Closeable { * @param fetchLog function to get UnifiedLog instance for a given topic. * @param updateRemoteLogStartOffset function to update the log-start-offset for a given topic partition. * @param brokerTopicStats BrokerTopicStats instance to update the respective metrics. + * @param metrics Metrics instance */ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, int brokerId, diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java index d083ba48d37fc..e21f00f1f5b0e 100644 --- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java +++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java @@ -56,7 +56,7 @@ public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType this.description = description; this.time = time; - this.quota = new Quota(config.getQuotaBytesPerSecond(), true); + this.quota = new Quota(config.quotaBytesPerSecond(), true); this.sensorAccess = new SensorAccess(lock, metrics); } @@ -69,7 +69,7 @@ public void updateQuota(Quota newQuota) { MetricName quotaMetricName = metricName(); KafkaMetric metric = allMetrics.get(quotaMetricName); if (metric != null) { - LOGGER.warn("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota); + LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota); metric.config(getQuotaMetricConfig(newQuota)); } } finally { @@ -95,8 +95,8 @@ public void record(double value) { private MetricConfig getQuotaMetricConfig(Quota quota) { return new MetricConfig() - .timeWindow(config.getQuotaWindowSizeSeconds(), TimeUnit.SECONDS) - .samples(config.getNumQuotaSamples()) + .timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS) + .samples(config.numQuotaSamples()) .quota(quota); } diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java index 3df5410d9cb8c..ead47267e29b6 100644 --- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java +++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java @@ -23,15 +23,15 @@ public class RLMQuotaManagerConfig { private final int numQuotaSamples; private final int quotaWindowSizeSeconds; - public long getQuotaBytesPerSecond() { + public long quotaBytesPerSecond() { return quotaBytesPerSecond; } - public int getNumQuotaSamples() { + public int numQuotaSamples() { return numQuotaSamples; } - public int getQuotaWindowSizeSeconds() { + public int quotaWindowSizeSeconds() { return quotaWindowSizeSeconds; } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 0c0b730f23a76..da88c0687cb2d 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -2543,9 +2543,9 @@ public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); - assertEquals(Long.MAX_VALUE, defaultConfig.getQuotaBytesPerSecond()); - assertEquals(61, defaultConfig.getNumQuotaSamples()); - assertEquals(1, defaultConfig.getQuotaWindowSizeSeconds()); + assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); + assertEquals(61, defaultConfig.numQuotaSamples()); + assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); @@ -2553,9 +2553,9 @@ public void testCopyQuotaManagerConfig() { customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig); - assertEquals(100L, rlmCopyQuotaManagerConfig.getQuotaBytesPerSecond()); - assertEquals(31, rlmCopyQuotaManagerConfig.getNumQuotaSamples()); - assertEquals(1, rlmCopyQuotaManagerConfig.getQuotaWindowSizeSeconds()); + assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); + assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); + assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); } @Test @@ -2563,9 +2563,9 @@ public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); - assertEquals(Long.MAX_VALUE, defaultConfig.getQuotaBytesPerSecond()); - assertEquals(11, defaultConfig.getNumQuotaSamples()); - assertEquals(1, defaultConfig.getQuotaWindowSizeSeconds()); + assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond()); + assertEquals(11, defaultConfig.numQuotaSamples()); + assertEquals(1, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); @@ -2573,9 +2573,9 @@ public void testFetchQuotaManagerConfig() { customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps); RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); - assertEquals(100L, rlmFetchQuotaManagerConfig.getQuotaBytesPerSecond()); - assertEquals(31, rlmFetchQuotaManagerConfig.getNumQuotaSamples()); - assertEquals(1, rlmFetchQuotaManagerConfig.getQuotaWindowSizeSeconds()); + assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); + assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); + assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); } private Partition mockPartition(TopicIdPartition topicIdPartition) { diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 1b4ceefa04892..619941d2e94e1 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -145,7 +145,7 @@ public final class RemoteLogManagerConfig { public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second"; public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " + - "This is a global limit for all the partitions that are being copied from remote storage to local storage. " + + "This is a global limit for all the partitions that are being copied from local storage to remote storage. " + "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second."; public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;