Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new metric type: Gauge + CurrentMemoryUsage to metrics #1682

Merged
merged 4 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions datafusion/src/physical_plan/metrics/aggregated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,31 @@ pub struct AggregatedMetricsSet {
final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
}

impl Default for AggregatedMetricsSet {
fn default() -> Self {
Self::new()
}
}

impl AggregatedMetricsSet {
/// Create a new aggregated set
pub(crate) fn new() -> Self {
pub fn new() -> Self {
Copy link
Member Author

@yjshen yjshen Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To create a memory-managed version of ShuffleWriter, we need to expose these four APIs for dependent crate usage.
I'm not sure this should be in its own PR, or I can have it here since this PR is also metric-related. I can remove this if needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine to be in this PR

Self {
intermediate: Arc::new(std::sync::Mutex::new(vec![])),
final_: Arc::new(std::sync::Mutex::new(vec![])),
}
}

/// create a new intermediate baseline
pub(crate) fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
pub fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.intermediate.lock().unwrap().push(ms);
result
}

/// create a new final baseline
pub(crate) fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.final_.lock().unwrap().push(ms);
Expand Down Expand Up @@ -137,7 +143,7 @@ impl AggregatedMetricsSet {
}

/// Aggregate all metrics into a one
pub(crate) fn aggregate_all(&self) -> MetricsSet {
pub fn aggregate_all(&self) -> MetricsSet {
let metrics = ExecutionPlanMetricsSet::new();
let baseline = BaselineMetrics::new(&metrics, 0);
self.merge_compute_time(baseline.elapsed_compute());
Expand Down
11 changes: 10 additions & 1 deletion datafusion/src/physical_plan/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::task::Poll;

use arrow::{error::ArrowError, record_batch::RecordBatch};

use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp};

/// Helper for creating and tracking common "baseline" metrics for
/// each operator
Expand Down Expand Up @@ -56,6 +56,9 @@ pub struct BaselineMetrics {
/// total spilled bytes during the execution of the operator
spilled_bytes: Count,

/// current memory usage for the operator
mem_used: Gauge,

/// output rows: the total output rows
output_rows: Count,
}
Expand All @@ -71,6 +74,7 @@ impl BaselineMetrics {
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
mem_used: MetricBuilder::new(metrics).mem_used(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}
Expand All @@ -90,6 +94,11 @@ impl BaselineMetrics {
&self.spilled_bytes
}

/// return the metric for current memory usage
pub fn mem_used(&self) -> &Gauge {
&self.mem_used
}

/// Record a spill of `spilled_bytes` size.
pub fn record_spill(&self, spilled_bytes: usize) {
self.spill_count.add(1);
Expand Down
31 changes: 30 additions & 1 deletion datafusion/src/physical_plan/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::{borrow::Cow, sync::Arc};

use super::{
Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, Timestamp,
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
};

/// Structure for constructing metrics, counters, timers, etc.
Expand Down Expand Up @@ -123,6 +123,14 @@ impl<'a> MetricBuilder<'a> {
count
}

/// Consume self and create a new gauge for reporting current memory usage
pub fn mem_used(self, partition: usize) -> Gauge {
let gauge = Gauge::new();
self.with_partition(partition)
.build(MetricValue::CurrentMemoryUsage(gauge.clone()));
gauge
}

/// Consumes self and creates a new [`Count`] for recording some
/// arbitrary metric of an operator.
pub fn counter(
Expand All @@ -133,6 +141,16 @@ impl<'a> MetricBuilder<'a> {
self.with_partition(partition).global_counter(counter_name)
}

/// Consumes self and creates a new [`Gauge`] for reporting some
/// arbitrary metric of an operator.
pub fn gauge(
self,
gauge_name: impl Into<Cow<'static, str>>,
partition: usize,
) -> Gauge {
self.with_partition(partition).global_gauge(gauge_name)
}

/// Consumes self and creates a new [`Count`] for recording a
/// metric of an overall operator (not per partition)
pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
Expand All @@ -144,6 +162,17 @@ impl<'a> MetricBuilder<'a> {
count
}

/// Consumes self and creates a new [`Gauge`] for reporting a
/// metric of an overall operator (not per partition)
pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
let gauge = Gauge::new();
self.build(MetricValue::Gauge {
name: gauge_name.into(),
gauge: gauge.clone(),
});
gauge
}

/// Consume self and create a new Timer for recording the elapsed
/// CPU time spent by an operator
pub fn elapsed_compute(self, partition: usize) -> Time {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use hashbrown::HashMap;
pub use aggregated::AggregatedMetricsSet;
pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};

/// Something that tracks a value of interest (metric) of a DataFusion
/// [`ExecutionPlan`] execution.
Expand Down
94 changes: 90 additions & 4 deletions datafusion/src/physical_plan/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,62 @@ impl Count {
}
}

/// A gauge is the simplest metrics type. It just returns a value.
/// For example, you can easily expose current memory consumption with a gauge.
///
/// Note `clone`ing gauge update the same underlying metrics
#[derive(Debug, Clone)]
pub struct Gauge {
/// value of the metric gauge
value: std::sync::Arc<AtomicUsize>,
}

impl PartialEq for Gauge {
fn eq(&self, other: &Self) -> bool {
self.value().eq(&other.value())
}
}

impl Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.value())
}
}

impl Default for Gauge {
fn default() -> Self {
Self::new()
}
}

impl Gauge {
/// create a new gauge
pub fn new() -> Self {
Self {
value: Arc::new(AtomicUsize::new(0)),
}
}

/// Add `n` to the metric's value
pub fn add(&self, n: usize) {
yjshen marked this conversation as resolved.
Show resolved Hide resolved
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.fetch_add(n, Ordering::Relaxed);
}

/// Set the metric's value to `n` and return the previous value
pub fn set(&self, n: usize) -> usize {
// relaxed ordering for operations on `value` poses no issues
// we're purely using atomic ops with no associated memory ops
self.value.swap(n, Ordering::Relaxed)
}

/// Get the current value
pub fn value(&self) -> usize {
self.value.load(Ordering::Relaxed)
}
}

/// Measure a potentially non contiguous duration of time
#[derive(Debug, Clone)]
pub struct Time {
Expand Down Expand Up @@ -287,13 +343,22 @@ pub enum MetricValue {
SpillCount(Count),
/// Total size of spilled bytes produced: "spilled_bytes" metric
SpilledBytes(Count),
/// Current memory used
CurrentMemoryUsage(Gauge),
/// Operator defined count.
Count {
/// The provided name of this metric
name: Cow<'static, str>,
/// The value of the metric
count: Count,
},
/// Operator defined gauge.
Gauge {
/// The provided name of this metric
name: Cow<'static, str>,
/// The value of the metric
gauge: Gauge,
},
/// Operator defined time
Time {
/// The provided name of this metric
Expand All @@ -314,8 +379,10 @@ impl MetricValue {
Self::OutputRows(_) => "output_rows",
Self::SpillCount(_) => "spill_count",
Self::SpilledBytes(_) => "spilled_bytes",
Self::CurrentMemoryUsage(_) => "mem_used",
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Gauge { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
Expand All @@ -328,8 +395,10 @@ impl MetricValue {
Self::OutputRows(count) => count.value(),
Self::SpillCount(count) => count.value(),
Self::SpilledBytes(bytes) => bytes.value(),
Self::CurrentMemoryUsage(used) => used.value(),
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Gauge { gauge, .. } => gauge.value(),
Self::Time { time, .. } => time.value(),
Self::StartTimestamp(timestamp) => timestamp
.value()
Expand All @@ -349,11 +418,16 @@ impl MetricValue {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
Self::SpillCount(_) => Self::SpillCount(Count::new()),
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Self::Count { name, .. } => Self::Count {
name: name.clone(),
count: Count::new(),
},
Self::Gauge { name, .. } => Self::Gauge {
name: name.clone(),
gauge: Gauge::new(),
},
Self::Time { name, .. } => Self::Time {
name: name.clone(),
time: Time::new(),
Expand Down Expand Up @@ -383,6 +457,13 @@ impl MetricValue {
count: other_count, ..
},
) => count.add(other_count.value()),
(Self::CurrentMemoryUsage(gauge), Self::CurrentMemoryUsage(other_gauge))
| (
Self::Gauge { gauge, .. },
Self::Gauge {
gauge: other_gauge, ..
},
) => gauge.add(other_gauge.value()),
(Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
| (
Self::Time { time, .. },
Expand Down Expand Up @@ -415,10 +496,12 @@ impl MetricValue {
Self::ElapsedCompute(_) => 1, // show second
Self::SpillCount(_) => 2,
Self::SpilledBytes(_) => 3,
Self::Count { .. } => 4,
Self::Time { .. } => 5,
Self::StartTimestamp(_) => 6, // show timestamps last
Self::EndTimestamp(_) => 7,
Self::CurrentMemoryUsage(_) => 4,
Self::Count { .. } => 5,
Self::Gauge { .. } => 6,
Self::Time { .. } => 7,
Self::StartTimestamp(_) => 8, // show timestamps last
Self::EndTimestamp(_) => 9,
}
}

Expand All @@ -438,6 +521,9 @@ impl std::fmt::Display for MetricValue {
| Self::Count { count, .. } => {
write!(f, "{}", count)
}
Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
write!(f, "{}", gauge)
}
Self::ElapsedCompute(time) | Self::Time { time, .. } => {
// distinguish between no time recorded and very small
// amount of time recorded
Expand Down
Loading