Update DataFusion to arrow 6.0 #984
@@ -1779,7 +1779,12 @@ mod tests {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;

let expected = vec!["++", "||", "++", "++"];
let expected = vec![
Required due to apache/arrow-rs#656
expr: col("c7", &schema).unwrap(),
options: SortOptions::default(),
}];
let sort = vec![
This is required to ensure a consistent sort between the two sorting strategies, due to the change to unstable sorting introduced in apache/arrow-rs#552. The `SortPreservingMerge` operator happens to be a stable sort, but the sort kernel used by the `Sort` operator no longer is.
The issue here is that the original sort key, `c7`, has duplicated values (e.g. the value 18 is repeated in several rows).
The full output is in basic.txt and partition.txt.
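To illustrate why a unique secondary key restores determinism, here is a minimal standard-library sketch. The `Row` type and both function names are hypothetical, and `c12` is assumed to be unique per row; this is not the DataFusion test code or the arrow sort kernel.

```rust
// Hypothetical row type: `c7` contains duplicates, `c12` is assumed unique.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub c7: u8,
    pub c12: f64,
}

/// Sorts by `c7` only. With an unstable sort, rows sharing a `c7` value
/// may end up in any relative order.
pub fn sort_by_c7(rows: &mut [Row]) {
    rows.sort_unstable_by(|a, b| a.c7.cmp(&b.c7));
}

/// Sorts by `c7`, then by `c12` as a tie-breaker. Because the combined key
/// is unique (by assumption), every correct sort, stable or not, produces
/// the same output.
pub fn sort_by_c7_then_c12(rows: &mut [Row]) {
    rows.sort_unstable_by(|a, b| {
        a.c7.cmp(&b.c7).then(a.c12.total_cmp(&b.c12))
    });
}
```

With the tie-breaker in place, two inputs holding the same rows in different orders sort to identical outputs, which is what a test comparing two sorting strategies needs.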
FYI @tustvold
Sounds like a good plan if it works 👍
Updated this PR to use c12
Do we need to add a stable sort option to the arrow sort kernel?
I am not sure @jimexist -- I think it is fairly common that analytic systems don't really have the notion of a "stable" sort (because the data doesn't have any well-defined sort order in storage).
In DataFusion, for example, the order in which rows are produced (and how they are partitioned) depends on the DataSource, and rows may also be rearranged by a repartition / coalesce operator. I think this stable sorting is really only useful for testing when the output has a single `RecordBatch`.
It may be best not to get used to / rely on that stable sorting.
expr: col("c7", &schema).unwrap(),
options: SortOptions::default(),
}];
let sort = vec![
FYI @tustvold
datafusion/tests/sql.rs
Outdated
first_value(cast(c4 as Int)) over (partition by c3), \
last_value(cast(c4 as Int)) over (partition by c3), \
nth_value(cast(c4 as Int), 2) over (partition by c3) \
first_value(cast(c4 as Int)) over (partition by c3 order by c3, c4), \
This change is also required due to the switch to unstable sorting in apache/arrow-rs#552, but the test is nondeterministic according to the SQL spec (the query output depends on implementation details).
Specifically, it computes `first_value`, `last_value`, and `nth_value` for partitions that contain more than one row for a given `partition by` value of `c3`, but does not specify an `order by` clause to determine how those rows should be sorted.
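The effect can be sketched without a query engine. The two functions below are hypothetical stand-ins that model only `first_value` over `(c3, c4)` pairs; they are an illustration under those assumptions, not DataFusion's window implementation.

```rust
use std::collections::BTreeMap;

/// For each `c3` partition, returns the first `c4` value in arrival order.
/// This models `first_value(c4) OVER (PARTITION BY c3)`: the answer depends
/// on whatever order the rows happen to arrive in.
pub fn first_value_unordered(rows: &[(i32, i32)]) -> BTreeMap<i32, i32> {
    let mut out = BTreeMap::new();
    for &(c3, c4) in rows {
        // Keep only the first value seen for each partition key.
        out.entry(c3).or_insert(c4);
    }
    out
}

/// Same computation, but the rows are ordered first, modelling
/// `OVER (PARTITION BY c3 ORDER BY c3, c4)`.
pub fn first_value_ordered(rows: &[(i32, i32)]) -> BTreeMap<i32, i32> {
    let mut sorted = rows.to_vec();
    sorted.sort_unstable(); // tuples compare by c3, then by c4
    first_value_unordered(&sorted)
}
```

Feeding the same rows in two different arrival orders can change the result of the unordered version, while the ordered version always returns the smallest `c4` per partition.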
fyi @jimexist
6122563
to
281d6bc
Compare
@@ -407,6 +407,9 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fractional: *fractional as u64,
})
}
DataType::Map(_, _) => {
unimplemented!("Ballista does not yet support Map data type")
Needed due to apache/arrow-rs#491 -- I am not enough of an expert to implement protobuf serialization in Ballista for a new `DataType` at this time, but I suspect it is not very hard.
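For context on why a new upstream variant forces a change like this: Rust's `match` must be exhaustive, so every conversion that matches on the enum stops compiling until the new variant is handled. A minimal sketch follows; the `DataType` enum and `to_tag` function here are hypothetical stand-ins, not the arrow or Ballista types.

```rust
// Hypothetical stand-in enum; NOT arrow's `DataType`.
#[derive(Debug)]
pub enum DataType {
    Int32,
    Utf8,
    // Variant newly added "upstream" in this sketch.
    Map(Box<DataType>, bool),
}

/// Maps a type to a made-up serialization tag. Without the `Map` arm,
/// this function would fail to compile with a non-exhaustive match error.
pub fn to_tag(dt: &DataType) -> u8 {
    match dt {
        DataType::Int32 => 1,
        DataType::Utf8 => 2,
        DataType::Map(_, _) => {
            // Placeholder arm: compiles today, panics if actually reached.
            unimplemented!("Map data type is not yet supported")
        }
    }
}
```

The placeholder arm keeps the build green while deferring the real work; the trade-off is a runtime panic for any plan that actually carries the new type.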
8231f95
to
f089217
Compare
f089217
to
5971729
Compare
5971729
to
89d214b
Compare
5375d91
to
95a8c58
Compare
null_bit_buffer,
0,
vec![Buffer::from(buffer)],
let data = unsafe {
Due to apache/arrow-rs#822 (this can lead to unsafe behavior here if the buffers are not correctly created). The alternative is to use the `try_new` function and skip the (eventual) validation required.
Thoughts?
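The trade-off between a checked and an unchecked constructor can be sketched with a toy type. `Nullable` below is hypothetical and only mirrors the constructor pattern (a validating `try_new` next to an `unsafe` unchecked constructor); it is not arrow's `ArrayData`, and its validation rule is an assumption made for the example.

```rust
/// Hypothetical container: values plus a validity bitmap, one bit per value.
#[derive(Debug)]
pub struct Nullable {
    values: Vec<i32>,
    validity: Vec<u8>,
}

impl Nullable {
    /// Checked constructor: verifies that the bitmap covers every value.
    pub fn try_new(values: Vec<i32>, validity: Vec<u8>) -> Result<Self, String> {
        let needed = (values.len() + 7) / 8;
        if validity.len() < needed {
            return Err(format!(
                "validity has {} bytes, need at least {}",
                validity.len(),
                needed
            ));
        }
        Ok(Self { values, validity })
    }

    /// Unchecked constructor: skips the validation above.
    ///
    /// # Safety
    /// The caller must guarantee `validity.len() >= ceil(values.len() / 8)`.
    pub unsafe fn new_unchecked(values: Vec<i32>, validity: Vec<u8>) -> Self {
        Self { values, validity }
    }

    /// Reads a validity bit, relying on the invariant to skip a bounds check.
    pub fn is_valid(&self, i: usize) -> bool {
        assert!(i < self.values.len());
        // SAFETY: i < values.len(), and the bitmap covers all values.
        let byte = unsafe { *self.validity.get_unchecked(i / 8) };
        (byte & (1 << (i % 8))) != 0
    }
}
```

The checked path costs a validation pass on every construction; the unchecked path moves that proof obligation to the call site, which is why it has to sit inside an `unsafe` block.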
@@ -1183,7 +1191,7 @@ mod tests {
async fn test_async() {
let schema = test::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
expr: col("c7", &schema).unwrap(),
expr: col("c12", &schema).unwrap(),
This is needed because `c7` is not unique and, since the `arrow-rs` 6.0 sort is no longer stable, the output is not deterministic.
@@ -963,6 +963,10 @@ mod tests {
expr: col("c7", &schema).unwrap(),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: col("c12", &schema).unwrap(),
LGTM!
🎉
Which issue does this PR close?
Closes #1144
Rationale for this change
Pick up improvements in the upstream arrow-rs crate and allow projects downstream of DataFusion to use the new arrow-rs.
This PR is intended for demonstration only; I don't intend to merge this PR as is, but I plan to create a real PR once arrow 6.0 has been released officially.
What changes are included in this PR?
Are there any user-facing changes?
The biggest user-facing change, I think, is that sort in arrow-rs is no longer stable, so sort in DataFusion will no longer be stable either.