Skip to content

Commit

Permalink
Add MemTrackingMetrics to ease memory tracking for non-limited memo…
Browse files Browse the repository at this point in the history
…ry consumers (#1691)

* Memory manager no longer tracks consumers, update aggregatedMetricsSet

* Easy memory tracking with metrics

* use tracking metrics in SPMS

* tests

* fix

* doc

* Update datafusion/src/physical_plan/sorts/sort.rs

Co-authored-by: Andrew Lamb <[email protected]>

* make tracker AtomicUsize

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
yjshen and alamb authored Jan 29, 2022
1 parent ab145c8 commit 641338f
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 386 deletions.
134 changes: 70 additions & 64 deletions datafusion/src/execution/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
use crate::error::{DataFusionError, Result};
use async_trait::async_trait;
use hashbrown::HashMap;
use hashbrown::HashSet;
use log::debug;
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::sync::{Arc, Condvar, Mutex};

static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);

Expand Down Expand Up @@ -245,10 +245,10 @@ The memory management architecture is the following:
/// Manage memory usage during physical plan execution
#[derive(Debug)]
pub struct MemoryManager {
requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
pool_size: usize,
requesters_total: Arc<Mutex<usize>>,
trackers_total: AtomicUsize,
cv: Condvar,
}

Expand All @@ -267,41 +267,47 @@ impl MemoryManager {
);

Arc::new(Self {
requesters: Arc::new(Mutex::new(HashMap::new())),
trackers: Arc::new(Mutex::new(HashMap::new())),
requesters: Arc::new(Mutex::new(HashSet::new())),
pool_size,
requesters_total: Arc::new(Mutex::new(0)),
trackers_total: AtomicUsize::new(0),
cv: Condvar::new(),
})
}
}
}

fn get_tracker_total(&self) -> usize {
let trackers = self.trackers.lock().unwrap();
if trackers.len() > 0 {
trackers.values().fold(0usize, |acc, y| match y.upgrade() {
None => acc,
Some(t) => acc + t.mem_used(),
})
} else {
0
}
self.trackers_total.load(Ordering::SeqCst)
}

/// Register a new memory consumer for memory usage tracking
pub(crate) fn register_consumer(&self, consumer: &Arc<dyn MemoryConsumer>) {
let id = consumer.id().clone();
match consumer.type_() {
ConsumerType::Requesting => {
let mut requesters = self.requesters.lock().unwrap();
requesters.insert(id, Arc::downgrade(consumer));
}
ConsumerType::Tracking => {
let mut trackers = self.trackers.lock().unwrap();
trackers.insert(id, Arc::downgrade(consumer));
}
}
/// Add `delta` to the tracker memory total.
pub(crate) fn grow_tracker_usage(&self, delta: usize) {
    // `fetch_add` returns the previous total, which is not needed here.
    let _previous_total = self.trackers_total.fetch_add(delta, Ordering::SeqCst);
}

/// Subtract `delta` from the tracker memory total.
///
/// # Panics
///
/// Panics if `delta` is larger than the current total, because the
/// subtraction would underflow the counter.
pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
    // `checked_sub` yields `None` on underflow, which makes `fetch_update`
    // leave the counter untouched and report the current value in `Err`.
    let update = self.trackers_total.fetch_update(
        Ordering::SeqCst,
        Ordering::SeqCst,
        |current| current.checked_sub(delta),
    );
    // Format the panic message only on the failure path, so a successful
    // shrink does not allocate a string it never uses.
    if let Err(current) = update {
        panic!(
            "Tracker total memory shrink by {} underflow, current value is {}",
            delta, current
        );
    }
}

/// Read the current value of `requesters_total`.
///
/// Panics if the mutex guarding the total has been poisoned.
fn get_requester_total(&self) -> usize {
    let total = self.requesters_total.lock().unwrap();
    *total
}

/// Record `requester_id` in the set of registered memory requesters.
///
/// Panics if the mutex guarding the requester set has been poisoned.
pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
    let mut registered = self.requesters.lock().unwrap();
    // The set owns its ids, so the borrowed id is cloned before insertion.
    registered.insert(requester_id.clone());
}

fn max_mem_for_requesters(&self) -> usize {
Expand All @@ -317,7 +323,6 @@ impl MemoryManager {

let granted;
loop {
let remaining = rqt_max - *rqt_current_used;
let max_per_rqt = rqt_max / num_rqt;
let min_per_rqt = max_per_rqt / 2;

Expand All @@ -326,6 +331,7 @@ impl MemoryManager {
break;
}

let remaining = rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
if remaining >= required {
granted = true;
*rqt_current_used += required;
Expand All @@ -347,46 +353,37 @@ impl MemoryManager {

fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
let mut requesters_total = self.requesters_total.lock().unwrap();
assert!(*requesters_total >= freed);
*requesters_total -= freed;
*requesters_total += acquired;
self.cv.notify_all()
}

/// Drop a memory consumer from memory usage tracking
pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId) {
/// Drop a memory consumer and reclaim the memory
pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
// find in requesters first
{
let mut requesters = self.requesters.lock().unwrap();
if requesters.remove(id).is_some() {
return;
if requesters.remove(id) {
let mut total = self.requesters_total.lock().unwrap();
assert!(*total >= mem_used);
*total -= mem_used;
}
}
let mut trackers = self.trackers.lock().unwrap();
trackers.remove(id);
self.shrink_tracker_usage(mem_used);
self.cv.notify_all();
}
}

impl Display for MemoryManager {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let requesters =
self.requesters
.lock()
.unwrap()
.values()
.fold(vec![], |mut acc, consumer| match consumer.upgrade() {
None => acc,
Some(c) => {
acc.push(format!("{}", c));
acc
}
});
let tracker_mem = self.get_tracker_total();
write!(f,
"MemoryManager usage statistics: total {}, tracker used {}, total {} requesters detail: \n {},",
human_readable_size(self.pool_size),
human_readable_size(tracker_mem),
&requesters.len(),
requesters.join("\n"))
"MemoryManager usage statistics: total {}, trackers used {}, total {} requesters used: {}",
human_readable_size(self.pool_size),
human_readable_size(self.get_tracker_total()),
self.requesters.lock().unwrap().len(),
human_readable_size(self.get_requester_total()),
)
}
}

Expand Down Expand Up @@ -418,6 +415,8 @@ mod tests {
use super::*;
use crate::error::Result;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::execution::MemoryConsumer;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use async_trait::async_trait;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -487,6 +486,7 @@ mod tests {

impl DummyTracker {
fn new(partition: usize, runtime: Arc<RuntimeEnv>, mem_used: usize) -> Self {
runtime.grow_tracker_usage(mem_used);
Self {
id: MemoryConsumerId::new(partition),
runtime,
Expand Down Expand Up @@ -528,23 +528,29 @@ mod tests {
.with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap());
let runtime = Arc::new(RuntimeEnv::new(config).unwrap());

let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>));
DummyTracker::new(0, runtime.clone(), 5);
assert_eq!(runtime.memory_manager.get_tracker_total(), 5);

let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10));
runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>));
let tracker1 = DummyTracker::new(0, runtime.clone(), 10);
assert_eq!(runtime.memory_manager.get_tracker_total(), 15);

let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15));
runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>));
DummyTracker::new(0, runtime.clone(), 15);
assert_eq!(runtime.memory_manager.get_tracker_total(), 30);

runtime.drop_consumer(tracker2.id());
runtime.drop_consumer(tracker1.id(), tracker1.mem_used);
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);

// MemTrackingMetrics as an easy way to track memory
let ms = ExecutionPlanMetricsSet::new();
let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone());
tracking_metric.init_mem_used(15);
assert_eq!(runtime.memory_manager.get_tracker_total(), 35);

drop(tracking_metric);
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);

let requester1 = Arc::new(DummyRequester::new(0, runtime.clone()));
runtime.register_consumer(&(requester1.clone() as Arc<dyn MemoryConsumer>));
let requester1 = DummyRequester::new(0, runtime.clone());
runtime.register_requester(requester1.id());

// first requester entered, should be able to use any of the remaining 80
requester1.do_with_mem(40).await.unwrap();
Expand All @@ -553,8 +559,8 @@ mod tests {
assert_eq!(requester1.mem_used(), 50);
assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 50);

let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
runtime.register_consumer(&(requester2.clone() as Arc<dyn MemoryConsumer>));
let requester2 = DummyRequester::new(0, runtime.clone());
runtime.register_requester(requester2.id());

requester2.do_with_mem(20).await.unwrap();
requester2.do_with_mem(30).await.unwrap();
Expand Down
24 changes: 16 additions & 8 deletions datafusion/src/execution/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use crate::{
error::Result,
execution::{
disk_manager::{DiskManager, DiskManagerConfig},
memory_manager::{
MemoryConsumer, MemoryConsumerId, MemoryManager, MemoryManagerConfig,
},
memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
},
};

Expand Down Expand Up @@ -71,13 +69,23 @@ impl RuntimeEnv {
}

/// Register the consumer to get it tracked
pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
self.memory_manager.register_consumer(memory_consumer);
pub fn register_requester(&self, id: &MemoryConsumerId) {
self.memory_manager.register_requester(id);
}

/// Drop the consumer from get tracked
pub fn drop_consumer(&self, id: &MemoryConsumerId) {
self.memory_manager.drop_consumer(id)
/// Stop tracking the consumer and reclaim its memory
pub fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
self.memory_manager.drop_consumer(id, mem_used)
}

/// Grow the tracker memory total by `delta`.
///
/// Forwards to the memory manager's `grow_tracker_usage`.
pub fn grow_tracker_usage(&self, delta: usize) {
self.memory_manager.grow_tracker_usage(delta)
}

/// Shrink the tracker memory total by `delta`.
///
/// Forwards to the memory manager's `shrink_tracker_usage`.
pub fn shrink_tracker_usage(&self, delta: usize) {
self.memory_manager.shrink_tracker_usage(delta)
}
}

Expand Down
12 changes: 7 additions & 5 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
Expand All @@ -43,21 +43,23 @@ pub struct SizedRecordBatchStream {
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
index: usize,
baseline_metrics: BaselineMetrics,
metrics: MemTrackingMetrics,
}

impl SizedRecordBatchStream {
/// Create a new RecordBatchIterator
pub fn new(
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
baseline_metrics: BaselineMetrics,
metrics: MemTrackingMetrics,
) -> Self {
let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
metrics.init_mem_used(size);
SizedRecordBatchStream {
schema,
index: 0,
batches,
baseline_metrics,
metrics,
}
}
}
Expand All @@ -75,7 +77,7 @@ impl Stream for SizedRecordBatchStream {
} else {
None
});
self.baseline_metrics.record_poll(poll)
self.metrics.record_poll(poll)
}
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc

use super::SendableRecordBatchStream;
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use async_trait::async_trait;

/// Explain execution plan operator. This operator contains the string
Expand Down Expand Up @@ -148,12 +148,12 @@ impl ExecutionPlan for ExplainExec {
)?;

let metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&metrics, partition);
let tracking_metrics = MemTrackingMetrics::new(&metrics, partition);

Ok(Box::pin(SizedRecordBatchStream::new(
self.schema.clone(),
vec![Arc::new(record_batch)],
baseline_metrics,
tracking_metrics,
)))
}

Expand Down
Loading

0 comments on commit 641338f

Please sign in to comment.