From d62f2621f6335899dd095b70c2d969320386edaa Mon Sep 17 00:00:00 2001 From: Bruno Volpato Date: Tue, 29 Oct 2024 07:33:11 -0400 Subject: [PATCH] feat(substrait): support order_by in aggregate functions (#13114) --- .../substrait/src/logical_plan/consumer.rs | 17 ++- .../tests/cases/roundtrip_logical_plan.rs | 16 ++- ...aggregate_sorted_no_project.substrait.json | 113 ++++++++++++++++++ 3 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 99e7990df623..e0bb3b4e4f33 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -714,14 +714,27 @@ pub async fn from_substrait_rel( } _ => false, }; + let order_by = if !f.sorts.is_empty() { + Some( + from_substrait_sorts( + ctx, + &f.sorts, + input.schema(), + extensions, + ) + .await?, + ) + } else { + None + }; + from_substrait_agg_func( ctx, f, input.schema(), extensions, filter, - // TODO: Add parsing of order_by also - None, + order_by, distinct, ) .await diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 1f654f1d3c95..8108b9ad6767 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -685,6 +685,19 @@ async fn aggregate_wo_projection_consume() -> Result<()> { .await } +#[tokio::test] +async fn aggregate_wo_projection_sorted_consume() -> Result<()> { + let proto_plan = + read_json("tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json"); + + assert_expected_plan_substrait( + proto_plan, + "Aggregate: groupBy=[[data.a]], aggr=[[count(data.a) ORDER BY [data.a DESC NULLS FIRST] AS countA]]\ + \n TableScan: data projection=[a]", + ) + .await +} + #[tokio::test] async fn simple_intersect_consume() -> Result<()> { let proto_plan = read_json("tests/testdata/test_plans/intersect.substrait.json"); @@ -1025,8 +1038,9 @@ async fn roundtrip_aggregate_udf() -> Result<()> { let ctx = create_context().await?; ctx.register_udaf(dummy_agg); + roundtrip_with_ctx("select dummy_agg(a) from data", ctx.clone()).await?; + roundtrip_with_ctx("select dummy_agg(a order by a) from data", ctx.clone()).await?; - roundtrip_with_ctx("select dummy_agg(a) from data", ctx).await?; Ok(()) } diff --git a/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json b/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json new file mode 100644 index 000000000000..d5170223cd65 --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/aggregate_sorted_no_project.substrait.json @@ -0,0 +1,113 @@ +{ + "extensionUris": [ + { + "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate_generic.yaml" + } + ], + "extensions": [ + { + "extensionFunction": { + "functionAnchor": 185, + "name": "count:any" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "a" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_NULLABLE" + } + }, + "namedTable": { + "names": [ + "data" + ] + } + } + }, + "groupings": [ + { + "groupingExpressions": [ + { + "selection": { + "directReference": { + "structField": {} + }, + "rootReference": {} + } + } + ] + } + ], + "measures": [ + { + "measure": { + "functionReference": 185, + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": {} + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": {} + }, + "rootReference": {} + } + } + } + ], + "sorts": [ + { + "expr": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }, + "direction": "SORT_DIRECTION_DESC_NULLS_FIRST" + } + ] + } + } + ] + } + }, + "names": [ + "a", + "countA" + ] + } + } + ], + "version": { + "minorNumber": 54, + "producer": "manual" + } +} \ No newline at end of file