Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to can_grow_directly when waiting for the Condvar. #1921

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion datafusion/src/execution/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use crate::error::{DataFusionError, Result};
use async_trait::async_trait;
use core::time::Duration;
use hashbrown::HashSet;
use log::debug;
use parking_lot::{Condvar, Mutex};
Expand Down Expand Up @@ -340,7 +341,13 @@ impl MemoryManager {
} else if current < min_per_rqt {
// if we cannot acquire at lease 1/2n memory, just wait for others
// to spill instead spill self frequently with limited total mem
self.cv.wait(&mut rqt_current_used);
let timeout = self
.cv
.wait_for(&mut rqt_current_used, Duration::from_secs(5));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it configurable from MemoryManagerConfig?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this wait causing infinite wait in your query? If so, is it a possible leak somewhere else that exists, and we should fix it?

The primary reason it's waiting forever here for a notify signal is to avoid repeated self-spilling with little memory share, producing many spill files and degrading performance.

If a consumer cannot get at least 1/2n of memory among the total, perhaps blocking the thread and yielding computational resources to others is better? Then the huge consumers can progress faster. Either they trigger self-spilling or finish their jobs.

Working with a minimum memory is not ideal because it will harm the overall query processing throughput. Even if the repeated spilling consumer gets enough memory later, it will need a lot of effort reading spills and merging partial results.

We are working with an assumption of a batch engine currently. May one day, we can have a customized task scheduler, self-tuning based on CPU memory usages, and even with an adaptive partition size tuning capability.

Nevertheless, if we must add a timeout, please also print a warning log, as this could be a symptom of a potential bug. And yes, I think to make it a configurable wait is necessary. And please choose a longer timeout as default if possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't hit something like infinite wait. But as I'm looking at the code, it's a bit strange to wait forever here.

How long it waits for is unpredictable and seems it could be possibly a long time. It seems more reasonable to have a timeout to prevent it? Even most of time we won't reach the timeout.

Yea, current value is picked arbitrarily. Just to know the idea. I will make it configurable and use a longer default value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems Spark is still taking infinite wait in the 1/2n situation. Do you think we will have some new cases to deal with or limitations in the current design? I'm not implying we should strictly follow Spark's way, since the model is different (such as we forbid triggering others to spill, and we tries to share memory similarly among all consumers), but since you are a Spark committer, I might be asking the right person ☺️

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's interesting. I'm surprised that you can find some related code quickly, so are you familiar with Spark codebase and also maybe work on it too? 😄

Spark codebase is quite complicated nowadays. Actually I've not read the code you link to. What in my mind when I looked at can_grow_directly, is where one memory consumer acquires execution memory (code). If the consumer cannot get as much as it needs at first, the manager will actively ask other consumers (in the same task) to do spilling. No waiting here. Actually this is better mode in my mind for the consumer model. I actually plan to make similar changes to this part of datafusion.

For the code you quoted, I just quickly read it. It seems to be for different purpose. It is used to manage memory pools on executors for tasks. So it is reasonable that once a task asks for more memory from a memory pool. If there is no enough memory in the pool and it's in 1/2n (n is number of tasks, not consumers here) situation, the task will be blocked infinitely until other tasks free up memory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And yes, the very initial memory managing was modeled after Spark. The initial design doc which is quite similar to Spark's with a prototyping yjshen#3. But we got more chances to analyze the OOM and tuning pains in Spark, and head to the current approach gradually

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if we block a consumer belongs to a task, we are blocking the task for one partition as well, no?

You don't get my point above. There are two difference situations, one is between tasks and one is between consumers in same task. In Spark codebase, a task could be blocked when it cannot requires necessary memory and it will wait infinitely for other tasks to release memory (i.e., the code you quoted). This makes sense. One task could be blocked due to insufficient memory and it just waits for other tasks to finish and release memory. It also doesn't make sense to ask other tasks spilling for the task.

For the consumers in same task, Spark doesn't do blocking for a consumer which cannot require its memory but actively asks other consumers (and also the consumer itself but at the last one) to do spilling and release memory if possible.

The point is not whether block the consumer but when it will be unblocked. The code here seems to block a consumer and passively wait infinitely for other consumers. How do we know when other consumers will release? If no consumers release? Then the task is just stuck there. All consumers in same task have the same goal: finish the task. They should help others finish their jobs, by spilling and releasing memory when possible if other consumers need. Maybe there are some concerns I don't get so far, but that's why I think the current model looks strange to me.

I'm not arguing about the tracking consumers and requesting consumers. This looks clever to me too. So your reply above I have no question about it. But seems it's not related to the issue I'm talking in this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I must say that I'm not expert in DataFusion codebase so I'm sorry if misunderstand some points about DataFusion.

Copy link
Member

@yjshen yjshen Mar 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By introducing tracking consumers, and a mem_used() method for all consumers, we need each memory-consuming operator implementation to acquire memory and release it eagerly when it's not used.
For example, the external sorter releases its requester's memory to zero and transfer that to a tracking consumer.

How do we know when other consumers will release? If no consumers release?

Each task is processing a finite partition of data. If no consumers release and cause a stuck, I prefer to treat it as a potential bug, and we should fix it instead of letting it slip through our fingers. We are not dealing with a black box of operators like UDFs, but all controlled physical operators in the engine.

Spilling others in the same task won't make the case easier. It has limited scope and will also cause chained window operators in Spark task to deteriorate performance badly, resulting in more spills and even crash or triggering another speculative task.

Copy link
Member Author

@viirya viirya Mar 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each task is processing a finite partition of data. If no consumers release and cause a stuck, I prefer to treat it as a potential bug, and we should fix it instead of letting it slip through our fingers. We are not dealing with a black box of operators like UDFs, but all controlled physical operators in the engine.

I must say that you don't convince me (because basically you're unable to answer the question) as I don't think the dynamic during consumers interaction is understood here and should be treated like that. It looks like an issue in the model, not a potential bug in some consumers. As seems I cannot also convince you at the point so the discussion will be endless. I don't prepare to continue on it. So I'd close this, leave this as is, and thanks for above discussion.

if timeout.timed_out() {
granted = false;
break;
}
} else {
granted = false;
break;
Expand Down