Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetk88 committed May 24, 2024
1 parent 35066ad commit 48f6b51
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 20 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class RemoteLogManager implements Closeable {
* @param fetchLog function to get UnifiedLog instance for a given topic.
* @param updateRemoteLogStartOffset function to update the log-start-offset for a given topic partition.
* @param brokerTopicStats BrokerTopicStats instance to update the respective metrics.
* @param metrics Metrics instance
*/
public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
int brokerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType
this.description = description;
this.time = time;

this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
this.quota = new Quota(config.quotaBytesPerSecond(), true);
this.sensorAccess = new SensorAccess(lock, metrics);
}

Expand All @@ -69,7 +69,7 @@ public void updateQuota(Quota newQuota) {
MetricName quotaMetricName = metricName();
KafkaMetric metric = allMetrics.get(quotaMetricName);
if (metric != null) {
LOGGER.warn("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota);
LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota);
metric.config(getQuotaMetricConfig(newQuota));
}
} finally {
Expand All @@ -95,8 +95,8 @@ public void record(double value) {

private MetricConfig getQuotaMetricConfig(Quota quota) {
return new MetricConfig()
.timeWindow(config.getQuotaWindowSizeSeconds(), TimeUnit.SECONDS)
.samples(config.getNumQuotaSamples())
.timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS)
.samples(config.numQuotaSamples())
.quota(quota);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ public class RLMQuotaManagerConfig {
private final int numQuotaSamples;
private final int quotaWindowSizeSeconds;

public long getQuotaBytesPerSecond() {
public long quotaBytesPerSecond() {
return quotaBytesPerSecond;
}

public int getNumQuotaSamples() {
public int numQuotaSamples() {
return numQuotaSamples;
}

public int getQuotaWindowSizeSeconds() {
public int quotaWindowSizeSeconds() {
return quotaWindowSizeSeconds;
}

Expand Down
24 changes: 12 additions & 12 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2543,39 +2543,39 @@ public void testCopyQuotaManagerConfig() {
Properties defaultProps = new Properties();
RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
assertEquals(Long.MAX_VALUE, defaultConfig.getQuotaBytesPerSecond());
assertEquals(61, defaultConfig.getNumQuotaSamples());
assertEquals(1, defaultConfig.getQuotaWindowSizeSeconds());
assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
assertEquals(61, defaultConfig.numQuotaSamples());
assertEquals(1, defaultConfig.quotaWindowSizeSeconds());

Properties customProps = new Properties();
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
assertEquals(100L, rlmCopyQuotaManagerConfig.getQuotaBytesPerSecond());
assertEquals(31, rlmCopyQuotaManagerConfig.getNumQuotaSamples());
assertEquals(1, rlmCopyQuotaManagerConfig.getQuotaWindowSizeSeconds());
assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
}

@Test
public void testFetchQuotaManagerConfig() {
Properties defaultProps = new Properties();
RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
assertEquals(Long.MAX_VALUE, defaultConfig.getQuotaBytesPerSecond());
assertEquals(11, defaultConfig.getNumQuotaSamples());
assertEquals(1, defaultConfig.getQuotaWindowSizeSeconds());
assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
assertEquals(11, defaultConfig.numQuotaSamples());
assertEquals(1, defaultConfig.quotaWindowSizeSeconds());

Properties customProps = new Properties();
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
assertEquals(100L, rlmFetchQuotaManagerConfig.getQuotaBytesPerSecond());
assertEquals(31, rlmFetchQuotaManagerConfig.getNumQuotaSamples());
assertEquals(1, rlmFetchQuotaManagerConfig.getQuotaWindowSizeSeconds());
assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
}

private Partition mockPartition(TopicIdPartition topicIdPartition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public final class RemoteLogManagerConfig {

public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
"This is a global limit for all the partitions that are being copied from remote storage to local storage. " +
"This is a global limit for all the partitions that are being copied from local storage to remote storage. " +
"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second.";
public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;

Expand Down

0 comments on commit 48f6b51

Please sign in to comment.