Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Update DataFusion to arrow 6.0 #984

Merged
merged 10 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ edition = "2018"
publish = false

[dependencies]
arrow-flight = { version = "^5.3" }
arrow-flight = { version = "6.0.0" }
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
prost = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tonic = "0.5"
uuid = { version = "0.8", features = ["v4"] }
chrono = "0.4"

arrow-flight = { version = "^5.3" }
arrow-flight = { version = "6.0.0" }

datafusion = { path = "../../../datafusion", version = "5.1.0" }

Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fractional: *fractional as u64,
})
}
DataType::Map(_, _) => {
unimplemented!("Ballista does not yet support Map data type")
Copy link
Contributor Author

@alamb alamb Sep 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed due to apache/arrow-rs#491 -- I am not enough of an expert to implement protobuf serialization in Ballista for a new DataType at this time but I suspect it is not very hard

}
}
}
}
Expand Down Expand Up @@ -490,6 +493,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::Struct(_)
| DataType::Union(_)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
return Err(proto_error(format!(
"Error converting to Datatype to scalar type, {:?} is invalid as a datafusion scalar.",
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ edition = "2018"
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = { version = "^5.3" }
arrow-flight = { version = "^5.3" }
arrow = { version = "6.0.0" }
arrow-flight = { version = "6.0.0" }
anyhow = "1"
async-trait = "0.1.36"
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion", version = "5.1.0" }
arrow = { version = "^5.3" }
arrow = { version = "6.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "^5.3" }
arrow-flight = { version = "6.0.0" }
datafusion = { path = "../datafusion" }
prost = "0.8"
tonic = "0.5"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ avro = ["avro-rs", "num-traits"]
[dependencies]
ahash = "0.7"
hashbrown = { version = "0.11", features = ["raw"] }
arrow = { version = "^5.3", features = ["prettyprint"] }
parquet = { version = "^5.3", features = ["arrow"] }
arrow = { version = "6.0.0", features = ["prettyprint"] }
parquet = { version = "6.0.0", features = ["arrow"] }
sqlparser = "0.12"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
8 changes: 6 additions & 2 deletions datafusion/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.add_buffer(bool_values.into())
.null_bit_buffer(bool_nulls.into())
.build()
.unwrap()
}
DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
Expand Down Expand Up @@ -569,6 +570,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.null_bit_buffer(buf)
.child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
.build()
.unwrap()
}
datatype => {
return Err(ArrowError::SchemaError(format!(
Expand All @@ -583,7 +585,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_child_data(array_data)
.null_bit_buffer(list_nulls.into())
.build();
.build()
.unwrap();
Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
}

Expand Down Expand Up @@ -776,7 +779,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.child_data(
arrays.into_iter().map(|a| a.data().clone()).collect(),
)
.build();
.build()
.unwrap();
Ok(make_array(data))
}
_ => Err(ArrowError::SchemaError(format!(
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::Struct(_) => "struct",
DataType::Union(_) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
}
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,12 @@ mod tests {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;

let expected = vec!["++", "||", "++", "++"];
let expected = vec![
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required due to apache/arrow-rs#656

"+----+--------------+",
"| c1 | AVG(test.c2) |",
"+----+--------------+",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
Expand Down
20 changes: 11 additions & 9 deletions datafusion/src/physical_plan/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ macro_rules! compare_op_scalar {
// same as $left.len()
let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) };

let data = ArrayData::new(
DataType::Boolean,
$left.len(),
None,
null_bit_buffer,
0,
vec![Buffer::from(buffer)],
vec![],
);
let data = unsafe {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to apache/arrow-rs#822 (this can lead to unsafe behavior here if the buffers are not correctly created). The alternative is to use the try_new function, which does not skip the (eventual) validation required.

Thoughts?

ArrayData::new_unchecked(
DataType::Boolean,
$left.len(),
None,
null_bit_buffer,
0,
vec![Buffer::from(buffer)],
vec![],
)
};
Ok(BooleanArray::from(data))
}};
}
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,11 +678,13 @@ fn build_join_indexes(
let left = ArrayData::builder(DataType::UInt64)
.len(left_indices.len())
.add_buffer(left_indices.finish())
.build();
.build()
.unwrap();
let right = ArrayData::builder(DataType::UInt32)
.len(right_indices.len())
.add_buffer(right_indices.finish())
.build();
.build()
.unwrap();

Ok((
PrimitiveArray::<UInt64Type>::from(left),
Expand Down
18 changes: 15 additions & 3 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,10 @@ mod tests {
expr: col("c7", &schema).unwrap(),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: col("c12", &schema).unwrap(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to make the sort output deterministic

Without this change, there is one row that comes out in a slightly different order (same values of c2 and c7)

Screen Shot 2021-10-18 at 12 55 18 PM

options: SortOptions::default(),
},
];

let basic = basic_sort(csv.clone(), sort.clone()).await;
Expand All @@ -971,7 +975,11 @@ mod tests {
let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();

assert_eq!(basic, partition);
assert_eq!(
basic, partition,
"basic:\n\n{}\n\npartition:\n\n{}\n\n",
basic, partition
);
}

// Split the provided record batch into multiple batch_size record batches
Expand Down Expand Up @@ -1183,7 +1191,7 @@ mod tests {
async fn test_async() {
let schema = test::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
expr: col("c7", &schema).unwrap(),
expr: col("c12", &schema).unwrap(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed because c7 is not unique, and since arrow-rs 6.0 sort is no longer stable, the output is not deterministic.

options: SortOptions::default(),
}];

Expand Down Expand Up @@ -1234,7 +1242,11 @@ mod tests {
let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
let partition = arrow::util::pretty::pretty_format_batches(&[merged]).unwrap();

assert_eq!(basic, partition);
assert_eq!(
basic, partition,
"basic:\n\n{}\n\npartition:\n\n{}\n\n",
basic, partition
);
}

#[tokio::test]
Expand Down