Skip to content

Commit

Permalink
Update DataFusion to arrow 6.0 (#984)
Browse files Browse the repository at this point in the history
* Update to arrow 6.0

* Update empty result

* Fix up sort_preserving_merge test

* Use column c12, per @yjshen

* Add minimal ballista support

* Update to use new arrow apis

* Fixup sort preserving merge test again

* make sort deterministic

* less whitespace

* patch up avro
  • Loading branch information
alamb authored Oct 19, 2021
1 parent 01f16f4 commit 57d7777
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ edition = "2018"
publish = false

[dependencies]
arrow-flight = { version = "^5.3" }
arrow-flight = { version = "6.0.0" }
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
prost = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tonic = "0.5"
uuid = { version = "0.8", features = ["v4"] }
chrono = "0.4"

arrow-flight = { version = "^5.3" }
arrow-flight = { version = "6.0.0" }

datafusion = { path = "../../../datafusion", version = "5.1.0" }

Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fractional: *fractional as u64,
})
}
DataType::Map(_, _) => {
unimplemented!("Ballista does not yet support Map data type")
}
}
}
}
Expand Down Expand Up @@ -490,6 +493,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::Struct(_)
| DataType::Union(_)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
return Err(proto_error(format!(
"Error converting to Datatype to scalar type, {:?} is invalid as a datafusion scalar.",
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ edition = "2018"
snmalloc = ["snmalloc-rs"]

[dependencies]
arrow = { version = "^5.3" }
arrow-flight = { version = "^5.3" }
arrow = { version = "6.0.0" }
arrow-flight = { version = "6.0.0" }
anyhow = "1"
async-trait = "0.1.36"
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion", version = "5.1.0" }
arrow = { version = "^5.3" }
arrow = { version = "6.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "^5.3" }
arrow-flight = { version = "6.0.0" }
datafusion = { path = "../datafusion" }
prost = "0.8"
tonic = "0.5"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ avro = ["avro-rs", "num-traits"]
[dependencies]
ahash = "0.7"
hashbrown = { version = "0.11", features = ["raw"] }
arrow = { version = "^5.3", features = ["prettyprint"] }
parquet = { version = "^5.3", features = ["arrow"] }
arrow = { version = "6.0.0", features = ["prettyprint"] }
parquet = { version = "6.0.0", features = ["arrow"] }
sqlparser = "0.12"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
8 changes: 6 additions & 2 deletions datafusion/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.add_buffer(bool_values.into())
.null_bit_buffer(bool_nulls.into())
.build()
.unwrap()
}
DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
Expand Down Expand Up @@ -569,6 +570,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.null_bit_buffer(buf)
.child_data(arrays.into_iter().map(|a| a.data().clone()).collect())
.build()
.unwrap()
}
datatype => {
return Err(ArrowError::SchemaError(format!(
Expand All @@ -583,7 +585,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_child_data(array_data)
.null_bit_buffer(list_nulls.into())
.build();
.build()
.unwrap();
Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
}

Expand Down Expand Up @@ -776,7 +779,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.child_data(
arrays.into_iter().map(|a| a.data().clone()).collect(),
)
.build();
.build()
.unwrap();
Ok(make_array(data))
}
_ => Err(ArrowError::SchemaError(format!(
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::Struct(_) => "struct",
DataType::Union(_) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
}
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,12 @@ mod tests {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;

let expected = vec!["++", "||", "++", "++"];
let expected = vec![
"+----+--------------+",
"| c1 | AVG(test.c2) |",
"+----+--------------+",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
Expand Down
20 changes: 11 additions & 9 deletions datafusion/src/physical_plan/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ macro_rules! compare_op_scalar {
// same as $left.len()
let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) };

let data = ArrayData::new(
DataType::Boolean,
$left.len(),
None,
null_bit_buffer,
0,
vec![Buffer::from(buffer)],
vec![],
);
let data = unsafe {
ArrayData::new_unchecked(
DataType::Boolean,
$left.len(),
None,
null_bit_buffer,
0,
vec![Buffer::from(buffer)],
vec![],
)
};
Ok(BooleanArray::from(data))
}};
}
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,11 +678,13 @@ fn build_join_indexes(
let left = ArrayData::builder(DataType::UInt64)
.len(left_indices.len())
.add_buffer(left_indices.finish())
.build();
.build()
.unwrap();
let right = ArrayData::builder(DataType::UInt32)
.len(right_indices.len())
.add_buffer(right_indices.finish())
.build();
.build()
.unwrap();

Ok((
PrimitiveArray::<UInt64Type>::from(left),
Expand Down
18 changes: 15 additions & 3 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,10 @@ mod tests {
expr: col("c7", &schema).unwrap(),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: col("c12", &schema).unwrap(),
options: SortOptions::default(),
},
];

let basic = basic_sort(csv.clone(), sort.clone()).await;
Expand All @@ -971,7 +975,11 @@ mod tests {
let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();

assert_eq!(basic, partition);
assert_eq!(
basic, partition,
"basic:\n\n{}\n\npartition:\n\n{}\n\n",
basic, partition
);
}

// Split the provided record batch into multiple batch_size record batches
Expand Down Expand Up @@ -1183,7 +1191,7 @@ mod tests {
async fn test_async() {
let schema = test::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
expr: col("c7", &schema).unwrap(),
expr: col("c12", &schema).unwrap(),
options: SortOptions::default(),
}];

Expand Down Expand Up @@ -1234,7 +1242,11 @@ mod tests {
let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
let partition = arrow::util::pretty::pretty_format_batches(&[merged]).unwrap();

assert_eq!(basic, partition);
assert_eq!(
basic, partition,
"basic:\n\n{}\n\npartition:\n\n{}\n\n",
basic, partition
);
}

#[tokio::test]
Expand Down

0 comments on commit 57d7777

Please sign in to comment.