-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add MemTrackingMetrics
to ease memory tracking for non-limited memory consumers
#1691
Conversation
); | ||
self.inner_metrics.mem_used().set(0); | ||
// TODO: the result size is not tracked |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the reason I wrote this PR in the first place 🤔. A large sorted record batch whose memory is not tracked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks really nice @yjshen 👍
I do think it is worth considering consolidating MemTrackingMetrics
and BaselineMetrics
but that could also be done in a later PR
Nice work
} else { | ||
0 | ||
} | ||
*self.trackers_total.lock().unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well that sure looks nicer 👍
@@ -245,10 +245,10 @@ The memory management architecture is the following: | |||
/// Manage memory usage during physical plan execution | |||
#[derive(Debug)] | |||
pub struct MemoryManager { | |||
requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>, | |||
trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>, | |||
requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>, | |||
pool_size: usize, | |||
requesters_total: Arc<Mutex<usize>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe as a follow on PR this can be changed to be an AtomicUsize
and avoid the mutex and I think the fetch and update code will be nicer.
I think that would be a nice to have - not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, the requester_total is combined with the later Condvar
, to stop late arrived requesters frequently spilling (since the earlier consumers may already occupy much memory). They wait for notification when holding less than 1/2n memory. Any suggestions on this?
The code here would be much simplified when substituted Arc<Mutex> by AtomicUsize.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this lock only guarantees the two operations updating requesters_total and calling cv.notify_all will be performed atomically, but it looks like this doesn't really buy us anything? The waiter on self.cv can wake up and get preempted right away by other threads that might update requesters_total. I am curious from your point of view what benefit this critical region provides.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
never mind, I was wrong, considering the cv will reacquire the lock on wake up, a mutex is needed if we need to make sure the woken up thread will not be operating with a different requesters_total
value.
// MemTrackingMetrics as an easy way to track memory | ||
let ms = ExecutionPlanMetricsSet::new(); | ||
let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone()); | ||
tracking_metric.init_mem_used(15); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌 --very nice
/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`. | ||
/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation), | ||
/// and which are intermediate metrics that we only account for elapsed_compute time. | ||
pub struct CompositeMetricsSet { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a nicer name and a good description for Aggregated
metrics
/// Simplified version of tracking memory consumer, | ||
/// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking) | ||
/// | ||
/// You could use this to replace [BaselineMetrics], report the memory, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would make sense tocombine MemTrackingMetrics
and BaselineMetrics
-- for example make all put the runtime
and id
fields into BaselineMetrics
The rationale is that we would eventually like all ExecutionPlan
operations to report their memory usage (as well as row count, and time spent) so using the existing BaselineMetrics
would make it easy
self.metrics.try_done(); | ||
if self.mem_used() != 0 { | ||
if let Some(rt) = self.runtime.as_ref() { | ||
rt.drop_consumer(&self.id, self.mem_used()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Co-authored-by: Andrew Lamb <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, nice simplification @yjshen :D
I've changed |
*total -= delta; | ||
let update = | ||
self.trackers_total | ||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this potentially end up in an infinite loop if x
is zero as the closure would always return None?
After seeing this way, I think I agree that the original implementation using Mutex
was better -- sorry about that @yjshen I didn't realize the subtleties involved here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think fetch_update won't let infinite loop here. A simple case zero minus ten results in underflow:
https://play.rust-lang.org/?version=stable&mode=release&edition=2021
I agree the mutex version is easier to write and reason. I can revert the last commit if preferred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR is ready to go I think -- we can always keep iterating in the next one. Sorry for the delay in merging @yjshen
* feat: add join type for logical plan display (#1674) * (minor) Reduce memory manager and disk manager logs from `info!` to `debug!` (#1689) * Move `information_schema` tests out of execution/context.rs to `sql_integration` tests (#1684) * Move tests from context.rs to information_schema.rs * Fix up tests to compile * Move timestamp related tests out of context.rs and into sql integration test (#1696) * Move some tests out of context.rs and into sql * Move support test out of context.rs and into sql tests * Fixup tests and make them compile * Fix parquet projection * fix pruning casting * fix test based on debug strings * revert read_spill method by getting schema from file * Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers (#1691) * Memory manager no longer track consumers, update aggregatedMetricsSet * Easy memory tracking with metrics * use tracking metrics in SPMS * tests * fix * doc * Update datafusion/src/physical_plan/sorts/sort.rs Co-authored-by: Andrew Lamb <[email protected]> * make tracker AtomicUsize Co-authored-by: Andrew Lamb <[email protected]> * Implement TableProvider for DataFrameImpl (#1699) * Add TableProvider impl for DataFrameImpl * Add physical plan in * Clean up plan construction and names construction * Remove duplicate comments * Remove unused parameter * Add test * Remove duplicate limit comment * Use cloned instead of individual clone * Reduce the amount of code to get a schema Co-authored-by: Andrew Lamb <[email protected]> * Add comments to test * Fix plan comparison * Compare only the results of execution * Remove println * Refer to df_impl instead of table in test Co-authored-by: Andrew Lamb <[email protected]> * Fix the register_table test to use the correct result set for comparison * Consolidate group/agg exprs * Format * Remove outdated comment Co-authored-by: Andrew Lamb <[email protected]> * refine test in repartition.rs & coalesce_batches.rs (#1707) * Fuzz test for spillable sort (#1706) * Lazy TempDir creation in DiskManager (#1695) * Incorporate dyn scalar kernels (#1685) * Rebase * impl ToNumeric for ScalarValue * Update macro to be based on * Add floats * Cleanup * Newline * add annotation for select_to_plan (#1714) * Support `create_physical_expr` and `ExecutionContextState` or `DefaultPhysicalPlanner` for faster speed (#1700) * Change physical_expr creation API * Refactor API usage to avoid creating ExecutionContextState * Fixup ballista * clippy! * Fix can not load parquet table form spark in datafusion-cli. (#1665) * fix can not load parquet table form spark * add Invalid file in log. * fix fmt * add upper bound for pub fn (#1713) Signed-off-by: remzi <[email protected]> * Create SchemaAdapter trait to map table schema to file schemas (#1709) * Create SchemaAdapter trait to map table schema to file schemas * Linting fix * Remove commented code * approx_quantile() aggregation function (#1539) * feat: implement TDigest for approx quantile Adds a [TDigest] implementation providing approximate quantile estimations of large inputs using a small amount of (bounded) memory. A TDigest is most accurate near either "end" of the quantile range (that is, 0.1, 0.9, 0.95, etc) due to the use of a scalaing function that increases resolution at the tails. The paper claims single digit part per million errors for q ≤ 0.001 or q ≥ 0.999 using 100 centroids, and in practice I have found accuracy to be more than acceptable for an apprixmate function across the entire quantile range. The implementation is a modified copy of https://github.com/MnO2/t-digest, itself a Rust port of [Facebook's C++ implementation]. Both Facebook's implementation, and Mn02's Rust port are Apache 2.0 licensed. [TDigest]: https://arxiv.org/abs/1902.04023 [Facebook's C++ implementation]: https://github.com/facebook/folly/blob/main/folly/stats/TDigest.h * feat: approx_quantile aggregation Adds the ApproxQuantile physical expression, plumbing & test cases. The function signature is: approx_quantile(column, quantile) Where column can be any numeric type (that can be cast to a float64) and quantile is a float64 literal between 0 and 1. * feat: approx_quantile dataframe function Adds the approx_quantile() dataframe function, and exports it in the prelude. * refactor: bastilla approx_quantile support Adds bastilla wire encoding for approx_quantile. Adding support for this required modifying the AggregateExprNode proto message to support propigating multiple LogicalExprNode aggregate arguments - all the existing aggregations take a single argument, so this wasn't needed before. This commit adds "repeated" to the expr field, which I believe is backwards compatible as described here: https://developers.google.com/protocol-buffers/docs/proto3#updating Specifically, adding "repeated" to an existing message field: "For ... message fields, optional is compatible with repeated" No existing tests needed fixing, and a new roundtrip test is included that covers the change to allow multiple expr. * refactor: use input type as return type Casts the calculated quantile value to the same type as the input data. * fixup! refactor: bastilla approx_quantile support * refactor: rebase onto main * refactor: validate quantile value Ensures the quantile values is between 0 and 1, emitting a plan error if not. * refactor: rename to approx_percentile_cont * refactor: clippy lints * suppport bitwise and as an example (#1653) * suppport bitwise and as an example * Use $OP in macro rather than `&` * fix: change signature to &dyn Array * fmt Co-authored-by: Andrew Lamb <[email protected]> * fix: substr - correct behaivour with negative start pos (#1660) * minor: fix cargo run --release error (#1723) * Convert boolean case expressions to boolean logic (#1719) * Convert boolean case expressions to boolean logic * Review feedback * substitute `parking_lot::Mutex` for `std::sync::Mutex` (#1720) * Substitute parking_lot::Mutex for std::sync::Mutex * enable parking_lot feature in tokio * Add Expression Simplification API (#1717) * Add Expression Simplification API * fmt * use from_slice(&[T]) instead of from_slice(Vec<T>) to prevent future merge conflicts * fix decimal add because arrow2 doesn't include decimal add in arithmetics::add * fix decimal scale for cast test * fix parquet file format adapted projection by providing the proper schema to the RecordBatch Co-authored-by: xudong.w <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Yijie Shen <[email protected]> Co-authored-by: Phillip Cloud <[email protected]> Co-authored-by: Matthew Turner <[email protected]> Co-authored-by: Yang <[email protected]> Co-authored-by: Remzi Yang <[email protected]> Co-authored-by: Dan Harris <[email protected]> Co-authored-by: Dom <[email protected]> Co-authored-by: Kun Liu <[email protected]> Co-authored-by: Dmitry Patsura <[email protected]> Co-authored-by: Raphael Taylor-Davies <[email protected]>
* feat: add join type for logical plan display (#1674) * (minor) Reduce memory manager and disk manager logs from `info!` to `debug!` (#1689) * Move `information_schema` tests out of execution/context.rs to `sql_integration` tests (#1684) * Move tests from context.rs to information_schema.rs * Fix up tests to compile * Move timestamp related tests out of context.rs and into sql integration test (#1696) * Move some tests out of context.rs and into sql * Move support test out of context.rs and into sql tests * Fixup tests and make them compile * Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers (#1691) * Memory manager no longer track consumers, update aggregatedMetricsSet * Easy memory tracking with metrics * use tracking metrics in SPMS * tests * fix * doc * Update datafusion/src/physical_plan/sorts/sort.rs Co-authored-by: Andrew Lamb <[email protected]> * make tracker AtomicUsize Co-authored-by: Andrew Lamb <[email protected]> * Implement TableProvider for DataFrameImpl (#1699) * Add TableProvider impl for DataFrameImpl * Add physical plan in * Clean up plan construction and names construction * Remove duplicate comments * Remove unused parameter * Add test * Remove duplicate limit comment * Use cloned instead of individual clone * Reduce the amount of code to get a schema Co-authored-by: Andrew Lamb <[email protected]> * Add comments to test * Fix plan comparison * Compare only the results of execution * Remove println * Refer to df_impl instead of table in test Co-authored-by: Andrew Lamb <[email protected]> * Fix the register_table test to use the correct result set for comparison * Consolidate group/agg exprs * Format * Remove outdated comment Co-authored-by: Andrew Lamb <[email protected]> * refine test in repartition.rs & coalesce_batches.rs (#1707) * Fuzz test for spillable sort (#1706) * Lazy TempDir creation in DiskManager (#1695) * Incorporate dyn scalar kernels (#1685) * Rebase * impl ToNumeric for ScalarValue * Update macro to be based on * Add floats * Cleanup * Newline * add annotation for select_to_plan (#1714) * Support `create_physical_expr` and `ExecutionContextState` or `DefaultPhysicalPlanner` for faster speed (#1700) * Change physical_expr creation API * Refactor API usage to avoid creating ExecutionContextState * Fixup ballista * clippy! * Fix can not load parquet table form spark in datafusion-cli. (#1665) * fix can not load parquet table form spark * add Invalid file in log. * fix fmt * add upper bound for pub fn (#1713) Signed-off-by: remzi <[email protected]> * Create SchemaAdapter trait to map table schema to file schemas (#1709) * Create SchemaAdapter trait to map table schema to file schemas * Linting fix * Remove commented code * approx_quantile() aggregation function (#1539) * feat: implement TDigest for approx quantile Adds a [TDigest] implementation providing approximate quantile estimations of large inputs using a small amount of (bounded) memory. A TDigest is most accurate near either "end" of the quantile range (that is, 0.1, 0.9, 0.95, etc) due to the use of a scalaing function that increases resolution at the tails. The paper claims single digit part per million errors for q ≤ 0.001 or q ≥ 0.999 using 100 centroids, and in practice I have found accuracy to be more than acceptable for an apprixmate function across the entire quantile range. The implementation is a modified copy of https://github.com/MnO2/t-digest, itself a Rust port of [Facebook's C++ implementation]. Both Facebook's implementation, and Mn02's Rust port are Apache 2.0 licensed. [TDigest]: https://arxiv.org/abs/1902.04023 [Facebook's C++ implementation]: https://github.com/facebook/folly/blob/main/folly/stats/TDigest.h * feat: approx_quantile aggregation Adds the ApproxQuantile physical expression, plumbing & test cases. The function signature is: approx_quantile(column, quantile) Where column can be any numeric type (that can be cast to a float64) and quantile is a float64 literal between 0 and 1. * feat: approx_quantile dataframe function Adds the approx_quantile() dataframe function, and exports it in the prelude. * refactor: bastilla approx_quantile support Adds bastilla wire encoding for approx_quantile. Adding support for this required modifying the AggregateExprNode proto message to support propigating multiple LogicalExprNode aggregate arguments - all the existing aggregations take a single argument, so this wasn't needed before. This commit adds "repeated" to the expr field, which I believe is backwards compatible as described here: https://developers.google.com/protocol-buffers/docs/proto3#updating Specifically, adding "repeated" to an existing message field: "For ... message fields, optional is compatible with repeated" No existing tests needed fixing, and a new roundtrip test is included that covers the change to allow multiple expr. * refactor: use input type as return type Casts the calculated quantile value to the same type as the input data. * fixup! refactor: bastilla approx_quantile support * refactor: rebase onto main * refactor: validate quantile value Ensures the quantile values is between 0 and 1, emitting a plan error if not. * refactor: rename to approx_percentile_cont * refactor: clippy lints * suppport bitwise and as an example (#1653) * suppport bitwise and as an example * Use $OP in macro rather than `&` * fix: change signature to &dyn Array * fmt Co-authored-by: Andrew Lamb <[email protected]> * fix: substr - correct behaivour with negative start pos (#1660) * minor: fix cargo run --release error (#1723) * Convert boolean case expressions to boolean logic (#1719) * Convert boolean case expressions to boolean logic * Review feedback * substitute `parking_lot::Mutex` for `std::sync::Mutex` (#1720) * Substitute parking_lot::Mutex for std::sync::Mutex * enable parking_lot feature in tokio * Add Expression Simplification API (#1717) * Add Expression Simplification API * fmt * Add tests and CI for optional pyarrow module (#1711) * Implement other side of conversion * Add test workflow * Add (failing) tests * Get unit tests passing * Use python -m pip * Debug LD_LIBRARY_PATH * Set LIBRARY_PATH * Update help with better info * Update parking_lot requirement from 0.11 to 0.12 (#1735) Updates the requirements on [parking_lot](https://github.com/Amanieu/parking_lot) to permit the latest version. - [Release notes](https://github.com/Amanieu/parking_lot/releases) - [Changelog](https://github.com/Amanieu/parking_lot/blob/master/CHANGELOG.md) - [Commits](Amanieu/parking_lot@0.11.0...0.12.0) --- updated-dependencies: - dependency-name: parking_lot dependency-type: direct:production ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Prevent repartitioning of certain operator's direct children (#1731) (#1732) * Prevent repartitioning of certain operator's direct children (#1731) * Update ballista tests * Don't repartition children of RepartitionExec * Revert partition restriction on Repartition and Projection * Review feedback * Lint * API to get Expr's type and nullability without a `DFSchema` (#1726) * API to get Expr type and nullability without a `DFSchema` * Add test * publically export * Improve docs * Fix typos in crate documentation (#1739) * add `cargo check --release` to ci (#1737) * remote test * Update .github/workflows/rust.yml Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> * Move optimize test out of context.rs (#1742) * Move optimize test out of context.rs * Update * use clap 3 style args parsing for datafusion cli (#1749) * use clap 3 style args parsing for datafusion cli * upgrade cli version * Add partitioned_csv setup code to sql_integration test (#1743) * use ordered-float 2.10 (#1756) Signed-off-by: Andy Grove <[email protected]> * #1768 Support TimeUnit::Second in hasher (#1769) * Support TimeUnit::Second in hasher * fix linter * format (#1745) * Create built-in scalar functions programmatically (#1734) * create build-in scalar functions programatically Signed-off-by: remzi <[email protected]> * solve conflict Signed-off-by: remzi <[email protected]> * fix spelling mistake Signed-off-by: remzi <[email protected]> * rename to call_fn Signed-off-by: remzi <[email protected]> * [split/1] split datafusion-common module (#1751) * split datafusion-common module * pyarrow * Update datafusion-common/README.md Co-authored-by: Andy Grove <[email protected]> * Update datafusion/Cargo.toml * include publishing Co-authored-by: Andy Grove <[email protected]> * fix: Case insensitive unquoted identifiers (#1747) * move dfschema and column (#1758) * add datafusion-expr module (#1759) * move column, dfschema, etc. to common module (#1760) * include window frames and operator into datafusion-expr (#1761) * move signature, type signature, and volatility to split module (#1763) * [split/10] split up expr for rewriting, visiting, and simplification traits (#1774) * split up expr for rewriting, visiting, and simplification * add docs * move built-in scalar functions (#1764) * split expr type and null info to be expr-schemable (#1784) * rewrite predicates before pushing to union inputs (#1781) * move accumulator and columnar value (#1765) * move accumulator and columnar value (#1762) * fix bad data type in test_try_cast_decimal_to_decimal * added projections for avro columns Co-authored-by: xudong.w <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Yijie Shen <[email protected]> Co-authored-by: Phillip Cloud <[email protected]> Co-authored-by: Matthew Turner <[email protected]> Co-authored-by: Yang <[email protected]> Co-authored-by: Remzi Yang <[email protected]> Co-authored-by: Dan Harris <[email protected]> Co-authored-by: Dom <[email protected]> Co-authored-by: Kun Liu <[email protected]> Co-authored-by: Dmitry Patsura <[email protected]> Co-authored-by: Raphael Taylor-Davies <[email protected]> Co-authored-by: Will Jones <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: r.4ntix <[email protected]> Co-authored-by: Jiayu Liu <[email protected]> Co-authored-by: Andy Grove <[email protected]> Co-authored-by: Rich <[email protected]> Co-authored-by: Marko Mikulicic <[email protected]> Co-authored-by: Eduard Karacharov <[email protected]>
Which issue does this PR close?
Closes #1569.
Rationale for this change
The kinds of requesting memory consumers are pretty limited. As shown in #587, we only have 3 or 4 types of requesting memory consumers (join / sort / agg / repartition). All other consumers that take non-neglectable memory are considered tracking consumers.
Tracking consumers always have a relatively fixed pattern for memory usage. They claim some memory, use it during execution, and free it when finished. The situation for growing or shrinking memory usage can be rare.
Considering the potentially large number of tracking consumers and the simple use case, we'd better have a simple method to achieve this kind of tracking; therefore,
MemTrackingMetrics
is proposed in this PR.What changes are included in this PR?
MemTrackingMetrics
introduced, act similar toBaselineMetrics
, report memory usage withinit_mem_used
and free memory when it's been dropped.MemTrackingMetrics
in SortPreservingMergeStream and SizedRecordBatchStream, simplify the tracking logic.AggregatedMetricsSet
toCompositeMetricsSet
, fix start/end time aggregation.Are there any user-facing changes?
No.