
Track and emit segment loading rate for HttpLoadQueuePeon on Coordinator #16691

Merged — kfaraz merged 16 commits into apache:master on Aug 3, 2024

Conversation

@kfaraz (Contributor) commented Jul 4, 2024

Design

The loading rate is computed as a moving average of at least the last 10 GiB of successful segment loads.

To account for multiple loading threads on a server, we use the concept of a batch to track load times.
A batch is a set of segments added by the coordinator to the load queue of a server in one go.

batchDurationMillis = t(load queue becomes empty) - t(first load request in batch is sent to server)

batchBytes = total bytes successfully loaded in batch

avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis

overall avg loading rate (kbps) = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)
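
A minimal sketch of this computation (illustrative only: the constants mirror the window of 10 entries of at least 1 GiB each used in this PR, but the class below is a simplified stand-in, not the actual LoadingRateTracker):

import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, illustrative sketch of the moving-average rate computation described above.
class LoadRateWindow
{
  private static final int WINDOW_SIZE = 10;
  private static final long MIN_ENTRY_BYTES = 1L << 30; // 1 GiB

  private static class Entry
  {
    long bytes;
    long millisElapsed;
  }

  private final Deque<Entry> window = new ArrayDeque<>();
  private long totalBytes;
  private long totalMillis;

  // Adds one finished batch: bytes successfully loaded and elapsed wall-clock time.
  void addBatch(long batchBytes, long batchDurationMillis)
  {
    Entry latest = window.peekLast();
    if (latest == null || latest.bytes >= MIN_ENTRY_BYTES) {
      latest = new Entry();
      window.addLast(latest);
    }
    latest.bytes += batchBytes;
    latest.millisElapsed += batchDurationMillis;
    totalBytes += batchBytes;
    totalMillis += batchDurationMillis;

    // Evict the oldest entry once the window is full, so the average covers
    // roughly the last WINDOW_SIZE * MIN_ENTRY_BYTES of successful loads.
    if (window.size() > WINDOW_SIZE) {
      Entry evicted = window.removeFirst();
      totalBytes -= evicted.bytes;
      totalMillis -= evicted.millisElapsed;
    }
  }

  // overall avg loading rate (kbps) = 8 * sumOverWindow(bytes) / sumOverWindow(millis)
  long movingAverageKbps()
  {
    return totalMillis <= 0 ? 0 : (8 * totalBytes) / totalMillis;
  }
}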

Changes

  • Add LoadingRateTracker which computes a moving average load rate based on
    the last few GiB of successful segment loads.
  • Emit metric segment/loading/rateKbps from the Coordinator. In the future, we may
    also consider emitting this metric from the historicals themselves.
  • Add expectedLoadTimeMillis to response of API /druid/coordinator/v1/loadQueue?simple

Testing done

  • Added new unit tests
  • Performed testing on local cluster with ~1M segments

Release note

  • Emit new metric segment/loading/rateKbps from the Coordinator which tracks
    the current segment loading rate of a server. Dimensions: server
  • Add expectedLoadTimeMillis to response of API /druid/coordinator/v1/loadQueue?simple

Future work

The expectedLoadTimeMillis can be used to show expected time on the web-console.

This PR does not include web-console changes and the screenshot below is only for reference.

[screenshot: load_time]

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz added the Area - Segment Balancing/Coordination and Needs web console change labels on Jul 4, 2024
@kfaraz kfaraz requested a review from abhishekrb19 July 4, 2024 05:16
@kfaraz kfaraz changed the title Track and emit segment loading rate in HttpLoadQueuePeon on Coordinator Track and emit segment loading rate for a HttpLoadQueuePeon on Coordinator Jul 4, 2024
@kfaraz kfaraz changed the title Track and emit segment loading rate for a HttpLoadQueuePeon on Coordinator Track and emit segment loading rate for HttpLoadQueuePeon on Coordinator Jul 4, 2024
@abhishekrb19 (Contributor) left a comment

Nice! Looks good to me. I left a few small suggestions and questions (mostly out of curiosity).

Comment on lines 567 to 569
responses.stream()
.filter(response -> response.getStatus().getState() == SegmentChangeStatus.State.SUCCESS)
.map(DataSegmentChangeResponse::getRequest)
Contributor

Minor optimization: To minimize the time spent inside the lock, we can avoid iterating over the responses again by computing the loadSize in the caller itself where we are already switching based on the response state.

What do you think?

Contributor Author

Sure, we can make that change.

responses.stream()
.filter(response -> response.getStatus().getState() == SegmentChangeStatus.State.SUCCESS)
.map(DataSegmentChangeResponse::getRequest)
.filter(req -> req instanceof SegmentChangeRequestLoad)
Contributor

Is it also worth tracking the drop rate separately?

Contributor Author

I did consider this, but it is not really going to be useful because:

  • DROP actions rarely spend time in the queue. This is because DROPs are always prioritized over LOADs and are executed immediately.
  • For a DROP action, rate in kbps is not very meaningful since the size of the segment doesn't exactly play a role in the time taken to delete the file from disk. I guess we would have to track drops per second, but that doesn't have any physical significance.

Contributor

Makes sense, thanks for the explanation

.put("segmentsToLoadSize", loadSize)
.put("segmentsToDropSize", dropSize)
.put("expectedLoadTimeMillis", expectedLoadTimeMillis)
.build();
}
Contributor

Curious why some of these stats are only available in the simple mode. I see with full, the response being returned currently is less flexible to add new things in this code...

Contributor Author

Actually, the full mode is currently broken. It just returns the entire LoadQueuePeon object in the response.
CuratorLoadQueuePeon had some @JsonProperty fields that were serialized out into the response.
But HttpLoadQueuePeon has never had any @JsonProperty fields since it was first written, so we just get an empty object in the response of the API /druid/coordinator/v1/loadQueue?full.

Since no one has ever reported this issue, I assume no one is using it (the web-console certainly doesn't use it).
I will create a separate PR to either fix it or just get rid of it completely.

updatedTotal.increment(-evictedHead.bytes, -evictedHead.millisElapsed);
}

if (updatedTotal.bytes > 0 && updatedTotal.millisElapsed > 0) {
Contributor

Redundant check that will always be true given we check for input bytes and millisElapsed in line 46


public synchronized void updateProgress(long bytes, long millisElapsed)
{
if (bytes >= 0 && millisElapsed > 0) {
Contributor

Should this be bytes > 0?

Contributor Author

Yeah, I think that would be fair.

Although, I am not sure about tombstone segments. They would have size zero but would still take non-zero time to load. I guess in that case it would make sense to account for the time in the overall window average.

Let me know what you think.

@@ -325,6 +325,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`segment/dropSkipped/count`|Number of segments that could not be dropped from any server.|`dataSource`, `tier`, `description`|Varies|
|`segment/loadQueue/size`|Size in bytes of segments to load.|`server`|Varies|
|`segment/loadQueue/count`|Number of segments to load.|`server`|Varies|
|`segment/loading/rateKbps`|Moving average rate of segment loading on a server in kbps.|`server`|Varies|
Contributor

Looks like spellcheck is failing because of kbps. The spelling seems valid, so we can ignore it by adding it to the .spelling file.

Contributor

Should we also document the window constraints (window size and bytes) in the user-facing docs? I think mentioning it will help users better interpret this moving average metric.


Co-authored-by: Abhishek Radhakrishnan <[email protected]>
@kfaraz (author) commented Jul 8, 2024

@abhishekrb19 , just realized that there is actually an error in the logic to compute the time taken to load. 😛

Imagine the following scenario:

  • t0: Coordinator wants to load 4 segments on Historical.
  • t0: Coordinator sends request to Historical to load the 4 segments.
  • t1: Historical sends back a response as soon as any one segment finishes loading.
  • t1: Coordinator measures time spent in the request = (t1 - t0).
  • t2: Coordinator sends next request to historical. This request again contains the remaining 3 segments which the Coordinator thinks are not loaded yet.
  • t3: Historical sends back a response saying that the remaining 3 segments have been loaded.
  • t3: Coordinator measures time spent in second request = (t3 - t2)

So total time measured by Coordinator = (t3 - t2) + (t1 - t0)
But actual total time taken by Historical to load all segments = (t3 - t0)

I am trying to figure out how to fix this. Will update once I have a solution.

Update:

There is already an activeRequestSegments field which tracks the segments that have already been sent to the server.
We could convert it to a map and keep a stopwatch for each segment sent to the server.
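
A rough sketch of that idea, with illustrative names only (not the actual fields or methods of HttpLoadQueuePeon):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch: remember when each segment's load request was first sent,
// so re-sending a not-yet-loaded segment does not reset its clock.
class SegmentLoadTimes
{
  private final ConcurrentMap<String, Long> firstRequestTimeMillis = new ConcurrentHashMap<>();

  void onRequestSent(String segmentId, long nowMillis)
  {
    // Keep only the earliest send time for a segment.
    firstRequestTimeMillis.putIfAbsent(segmentId, nowMillis);
  }

  long onLoadSuccess(String segmentId, long nowMillis)
  {
    // Elapsed time is measured from when the segment was first sent,
    // i.e. (t3 - t0) for a segment re-sent at t2, not just (t3 - t2).
    Long firstSentAt = firstRequestTimeMillis.remove(segmentId);
    return firstSentAt == null ? 0 : nowMillis - firstSentAt;
  }
}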

@kfaraz (author) commented Jul 9, 2024

Update: Logic is fixed, need to test it out a little more.

Once that is done, I will update the description and documentation here.

@kfaraz kfaraz requested a review from abhishekrb19 July 10, 2024 05:44
@kfaraz (author) commented Jul 10, 2024

@abhishekrb19 , I have updated the docs and the PR description.
Please let me know what you think.

@AmatyaAvadhanula (Contributor) commented:
Could you please share the motivation behind these changes?
Would a simpler approach such as [sum(segment size) / time] across successful loads in a coordinator cycle not be sufficient?

@kfaraz (author) commented Jul 10, 2024

Thanks for the feedback, @AmatyaAvadhanula !

Could you please share the motivation behind these changes?

Multiple loading threads.

If a server has a single loading thread, the time taken by individual loads can be added up
to get the total time spent in loading. But if there are multiple threads, the loading times
would have some overlap. Computing the total time spent loading a given set of segments
would require having some sense of start and end, hence the concept of a batch.

thread1:                      ta0---------loading segment A------------ta1
thread2:          tb0-------------loading segment B-------tb1

total time for loading batch = ta1 - tb0

That said, if there is only one loading thread on the server (which is often the case as seen in the formula below),
then the naive logic works just fine.

default numLoadingThreads = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 6)
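
(For instance, this formula gives 1 loading thread on servers with fewer than 12 available processors, and Math.max(1, 16 / 6) = 2 on a 16-vCPU data server.)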

Let me know what you think, @abhishekrb19 , @AmatyaAvadhanula .
If you feel that the new logic is too complicated and that we could get away with the naive logic for now,
I am okay with doing that and I can save this convoluted design for a rainy day 😂 .
Once we have seen the feature in action, we will know for sure. No point over optimizing the logic right now.

(Even the naive logic can be improved in the future to account for numLoadingThreads.)


Would a simpler approach such as [sum(segment size) / time] across successful loads in a coordinator cycle not be sufficient?

Segment loads are not tied to a coordinator cycle.

"Coordinator cycle" or "coordinator run" simply refers to a single invocation of a duty
like RunRules or BalanceSegments. After the duty has run and assigned a bunch
of segments to the load queue, the segments may take any amount of time to finish loading.

@kfaraz (author) commented Jul 11, 2024

Reverted to simpler logic that works well for single threads.

cc: @AmatyaAvadhanula , @abhishekrb19

@abhishekrb19 (Contributor) commented Jul 11, 2024

@kfaraz, thanks for the updates, I haven't looked into the latest changes yet. But a quick question based on the comments:

Reverted to simpler logic that works well for single threads.

Does this mean that the simpler logic now only accounts for the default case of a single loading thread (druid.segmentCache.numLoadingThreads)? That seems reasonable for now if we don't want the added complexity, but what is reported when there are multiple loading threads? Instead of including potentially incorrect values, I think we can skip reporting the metric and including expectedLoadTimeMillis in the API for the multiple threads case until we have proper support for it.

@kfaraz (author) commented Jul 11, 2024

Does this mean that the simpler logic now only accounts for the default case of a single loading thread (druid.segmentCache.numLoadingThreads)?

Yes, @abhishekrb19 .

Instead of including potentially incorrect values, I think we can skip reporting the metric and including expectedLoadTimeMillis in the API for the multiple threads case until we have proper support for it.

Currently, there is no way for the coordinator to know the number of loading threads being used by a historical.
Also, I think it is fine to report somewhat incorrect values as this is an experimental feature now anyway.
It would be interesting to see how off the values turn out to be when using multiple loading threads.
Moreover, we would be over-estimating the expected time, never under-estimating it.
It's always a good feeling when downloads take less time than originally expected! 😛

@abhishekrb19 (Contributor) commented:
Got it. Yeah, making it an experimental feature makes sense. We can also call out the caveat about multiple loading threads in the docs. I will try to look at the latest updates soon!

@abhishekrb19 (Contributor) commented Jul 31, 2024

@kfaraz, apologies for the delay in getting back.

Does this mean that the simpler logic now only accounts for the default case of a single loading thread (druid.segmentCache.numLoadingThreads)?
Yes, @abhishekrb19 .

The docs recommend having at least 16 vCPUs for data servers, so there will be at least 2 loading threads by default in production clusters. As to how much overlap there is between the time spent by loading threads, I'm not sure. Here are a few exploratory thoughts/ideas to simplify and track this more accurately:

  1. How about tracking the load rate directly in the historicals/data servers? I see you have listed that as a potential approach for the future. Besides it being useful and more accurate, I think it's also relatively straightforward to implement. Given that the SegmentLoadDropHandler code is already processing batches of change requests, I think we can piggyback on that logic to add some tracking there. Also, we don't introduce another notion of "batch" in the coordinator's HttpLoadQueuePeon if we decide to revive that idea.

I think one downside to this is that the rate computed on the historicals won't account for the end-to-end time (e.g., time spent in the queue, etc). If that is significant, we can perhaps track a metric separately on the coordinator?

  2. If we want to compute the aggregated rate from the coordinator, we could perhaps expose an internal API on the historicals that the coordinator can then query to get the requested info (# of loading threads, # of segment batches processed, etc.) if they're available. However, I think this approach might be overkill.

Please let me know what you think.

@kfaraz (author) commented Jul 31, 2024

@abhishekrb19 , based on our offline discussion, I have updated the PR.
Please let me know what you think.

@abhishekrb19 (Contributor) left a comment

Looks good overall to me 👍. Thanks @kfaraz!

/**
* Total stats for the whole window. This includes the total from the current batch as well.
*/
private final AtomicReference<Entry> windowTotal = new AtomicReference<>(null);
Contributor

nit: they're equivalent:

Suggested change:
- private final AtomicReference<Entry> windowTotal = new AtomicReference<>(null);
+ private final AtomicReference<Entry> windowTotal = new AtomicReference<>();

public class LoadingRateTracker
{
public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
public static final long MIN_ENTRY_SIZE_BYTES = 1 << 30;
Contributor

Add a comment that this is 1 GB? Or maybe initialize it as 1024 * 1024 * 1024 which is a bit more straightforward.

@@ -173,7 +175,7 @@ private void doSegmentManagement()
while (newRequests.size() < batchSize && queuedSegmentIterator.hasNext()) {
Contributor

currentTimeMillis above can be removed as it is unused now

* The loading rate is computed as a moving average of the last
* {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
* smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
* segments added to the load queue together.
Contributor

I think a few more things should be called out in this class level javadoc, specifically around the usage of functions.

  1. It would be helpful to document what a batch is more concretely in terms of this specific implementation. For example, something like: multiple updates invoked between a markBatchLoadingStarted() and a markBatchLoadingFinished() call constitute a batch. (A usage sketch along these lines follows this list.)

  2. Clarify the difference between reset() and markBatchLoadingStopped() or when these should be used; this can just be method level javadocs if you will. For instance, in HttpLoadQueuePeon, I expected to see a reset() to reset the tracker's state once there are no more segments to be loaded, but we explicitly call markBatchLoadingStopped(). Also, what do you think about renaming reset() to stop()?
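
Regarding point 1, a hedged sketch of the call pattern being described (the method names are the ones discussed in this thread; the sizes and durations are made-up values):

// Illustrative call pattern only, not the actual HttpLoadQueuePeon code.
void exampleUsage()
{
  LoadingRateTracker tracker = new LoadingRateTracker();

  // Everything reported via updateProgress() between these two calls counts as one batch.
  tracker.markBatchLoadingStarted();
  tracker.updateProgress(500_000_000L, 20_000L);   // 500 MB loaded in 20 s
  tracker.updateProgress(1_200_000_000L, 45_000L); // 1.2 GB loaded in 45 s
  tracker.markBatchLoadingFinished();

  long rateKbps = tracker.getMovingAverageLoadRateKbps();

  // reset() clears all tracker state (later renamed to stop() in this PR),
  // e.g. when the peon itself is stopped.
  tracker.reset();
}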

}
}

public void reset()
Contributor

Should we call this stop() to avoid any confusion with markBatchLoadingFinished()? stop() would also align with how it's used in the LoadQueuePeon. At the very least, a javadoc here will be helpful

/**
* Total stats for the whole window. This includes the total from the current batch as well.
*/
private final AtomicReference<Entry> windowTotal = new AtomicReference<>(null);
Contributor

Is windowTotal an atomic reference because it's also used by the API getMovingAverageLoadRateKbps()? As the javadoc notes, all the other state keeping is made thread safe via HttpLoadQueuePeon.

Asking because LoadingRateTracker is marked @NotThreadSafe, so it would be useful to add a comment for windowTotal to explain why there is a synchronization primitive to avoid confusion.

@kfaraz (author) commented Aug 3, 2024

Thanks a lot for the feedback, @abhishekrb19 ! 😃
I have updated some javadocs, added some new ones, and renamed reset() to stop() as suggested.

@kfaraz kfaraz merged commit 9dc2569 into apache:master Aug 3, 2024
88 checks passed
@kfaraz kfaraz deleted the remaining_segment_load_time branch August 3, 2024 07:44
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
Labels: Area - Documentation, Area - Segment Balancing/Coordination, Needs web console change