KAFKA-15265: Add Remote Log Manager quota manager #15625
Conversation
I have a question, in fact, what this PR does is to provide a standard configuration and generate corresponding rate limiters and related monitoring indicators, right? Then it needs to be used in the corresponding RemoteStorageManager, correct?
Yes, the integration of the quota manager will come in the follow-up PRs. I have mentioned it in the description of the PR.
Overall LGTM, left minor comments to address. Thanks for the patch!
LGTM
Overall LGTM. Left some comments.
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
        "less than or equal to `log.retention.bytes` value.";
public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;

public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
        "This is a global limit for all the partitions that are being copied from remote storage to local storage. " +
"This is a global limit for all the partitions that are being copied from remote storage to local storage."
<-- is this right? Shouldn't it be copied from local storage to remote storage?
My bad. Thanks for catching this.
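For reference, a broker configuration fragment using the property introduced in this hunk might look like the following sketch (the 100 MB/s value is an arbitrary example, not a default from this PR):

```properties
# Global broker-level limit on bytes copied per second from local
# storage to remote storage, across all partitions (example value).
remote.log.manager.copy.max.bytes.per.second=104857600
```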
MetricName quotaMetricName = metricName();
KafkaMetric metric = allMetrics.get(quotaMetricName);
if (metric != null) {
    LOGGER.warn("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota);
I'd like to know why we set WARN logs here. It looks to me if we want to update quota dynamically, it is expected the metric is already existed, right? If so, I don't think this is unexpected. So maybe INFO or DEBUG level, WDYT?
On second thought, INFO should be the right level, because quota update is a significant change in the application state and will decide how fast copies/fetches from remote storage can happen. Also, quota updates are infrequent, hence it will not cause excessive logging.
@abhijeetk88 : Thanks for the PR. Made a pass of non-testing files. Left a few comments.
private final int numQuotaSamples;
private final int quotaWindowSizeSeconds;

public long getQuotaBytesPerSecond() {
For consistency, we don't typically use getters. So this can just be quotaBytesPerSecond. Ditto below.
Sure, will change these.
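For illustration, the convention the reviewer is pointing at looks like this (class and field names here are hypothetical, not the PR's actual code):

```java
// Kafka-style accessor: the method is named after the field, with no
// JavaBean "get" prefix (quotaBytesPerSecond(), not getQuotaBytesPerSecond()).
public class RLMQuotaConfigExample {
    private final long quotaBytesPerSecond;

    public RLMQuotaConfigExample(long quotaBytesPerSecond) {
        this.quotaBytesPerSecond = quotaBytesPerSecond;
    }

    // Accessor named exactly like the field it exposes.
    public long quotaBytesPerSecond() {
        return quotaBytesPerSecond;
    }
}
```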
}

public void record(double value) {
    sensor().record(value, time.milliseconds(), false);
Why do we turn the quota checking off during record()? In other implementations like ClientQuotaManager.recordAndGetThrottleTimeMs(), we call record() with quota checking turned on and get back the amount of time to throttle.
In KIP-956, we do not utilize the throttle time provided by the quota manager to regulate fetches and copies. For fetch operations, we initially verify quota availability before initiating the retrieval of remote data. If the quota is unavailable, our priority is to serve partitions requiring local data, rather than throttling the client. Therefore, we focus on fulfilling data requests for other partitions in the queue, eliminating the need for throttle time in fetch operations.
Similarly, when a RLM Task attempts to copy a segment, it first checks if the write quota is available. If the quota is not available, the thread waits until the quota becomes available. As a result, we do not require throttle time for copies either.
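The two usage patterns described above can be sketched with a simple fixed-bucket byte-rate tracker. This is an illustrative sketch only, not Kafka's actual RLMQuotaManager (which builds on the org.apache.kafka.common.metrics Sensor machinery); all names here are hypothetical. Fetches ask isQuotaExceeded() before reading remote data, and the copy path records bytes and re-checks until the windowed average falls back under the quota.

```java
// Hypothetical sketch: windowed byte-rate tracking with one-second buckets.
public class QuotaSketch {
    private final long bytesPerSecond;  // e.g. the copy quota
    private final int windowSeconds;    // number of one-second buckets
    private final long[] buckets;       // bytes recorded in each second
    private long lastSec;               // most recent second we advanced to

    public QuotaSketch(long bytesPerSecond, int windowSeconds) {
        this.bytesPerSecond = bytesPerSecond;
        this.windowSeconds = windowSeconds;
        this.buckets = new long[windowSeconds];
    }

    // Zero out buckets that have rotated out of the window.
    private void advance(long nowSec) {
        long steps = Math.min(nowSec - lastSec, windowSeconds);
        for (long s = 1; s <= steps; s++) {
            buckets[(int) ((lastSec + s) % windowSeconds)] = 0;
        }
        if (nowSec > lastSec) {
            lastSec = nowSec;
        }
    }

    // Record bytes without throttling the caller (cf. record(..., false) above).
    public synchronized void record(long bytes, long nowMs) {
        long nowSec = nowMs / 1000;
        advance(nowSec);
        buckets[(int) (nowSec % windowSeconds)] += bytes;
    }

    // The quota is exceeded when the windowed total tops quota * window length.
    public synchronized boolean isQuotaExceeded(long nowMs) {
        advance(nowMs / 1000);
        long total = 0;
        for (long b : buckets) {
            total += b;
        }
        return total > bytesPerSecond * (long) windowSeconds;
    }
}
```

Under this shape, a fetch path would skip the remote read when isQuotaExceeded() returns true and serve partitions needing local data instead, while the RLM copy task would sleep and re-check until it returns false before uploading the next segment.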
public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num";
public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " +
        "The default value is 11, which means there are 10 whole windows + 1 current window.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11;
Why do we choose the default fetch window different from copy?
For fetches, the default window size was chosen to match the default window size used for other quotas, such as ClientQuota and ReplicationQuota.
Using an 11-second (10 whole + 1 current) window size for copies, similar to other quotas, does seem to be a better option. Consider this:
The broker-level quota for copying may be set to 250 MBps. The RLM task records the log segment size with the quota manager when uploading a log segment. The typical log segment size is 500 MB, meaning only one log segment can be uploaded every 2 seconds without breaching the quota. If uploads occur faster, the quota will be exceeded. Therefore, as long as the window size is greater than 2 seconds, either a 10-second or 60-second (whole) window should work.
However, a shorter window (10 seconds) has advantages. It tracks data uploads more precisely and prevents large spikes in data upload more effectively. For example:
With a 10-second window:
Buckets: b1, b2, ..., b10
In the 10th second, 5 segments can be uploaded without breaching the average quota (5 * 500 MB / 10 seconds = 250 MBps), though the spike will be 2.5 GB in that second.
With a 60-second window:
Buckets: b1, b2, ..., b60
In the 60th second, 30 segments can be uploaded without breaching the average quota (30 * 500 MB / 60 seconds = 250 MBps), but the spike will be 15 GB in that second.
Given the need to avoid quota breaches, a 10-second window is preferable to a 60-second window.
Let me know if it makes sense. I can change the default copy window to be the same as the default fetch window.
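The spike arithmetic above can be checked with a few lines (units are MB, using the round numbers from the example: a 250 MB/s quota and 500 MB segments):

```java
// Worked version of the window-size comparison above.
public class CopyWindowMath {
    // MB that can land in the final second of an otherwise idle window
    // without pushing the windowed average over the quota.
    static long maxBurstMb(long quotaMbPerSec, int windowSeconds) {
        return quotaMbPerSec * windowSeconds;
    }

    // How many whole segments fit in that burst.
    static long segmentsInBurst(long quotaMbPerSec, int windowSeconds, long segmentMb) {
        return maxBurstMb(quotaMbPerSec, windowSeconds) / segmentMb;
    }

    public static void main(String[] args) {
        // 10-second window: 5 segments, i.e. a 2.5 GB spike in one second.
        System.out.println(segmentsInBurst(250, 10, 500)); // 5
        System.out.println(maxBurstMb(250, 10));           // 2500 MB
        // 60-second window: 30 segments, i.e. a 15 GB spike in one second.
        System.out.println(segmentsInBurst(250, 60, 500)); // 30
        System.out.println(maxBurstMb(250, 60));           // 15000 MB
    }
}
```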
If there is no strong reason, perhaps it's better to use the same default window number for copy too.
Done
@abhijeetk88 , do we have any update on this PR?
LGTM! Thanks.
Thanks @abhijeetk88 for addressing the review comments. LGTM.
Do we plan to change the default copy quota window num samples from 61 to 11?
@abhijeetk88 , there is a merge conflict. Please help resolve it. Thanks.
done
Waiting for a confirmation from Jun.
@junrao , since this PR blocks other follow-up PRs and v3.8.0 release date is approaching, I'd like to merge it tomorrow if you don't have any other comments. Thanks.
@abhijeetk88 : Sorry for the delay. Just a minor followup comment. Otherwise, the PR LGTM.
@abhijeetk88 : Thanks for the updated PR. LGTM
Can we look at testCopyQuotaManagerConfig() – kafka.log.remote.RemoteLogManagerTest? It seems like it is failing pretty consistently. https://issues.apache.org/jira/browse/KAFKA-16866
@jolshan : Thanks for pointing this out. Sorry that I didn't look at the test results carefully before merging.
Added the implementation of the quota manager that will be used to throttle copy and fetch requests from the remote storage. Reference KIP-956 Reviewers: Luke Chen <[email protected]>, Kamal Chandraprakash <[email protected]>, Jun Rao <[email protected]>
In this PR, I have added the implementation of the quota manager that will be used to throttle copy and fetch requests from the remote storage. Reference KIP-956
Added unit-tests for the Quota Manager implementation
In follow-up PRs, the integration of the copy/fetch quota managers with copy and fetch paths will be added. Also the quota configs will be made dynamically configurable.