sum(distinct) support #2405

Merged
merged 5 commits into from
May 4, 2022

Contributor

@WinkerDu WinkerDu commented May 2, 2022

Which issue does this PR close?

Closes #2404 .

Rationale for this change

DataFusion does not yet support sum(distinct). Although the SingleDistinctToGroupBy optimizer rule (#1315) handles queries with a single distinct aggregate, other SQL scenarios are still not covered.
For example, running the following unit test

#[tokio::test]
async fn query_sum_distinct() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int64, true),
        Field::new("c2", DataType::Int64, true),
    ]));

    let data = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![
                Some(0),
                Some(1),
                None,
                Some(3),
                Some(3),
            ])),
            Arc::new(Int64Array::from(vec![
                None,
                Some(1),
                Some(1),
                Some(2),
                Some(2),
            ])),
        ],
    )?;

    let table = MemTable::try_new(schema, vec![vec![data]])?;
    let ctx = SessionContext::new();
    ctx.register_table("test", Arc::new(table))?;

    let sql = "SELECT AVG(c1), SUM(DISTINCT c2) FROM test";
    let actual = execute_to_batches(&ctx, sql).await;
    Ok(())
}

raises an error like

NotImplemented(\"SUM(DISTINCT) aggregations are not available\") at Creating physical plan for 'SELECT AVG(c1), SUM(DISTINCT c2) FROM test': Projection: #AVG(test.c1), #SUM(DISTINCT test.c2)\n  Aggregate: groupBy=[[]], aggr=[[AVG(#test.c1), SUM(DISTINCT #test.c2)]]\n    TableScan: test projection=Some([0, 1])

What changes are included in this PR?

Introduces expressions::DistinctSum into DataFusion:

  • Maintains a HashSet of the unique numeric values seen so far, updating it as new input arrives.
  • Stores the aggregate state as a ScalarValue::List, which is built from the HashSet.
  • During the final aggregation, evaluates DistinctSum by summing the numeric values stored in the HashSet.
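
As a rough illustration of the three points above, here is a minimal standard-library sketch of a distinct-sum accumulator. It is not the code from this PR: `DistinctSumSketch` and its methods are hypothetical names, plain `Option<i64>` and `Vec<i64>` stand in for `ScalarValue` and `ScalarValue::List`, and returning NULL (`None`) for an empty set is an assumption based on the usual SQL behaviour of SUM.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a distinct-sum accumulator over nullable i64 input.
#[derive(Debug, Default)]
struct DistinctSumSketch {
    /// Unique non-NULL values seen so far.
    seen: HashSet<i64>,
}

impl DistinctSumSketch {
    /// Record each non-NULL input value; duplicates collapse in the set.
    fn update(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.seen.insert(*v);
        }
    }

    /// Partial state handed to the final stage: the set flattened into a list
    /// (sorted here only to make the output deterministic).
    fn state(&self) -> Vec<i64> {
        let mut out: Vec<i64> = self.seen.iter().copied().collect();
        out.sort_unstable();
        out
    }

    /// Fold another partition's state into this accumulator.
    fn merge(&mut self, state: &[i64]) {
        self.seen.extend(state.iter().copied());
    }

    /// Final result: sum of the unique values, or None if no non-NULL value
    /// was seen (assumption). Overflow handling is omitted in this sketch.
    fn evaluate(&self) -> Option<i64> {
        if self.seen.is_empty() {
            None
        } else {
            Some(self.seen.iter().sum())
        }
    }
}
```

Keeping the whole set as the intermediate state, rather than a running total, is what lets the final stage deduplicate values that appear in more than one partition before summing.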

Are there any user-facing changes?

No.

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label May 2, 2022
@WinkerDu WinkerDu changed the title sum(distinct) support sum(distinct) support May 2, 2022
Contributor Author

WinkerDu commented May 2, 2022

cc @andygrove @alamb @yjshen
Please take a look, thank you.

Contributor

@alamb alamb left a comment

LGTM -- thanks @WinkerDu

"+--------------+-----------------------+",
"| AVG(test.c1) | SUM(DISTINCT test.c2) |",
"+--------------+-----------------------+",
"| 1.75         | 3                     |",
Contributor

👍
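
The expected row quoted above can be checked by hand against the test data from the PR description. The following is a small standard-library sketch of that arithmetic; `avg` and `sum_distinct` are hypothetical helpers written for this check, not DataFusion functions.

```rust
use std::collections::HashSet;

/// Mean of the non-NULL values, or None if there are none.
fn avg(values: &[Option<i64>]) -> Option<f64> {
    let non_null: Vec<i64> = values.iter().flatten().copied().collect();
    if non_null.is_empty() {
        return None;
    }
    Some(non_null.iter().sum::<i64>() as f64 / non_null.len() as f64)
}

/// Sum of the unique non-NULL values, or None if there are none.
fn sum_distinct(values: &[Option<i64>]) -> Option<i64> {
    let unique: HashSet<i64> = values.iter().flatten().copied().collect();
    if unique.is_empty() {
        None
    } else {
        Some(unique.iter().sum())
    }
}

/// Columns c1 and c2 exactly as built in the unit test from the PR description.
fn test_columns() -> (Vec<Option<i64>>, Vec<Option<i64>>) {
    let c1 = vec![Some(0), Some(1), None, Some(3), Some(3)];
    let c2 = vec![None, Some(1), Some(1), Some(2), Some(2)];
    (c1, c2)
}
```

For c1 the non-NULL values are 0, 1, 3, 3, so the average is 7 / 4 = 1.75. For c2 the unique non-NULL values are 1 and 2, so the distinct sum is 3, matching the table row.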

@@ -297,6 +297,18 @@ pub(crate) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
        (ScalarValue::Int64(lhs), ScalarValue::Int8(rhs)) => {
            typed_sum!(lhs, rhs, Int64, i64)
        }
        (ScalarValue::Int64(lhs), ScalarValue::UInt64(rhs)) => {
Contributor

This looks like a fine change in this PR -- though it is strange to me that we have to do these casts in sum.rs, as that duplicates a non-trivial amount of the coercion logic -- maybe it would be possible to make this code cleaner / consolidate more of the coercion logic.

Again, no changes needed for this PR but I figured I would point it out while reading this

Contributor Author

Thanks @alamb
I'll look into this coercion logic in a follow-up PR

Member

Coercing u64 to i64 seems irrational to me. Why do we need this kind of coercion in sum distinct?

Contributor

I think @WinkerDu is just following the same pre-existing pattern with this PR

I agree that the pre-existing pattern doesn't make sense to me (it should have already been done by the time the distinct aggregate code is being executed)
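
To make the concern about u64-to-i64 coercion concrete, here is a small standard-library sketch of what happens at the edge of the range. It says nothing about how the `typed_sum!` macro itself performs the cast; `lossy_u64_to_i64` and `checked_u64_to_i64` are hypothetical helpers that only show Rust's own conversion semantics.

```rust
use std::convert::TryFrom;

/// `as` keeps the bit pattern, so values above i64::MAX wrap to negatives.
fn lossy_u64_to_i64(v: u64) -> i64 {
    v as i64
}

/// A checked conversion reports out-of-range values instead of wrapping.
fn checked_u64_to_i64(v: u64) -> Option<i64> {
    i64::try_from(v).ok()
}
```

With these helpers, `lossy_u64_to_i64(u64::MAX)` yields `-1`, while `checked_u64_to_i64(u64::MAX)` yields `None`. Values that fit in an i64, such as 42, convert identically either way.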


fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
    values.iter().for_each(|v| {
        // If the value is NULL, it is not included in the final sum.
Contributor

👍

generic_test_sum_distinct!(
    array,
    DataType::Int32,
    ScalarValue::from(6i64),
Contributor

👍

generic_test_sum_distinct!(
    array,
    DataType::UInt32,
    ScalarValue::from(4i64),
Contributor

👍

Member

@Ted-Jiang Ted-Jiang left a comment

LGTM

Contributor Author

WinkerDu commented May 3, 2022

cc @andygrove @tustvold @Dandandan
PTAL, thank you

Contributor

alamb commented May 4, 2022

Thanks @WinkerDu for the code and @yjshen @Ted-Jiang and @xudong963 for the reviews.

It will be nice to clean up some of the coercion logic -- @WinkerDu let me know if I should file a ticket to track that work

@alamb alamb merged commit 6b4bbd0 into apache:master May 4, 2022
Contributor Author

WinkerDu commented May 4, 2022

> @WinkerDu let me know if I should file a ticket to track that work

Thank you all

It would be good if you can help me filing the ticket, thank you @alamb

Contributor

alamb commented May 5, 2022

> It would be good if you can help me filing the ticket, thank you @alamb

I filed #2447, though in doing so I realized I don't fully understand whether any change is needed or whether the coercion is required.

Labels
datafusion Changes in the datafusion crate
5 participants