Add MemTrackingMetrics to ease memory tracking for non-limited memory consumers #1691
Merged
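For reviewers skimming the diff: below is a minimal sketch of how the new MemTrackingMetrics is meant to be used, pieced together from the test added in this PR. The import paths and the 100-byte limit are assumptions for illustration, not exact code from this PR.

```rust
use std::sync::Arc;
use datafusion::execution::memory_manager::MemoryManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};

fn tracking_example() -> datafusion::error::Result<()> {
    // Runtime with a small memory limit, mirroring the test in this PR.
    let config = RuntimeConfig::new()
        .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0)?);
    let runtime = Arc::new(RuntimeEnv::new(config)?);

    // A tracking-only consumer no longer implements MemoryConsumer or
    // registers itself; it just reports its usage through MemTrackingMetrics.
    let metrics_set = ExecutionPlanMetricsSet::new();
    let tracking_metrics = MemTrackingMetrics::new_with_rt(&metrics_set, 0, runtime.clone());
    tracking_metrics.init_mem_used(15); // reflected in the manager's tracker total

    // Dropping the metrics gives the tracked memory back to the MemoryManager.
    drop(tracking_metrics);
    Ok(())
}
```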
Commits (changes shown from 8 of 9 commits)
- 0377698 Memory manager no longer track consumers, update aggregatedMetricsSet (yjshen)
- 69a7668 Easy memory tracking with metrics (yjshen)
- fd80e69 Merge remote-tracking branch 'apache/master' into tracker_reporting (yjshen)
- 67e6f2d use tracking metrics in SPMS (yjshen)
- 2ccb887 tests (yjshen)
- f1988b6 fix (yjshen)
- 014bc68 doc (yjshen)
- e297348 Update datafusion/src/physical_plan/sorts/sort.rs (yjshen)
- 12eba17 make tracker AtomicUsize (yjshen)
@@ -19,12 +19,12 @@
use crate::error::{DataFusionError, Result};
use async_trait::async_trait;
use hashbrown::HashMap;
use hashbrown::HashSet;
use log::debug;
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::sync::{Arc, Condvar, Mutex};

static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
@@ -245,10 +245,10 @@ The memory management architecture is the following:
/// Manage memory usage during physical plan execution
#[derive(Debug)]
pub struct MemoryManager {
    requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
    pool_size: usize,
    requesters_total: Arc<Mutex<usize>>,
    trackers_total: Arc<Mutex<usize>>,
    cv: Condvar,
}
@@ -267,41 +267,37 @@ impl MemoryManager {
                );

                Arc::new(Self {
                    requesters: Arc::new(Mutex::new(HashMap::new())),
                    trackers: Arc::new(Mutex::new(HashMap::new())),
                    requesters: Arc::new(Mutex::new(HashSet::new())),
                    pool_size,
                    requesters_total: Arc::new(Mutex::new(0)),
                    trackers_total: Arc::new(Mutex::new(0)),
                    cv: Condvar::new(),
                })
            }
        }
    }

    fn get_tracker_total(&self) -> usize {
        let trackers = self.trackers.lock().unwrap();
        if trackers.len() > 0 {
            trackers.values().fold(0usize, |acc, y| match y.upgrade() {
                None => acc,
                Some(t) => acc + t.mem_used(),
            })
        } else {
            0
        }
        *self.trackers_total.lock().unwrap()
    }

(inline review comment on get_tracker_total) well that sure looks nicer 👍

    /// Register a new memory consumer for memory usage tracking
    pub(crate) fn register_consumer(&self, consumer: &Arc<dyn MemoryConsumer>) {
        let id = consumer.id().clone();
        match consumer.type_() {
            ConsumerType::Requesting => {
                let mut requesters = self.requesters.lock().unwrap();
                requesters.insert(id, Arc::downgrade(consumer));
            }
            ConsumerType::Tracking => {
                let mut trackers = self.trackers.lock().unwrap();
                trackers.insert(id, Arc::downgrade(consumer));
            }
        }
    pub(crate) fn grow_tracker_usage(&self, delta: usize) {
        *self.trackers_total.lock().unwrap() += delta;
    }

    pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
        let mut total = self.trackers_total.lock().unwrap();
        assert!(*total >= delta);
        *total -= delta;
    }

    fn get_requester_total(&self) -> usize {
        *self.requesters_total.lock().unwrap()
    }

    /// Register a new memory requester
    pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
        self.requesters.lock().unwrap().insert(requester_id.clone());
    }

    fn max_mem_for_requesters(&self) -> usize {
@@ -317,7 +313,6 @@
        let granted;
        loop {
            let remaining = rqt_max - *rqt_current_used;
            let max_per_rqt = rqt_max / num_rqt;
            let min_per_rqt = max_per_rqt / 2;

@@ -326,6 +321,7 @@
                break;
            }

            let remaining = rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
            if remaining >= required {
                granted = true;
                *rqt_current_used += required;
@@ -347,46 +343,39 @@
    fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
        let mut requesters_total = self.requesters_total.lock().unwrap();
        assert!(*requesters_total >= freed);
        *requesters_total -= freed;
        *requesters_total += acquired;
        self.cv.notify_all()
    }

    /// Drop a memory consumer from memory usage tracking
    pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId) {
    /// Drop a memory consumer and reclaim the memory
    pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
        // find in requesters first
        {
            let mut requesters = self.requesters.lock().unwrap();
            if requesters.remove(id).is_some() {
                return;
            if requesters.remove(id) {
                let mut total = self.requesters_total.lock().unwrap();
                assert!(*total >= mem_used);
                *total -= mem_used;
            }
        }
        let mut trackers = self.trackers.lock().unwrap();
        trackers.remove(id);
        let mut total = self.trackers_total.lock().unwrap();
        assert!(*total >= mem_used);
        *total -= mem_used;
        self.cv.notify_all();
    }
}

impl Display for MemoryManager {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let requesters =
            self.requesters
                .lock()
                .unwrap()
                .values()
                .fold(vec![], |mut acc, consumer| match consumer.upgrade() {
                    None => acc,
                    Some(c) => {
                        acc.push(format!("{}", c));
                        acc
                    }
                });
        let tracker_mem = self.get_tracker_total();
        write!(f,
            "MemoryManager usage statistics: total {}, tracker used {}, total {} requesters detail: \n {},",
            human_readable_size(self.pool_size),
            human_readable_size(tracker_mem),
            &requesters.len(),
            requesters.join("\n"))
            "MemoryManager usage statistics: total {}, trackers used {}, total {} requesters used: {}",
            human_readable_size(self.pool_size),
            human_readable_size(self.get_tracker_total()),
            self.requesters.lock().unwrap().len(),
            human_readable_size(self.get_requester_total()),
        )
    }
}
@@ -418,6 +407,8 @@ mod tests {
    use super::*;
    use crate::error::Result;
    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use crate::execution::MemoryConsumer;
    use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
@@ -487,6 +478,7 @@
    impl DummyTracker {
        fn new(partition: usize, runtime: Arc<RuntimeEnv>, mem_used: usize) -> Self {
            runtime.grow_tracker_usage(mem_used);
            Self {
                id: MemoryConsumerId::new(partition),
                runtime,
@@ -528,23 +520,29 @@
            .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap());
        let runtime = Arc::new(RuntimeEnv::new(config).unwrap());

        let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
        runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>));
        DummyTracker::new(0, runtime.clone(), 5);
        assert_eq!(runtime.memory_manager.get_tracker_total(), 5);

        let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10));
        runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>));
        let tracker1 = DummyTracker::new(0, runtime.clone(), 10);
        assert_eq!(runtime.memory_manager.get_tracker_total(), 15);

        let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15));
        runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>));
        DummyTracker::new(0, runtime.clone(), 15);
        assert_eq!(runtime.memory_manager.get_tracker_total(), 30);

        runtime.drop_consumer(tracker2.id());
        runtime.drop_consumer(tracker1.id(), tracker1.mem_used);
        assert_eq!(runtime.memory_manager.get_tracker_total(), 20);

        // MemTrackingMetrics as an easy way to track memory
        let ms = ExecutionPlanMetricsSet::new();
        let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone());
        tracking_metric.init_mem_used(15);

(inline review comment) 👌 --very nice

        assert_eq!(runtime.memory_manager.get_tracker_total(), 35);

        drop(tracking_metric);
        assert_eq!(runtime.memory_manager.get_tracker_total(), 20);

        let requester1 = Arc::new(DummyRequester::new(0, runtime.clone()));
        runtime.register_consumer(&(requester1.clone() as Arc<dyn MemoryConsumer>));
        let requester1 = DummyRequester::new(0, runtime.clone());
        runtime.register_requester(requester1.id());

        // first requester entered, should be able to use any of the remaining 80
        requester1.do_with_mem(40).await.unwrap();

@@ -553,8 +551,8 @@
        assert_eq!(requester1.mem_used(), 50);
        assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 50);

        let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
        runtime.register_consumer(&(requester2.clone() as Arc<dyn MemoryConsumer>));
        let requester2 = DummyRequester::new(0, runtime.clone());
        runtime.register_requester(requester2.id());

        requester2.do_with_mem(20).await.unwrap();
        requester2.do_with_mem(30).await.unwrap();
Review comment: Maybe as a follow-on PR this can be changed to an AtomicUsize to avoid the mutex, and I think the fetch-and-update code will be nicer. I think that would be a nice to have, not required.
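For illustration, a rough sketch of what that suggestion could look like. The struct and field names here are hypothetical stand-ins, not the PR's code; the later commit "make tracker AtomicUsize" goes in this direction.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the tracker counter inside MemoryManager.
struct TrackerTotal {
    trackers_total: AtomicUsize,
}

impl TrackerTotal {
    fn grow_tracker_usage(&self, delta: usize) {
        // Lock-free increment instead of *self.trackers_total.lock().unwrap() += delta
        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
    }

    fn shrink_tracker_usage(&self, delta: usize) {
        // fetch_sub returns the previous value, so the sanity check is preserved.
        let prev = self.trackers_total.fetch_sub(delta, Ordering::SeqCst);
        assert!(prev >= delta);
    }

    fn get_tracker_total(&self) -> usize {
        self.trackers_total.load(Ordering::SeqCst)
    }
}
```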
Reply: Currently, requesters_total is combined with the later Condvar to stop late-arriving requesters from spilling too frequently (since the earlier consumers may already occupy much of the memory): they wait for notification while holding less than 1/(2n) of the memory. Any suggestions on this? The code here would be much simpler with the Arc<Mutex> replaced by AtomicUsize.
Review comment: This lock only guarantees that the two operations, updating requesters_total and calling cv.notify_all, are performed atomically, but it looks like this doesn't really buy us anything? The waiter on self.cv can wake up and get preempted right away by other threads that might update requesters_total. I am curious, from your point of view, what benefit this critical region provides.
Follow-up: Never mind, I was wrong. Considering that the Condvar will reacquire the lock on wake-up, a mutex is needed if we want to make sure the woken-up thread is not operating on a different requesters_total value.
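To make that point concrete, here is a minimal sketch of the Mutex plus Condvar pattern under discussion. It is illustrative only: the PR's real wait condition is the "holding less than half the per-requester share" check, not the simple limit used here.

```rust
use std::sync::{Condvar, Mutex};

// A requester blocks until its allocation fits. Condvar::wait atomically
// releases the mutex while sleeping and reacquires it before returning, so
// the woken thread re-checks the condition against a consistent total, and
// an update made under the lock followed by notify_all cannot slip between
// the waiter's condition check and its call to wait.
fn acquire(total: &Mutex<usize>, cv: &Condvar, required: usize, limit: usize) {
    let mut used = total.lock().unwrap();
    while *used + required > limit {
        used = cv.wait(used).unwrap(); // lock is held again when wait returns
    }
    *used += required;
}

// Freeing memory updates the total and wakes the waiters while still holding
// the lock, mirroring record_free_then_acquire in the diff above.
fn free(total: &Mutex<usize>, cv: &Condvar, freed: usize) {
    let mut used = total.lock().unwrap();
    assert!(*used >= freed);
    *used -= freed;
    cv.notify_all();
}
```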