-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement hash-partitioned hash aggregate #27
Comments
Does your |
I think yes, for cases where the table is queried directly and we have statistics for distinct values available, those could be using heuristics for To support the more general case we probably need a way to estimate the cardinality of the intermediate results, based on sampling one or a couple of batches with the particular group by expression. I added this requirement in this design doc: |
I also realize I may have implemented this in an inefficient way. Looking at Spark it does first a partial aggregate followed by a hash repartition followed by another aggregation. I think this will not slow down the low cardinality query that much (as the result set will be small already after first aggregate) while being fast - maybe even faster - for high cardinality aggregates. |
* Initial commit * initial commit * failing test * table scan projection * closer * test passes, with some hacks * use DataFrame (#2) * update README * update dependency * code cleanup (#3) * Add support for Filter operator and BinaryOp expressions (#4) * GitHub action (#5) * Split code into producer and consumer modules (#6) * Support more functions and scalar types (#7) * Use substrait 0.1 and datafusion 8.0 (#8) * use substrait 0.1 * use datafusion 8.0 * update datafusion to 10.0 and substrait to 0.2 (#11) * Add basic join support (#12) * Added fetch support (#23) Added fetch to consumer Added limit to producer Added unit tests for limit Added roundtrip_fill_none() for testing when None input can be converted to 0 Update src/consumer.rs Co-authored-by: Andy Grove <[email protected]> Co-authored-by: Andy Grove <[email protected]> * Upgrade to DataFusion 13.0.0 (#25) * Add sort consumer and producer (#24) Add consumer Add producer and test Modified error string * Add serializer/deserializer (#26) * Add plan and function extension support (#27) * Add plan and function extension support * Removed unwraps * Implement GROUP BY (#28) * Add consumer, producer and tests for aggregate relation Change function extension registration from absolute to relative anchor (reference) Remove operator to/from reference * Fixed function registration bug * Add test * Addressed PR comments * Changed field reference from mask to direct reference (#29) * Changed field reference from masked reference to direct reference * Handle unsupported case (struct with child) * Handle SubqueryAlias (#30) Fixed aggregate function register bug * Add support for SELECT DISTINCT (#31) Add test case * Implement BETWEEN (#32) * Add case (#33) * Implement CASE WHEN * Add more case to test * Addressed comments * feat: support explicit catalog/schema names in ReadRel (#34) * feat: support explicit catalog/schema names in ReadRel Signed-off-by: Ruihang Xia <[email protected]> * fix: use re-exported expr crate Signed-off-by: Ruihang Xia <[email protected]> Signed-off-by: Ruihang Xia <[email protected]> * move files to subfolder * RAT * remove rust.yaml * revert .gitignore changes * tomlfmt * tomlfmt Signed-off-by: Ruihang Xia <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Co-authored-by: JanKaul <[email protected]> Co-authored-by: nseekhao <[email protected]> Co-authored-by: Ruihang Xia <[email protected]>
This is (up to 4x in my earlier tests) faster than the current implementation that collects all parts to one "full" for cases with very high cardinality in the aggregate (think deduplication code). However, not hash partitioning is faster for very "simple" aggregates as less work needs to be done.
We probably need some fast way to have a rough estimate on the number of distinct values in the aggregate keys, maybe dynamically based on the first batch(es).
Also this work creates a building block for ballista to distribute data across workers, parallelizing it, avoiding collecting it to one worker, and making it scale to bigger datasets.
The text was updated successfully, but these errors were encountered: