Push parquet select to leaves, add correct reordering #271

Merged · 54 commits · Jul 16, 2024

Commits (54):
- 62772f1: test for json (nicklan, Jun 27, 2024)
- 3e9665c: checkpoint before ReorderIndex (nicklan, Jun 28, 2024)
- 16c7846: working with ReorderIndex enum (nicklan, Jun 28, 2024)
- 7625af1: factor out common arrow schema usage (nicklan, Jun 28, 2024)
- 014a5a8: child types need an index too (nicklan, Jun 28, 2024)
- 3328457: reordering is back, but some things broken, no nested reorder yet (nicklan, Jun 28, 2024)
- e48d3ae: checkpoint (nicklan, Jun 28, 2024)
- f562715: actually fix column ordering (nicklan, Jun 28, 2024)
- 4134e9a: checkpoint, properly skipping inner structs (nicklan, Jul 1, 2024)
- b5f5983: working! other than re-ordering children (nicklan, Jul 1, 2024)
- 1d72be8: comment fixup (nicklan, Jul 1, 2024)
- 9cac2c5: actually reorder children (nicklan, Jul 2, 2024)
- 83836cb: no need to swap back (nicklan, Jul 2, 2024)
- 2c48f0c: cleaner Vec initialization (nicklan, Jul 2, 2024)
- b695cef: DON'T PANIC (nicklan, Jul 8, 2024)
- baf80e7: just use as_struct (nicklan, Jul 8, 2024)
- 49e2581: handle list of struct sorting (nicklan, Jul 9, 2024)
- 7aa9870: handle deeper list nesting (nicklan, Jul 9, 2024)
- f76c44e: Fix comment (nicklan, Jul 9, 2024)
- 4cd8aaf: use Into::into (nicklan, Jul 10, 2024)
- 01d2c07: make rii! a const fn (nicklan, Jul 10, 2024)
- 3c55f88: arc earlier (nicklan, Jul 10, 2024)
- 07a750c: move ensure_data_types, and use in computation of indices (nicklan, Jul 10, 2024)
- 2937f77: use ok_or_else for map key/val (nicklan, Jul 10, 2024)
- 5bcc2ee: parquet files actually contain integers, not longs (nicklan, Jul 10, 2024)
- ab84388: add simple map test (nicklan, Jul 10, 2024)
- 189767f: if-let better (nicklan, Jul 10, 2024)
- 94b1dc9: no need for ref (nicklan, Jul 10, 2024)
- 41d177e: Null -> Missing (nicklan, Jul 10, 2024)
- 0210bb7: refactor ReorderIndex into a struct (nicklan, Jul 10, 2024)
- c040827: some cleanup (nicklan, Jul 10, 2024)
- b54a791: add timestamp special case + no_matches test (nicklan, Jul 10, 2024)
- dfd662f: Add empty_requested_schema test (nicklan, Jul 10, 2024)
- 1b372a5: flatten and unzip (nicklan, Jul 11, 2024)
- c355100: add a giant doc comment (nicklan, Jul 11, 2024)
- 6b92d74: undo the horrible hack, bump dat version (nicklan, Jul 12, 2024)
- 56fd9d4: handle timestamp conversions (nicklan, Jul 12, 2024)
- cda2834: add basic_decimal test (nicklan, Jul 12, 2024)
- dd127a4: add timestamp_ntz test, fix test bug (nicklan, Jul 12, 2024)
- d772be5: fmt + test fix (nicklan, Jul 12, 2024)
- b5157d7: add new test data (nicklan, Jul 12, 2024)
- e299ed3: comment fixups (nicklan, Jul 12, 2024)
- f5ca39e: rename new_index -> new_none (nicklan, Jul 12, 2024)
- b580857: fix transform checks since we now inverted (nicklan, Jul 12, 2024)
- 8ad1933: Apply suggestions from code review (nicklan, Jul 15, 2024)
- fa61467: addressing lots of comments (nicklan, Jul 15, 2024)
- f6ff0f3: None -> Identity (nicklan, Jul 15, 2024)
- 68bec02: cleaner new methods for ReorderIndex (nicklan, Jul 15, 2024)
- 62fb031: renaming to ordering_needs_transform (nicklan, Jul 15, 2024)
- 38e9e9f: address a few comments (nicklan, Jul 15, 2024)
- cf47ea9: clean up iterators (nicklan, Jul 15, 2024)
- c477e1e: add and use InternalError type (nicklan, Jul 15, 2024)
- 74530f1: use swap_remove (nicklan, Jul 15, 2024)
- 2f42578: minor ffi fixes (nicklan, Jul 15, 2024)
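The commit trail centers on a reorder-index structure: for each leaf column read from parquet, it records where that column belongs in the requested schema and whether it needs a transform. The sketch below is pieced together from the commit messages alone ("refactor ReorderIndex into a struct", "None -> Identity", "Null -> Missing", "renaming to ordering_needs_transform"); the names and shape are illustrative assumptions, not the crate's actual definition.

```rust
/// Hypothetical reorder index, reconstructed from the commit messages above;
/// the real type in delta-kernel-rs may differ in shape and naming.
pub struct ReorderIndex {
    /// The position this column should occupy in the reordered output batch.
    pub index: usize,
    pub transform: Transform,
}

pub enum Transform {
    /// Column already has the right type; it may still need to move.
    Identity,
    /// Column needs a cast first (e.g. nanosecond -> microsecond timestamps).
    Cast,
    /// Column was requested but absent from the parquet file; fill with nulls.
    Missing,
}

/// True if any column must move, be cast, or be back-filled, mirroring the
/// "renaming to ordering_needs_transform" commit.
pub fn ordering_needs_transform(order: &[ReorderIndex]) -> bool {
    order
        .iter()
        .enumerate()
        .any(|(pos, ri)| ri.index != pos || !matches!(ri.transform, Transform::Identity))
}
```

The payoff of a check like this is the fast path: when every index is already in place and every transform is an identity, the batch can be returned untouched.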
Cargo.toml (1 change: 1 addition & 0 deletions)

@@ -24,6 +24,7 @@ version = "0.1.1"
 arrow = { version = "^52.0" }
 arrow-arith = { version = "^52.0" }
 arrow-array = { version = "^52.0" }
+arrow-cast = { version = "^52.0" }
 arrow-data = { version = "^52.0" }
 arrow-ord = { version = "^52.0" }
 arrow-json = { version = "^52.0" }
acceptance/src/data.rs (11 changes: 6 additions & 5 deletions)

@@ -60,11 +60,12 @@ pub fn sort_record_batch(batch: RecordBatch) -> DeltaResult<RecordBatch> {
     Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-static SKIPPED_TESTS: &[&str; 1] = &[
-    // For multi_partitioned_2: The golden table stores the timestamp as an INT96 (which is
-    // nanosecond precision), while the spec says we should read partition columns as
-    // microseconds. This means the read and golden data don't line up. When this is released in
-    // `dat` upstream, we can stop skipping this test
+static SKIPPED_TESTS: &[&str; 2] = &[
+    // For all_primitive_types and multi_partitioned_2: The golden table stores the timestamp as an
+    // INT96 (which is nanosecond precision), while the spec says we should read partition columns
+    // as microseconds. This means the read and golden data don't line up. When this is released in
+    // `dat` upstream, we can stop skipping these tests
+    "all_primitive_types",
     "multi_partitioned_2",
 ];

Review comment on "all_primitive_types" (nicklan, collaborator and author):

Now that we handle timestamps correctly, the golden data is "wrong" for this table. This will be fixed by updating to dat 0.0.3, and I'll do that shortly after this merges. I've verified manually that we correctly read these two tables.
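To make the skip reason concrete, here is a minimal standalone sketch (not kernel code; the array values are invented for illustration): an INT96 value decodes to a nanosecond-precision timestamp, while the spec wants partition values read as microseconds, so the golden data must be cast down before it can compare equal to what the kernel reads.

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::TimestampMicrosecondType;
use arrow_array::{ArrayRef, TimestampNanosecondArray};
use arrow_cast::cast;
use arrow_schema::{DataType, TimeUnit};

fn main() {
    // INT96 values in old parquet files decode to nanosecond precision.
    let golden: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
        1_720_000_000_123_456_789_i64,
    ]));

    // The Delta spec says partition timestamps are microsecond precision, so
    // the nanosecond golden data must be cast down before comparison.
    let read_precision = DataType::Timestamp(TimeUnit::Microsecond, None);
    let as_micros = cast(&golden, &read_precision).expect("timestamp cast");

    // The cast drops the sub-microsecond digits.
    let micros = as_micros.as_primitive::<TimestampMicrosecondType>();
    assert_eq!(micros.value(0), 1_720_000_000_123_456_i64);
}
```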
ffi/src/lib.rs (2 changes: 2 additions & 0 deletions)

@@ -263,6 +263,7 @@ pub enum KernelError {
     InvalidTableLocationError,
     InvalidDecimalError,
     InvalidStructDataError,
+    InternalError,
 }
 
 impl From<Error> for KernelError {
@@ -303,6 +304,7 @@
             Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError,
             Error::InvalidDecimal(_) => KernelError::InvalidDecimalError,
             Error::InvalidStructData(_) => KernelError::InvalidStructDataError,
+            Error::InternalError(_) => KernelError::InternalError,
             Error::Backtraced {
                 source,
                 backtrace: _,
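One way to see what the new variant buys FFI consumers: an internal kernel error now surfaces as its own stable error code rather than falling through to a catch-all. A sketch of a unit test that could sit next to this impl, assuming (as the match arm above suggests) that Error::InternalError wraps a String:

```rust
// Hypothetical test inside ffi/src/lib.rs; the Error::InternalError(String)
// construction is an assumption inferred from the match arm in the diff.
#[test]
fn internal_error_maps_to_internal_error_code() {
    use delta_kernel::Error;

    let err = Error::InternalError("unexpected reorder state".to_string());
    assert!(matches!(KernelError::from(err), KernelError::InternalError));
}
```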
ffi/src/scan.rs (2 changes: 1 addition & 1 deletion)

@@ -5,7 +5,7 @@ use std::ffi::c_void;
 use std::sync::{Arc, Mutex};
 
 use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
-use delta_kernel::scan::{Scan, ScanBuilder, ScanData};
+use delta_kernel::scan::{Scan, ScanData};
 use delta_kernel::schema::Schema;
 use delta_kernel::snapshot::Snapshot;
 use delta_kernel::{DeltaResult, EngineData, Error};
kernel/Cargo.toml (5 changes: 4 additions & 1 deletion)

@@ -37,9 +37,10 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.1.1" }
 visibility = "0.1.0"
 
 # Used in default engine
-arrow-array = { workspace = true, optional = true }
+arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
 arrow-select = { workspace = true, optional = true }
 arrow-arith = { workspace = true, optional = true }
+arrow-cast = { workspace = true, optional = true }
 arrow-json = { workspace = true, optional = true }
 arrow-ord = { workspace = true, optional = true }
 arrow-schema = { workspace = true, optional = true }
@@ -67,6 +68,7 @@ default-engine = [
     "arrow-conversion",
     "arrow-expression",
     "arrow-array",
+    "arrow-cast",
     "arrow-json",
     "arrow-schema",
     "arrow-select",
@@ -80,6 +82,7 @@
 
 developer-visibility = []
 sync-engine = [
+    "arrow-cast",
     "arrow-conversion",
     "arrow-expression",
     "arrow-array",
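Downstream, nothing changes beyond the feature graph: enabling either engine feature now also builds arrow-cast, and arrow-array gains chrono-tz for named-timezone handling. A minimal consumer manifest might look like the sketch below (crate name and version are taken from this workspace; treat the snippet as illustrative):

```toml
[dependencies]
# default-engine transitively enables arrow-cast (used for the timestamp and
# decimal casts added in this PR) and arrow-array's chrono-tz feature.
delta_kernel = { version = "0.1.1", features = ["default-engine"] }
```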
kernel/src/engine/arrow_expression.rs (89 changes: 1 addition & 88 deletions)

@@ -17,10 +17,10 @@ use itertools::Itertools;
 
 use super::arrow_conversion::LIST_ARRAY_ROOT;
 use crate::engine::arrow_data::ArrowEngineData;
+use crate::engine::arrow_utils::ensure_data_types;
 use crate::error::{DeltaResult, Error};
 use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
 use crate::schema::{DataType, PrimitiveType, SchemaRef};
-use crate::utils::require;
 use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};
 
 // TODO leverage scalars / Datum
@@ -161,93 +161,6 @@ fn column_as_struct<'a>(
         .ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
 }

-fn make_arrow_error(s: String) -> Error {
-    Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s))
-}
-
-/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type"
-/// is the same, but does so recursively into structs, and ensures lists and maps have the correct
-/// associated types as well. This returns an `Ok(())` if the types are compatible, or an error if
-/// the types do not match. If there is a `struct` type included, we only ensure that the named
-/// fields that the kernel is asking for exist, and that for those fields the types
-/// match. Un-selected fields are ignored.
-fn ensure_data_types(kernel_type: &DataType, arrow_type: &ArrowDataType) -> DeltaResult<()> {
-    match (kernel_type, arrow_type) {
-        (DataType::Primitive(_), _) if arrow_type.is_primitive() => Ok(()),
-        (DataType::Primitive(PrimitiveType::Boolean), ArrowDataType::Boolean)
-        | (DataType::Primitive(PrimitiveType::String), ArrowDataType::Utf8)
-        | (DataType::Primitive(PrimitiveType::Binary), ArrowDataType::Binary) => {
-            // strings, bools, and binary aren't primitive in arrow
-            Ok(())
-        }
-        (
-            DataType::Primitive(PrimitiveType::Decimal(kernel_prec, kernel_scale)),
-            ArrowDataType::Decimal128(arrow_prec, arrow_scale),
-        ) if arrow_prec == kernel_prec && *arrow_scale == *kernel_scale as i8 => {
-            // decimal isn't primitive in arrow. cast above is okay as we limit range
-            Ok(())
-        }
-        (DataType::Array(inner_type), ArrowDataType::List(arrow_list_type)) => {
-            let kernel_array_type = &inner_type.element_type;
-            let arrow_list_type = arrow_list_type.data_type();
-            ensure_data_types(kernel_array_type, arrow_list_type)
-        }
-        (DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => {
-            if let ArrowDataType::Struct(fields) = arrow_map_type.data_type() {
-                let mut fiter = fields.iter();
-                if let Some(key_type) = fiter.next() {
-                    ensure_data_types(&kernel_map_type.key_type, key_type.data_type())?;
-                } else {
-                    return Err(make_arrow_error(
-                        "Arrow map struct didn't have a key type".to_string(),
-                    ));
-                }
-                if let Some(value_type) = fiter.next() {
-                    ensure_data_types(&kernel_map_type.value_type, value_type.data_type())?;
-                } else {
-                    return Err(make_arrow_error(
-                        "Arrow map struct didn't have a value type".to_string(),
-                    ));
-                }
-                Ok(())
-            } else {
-                Err(make_arrow_error(
-                    "Arrow map type wasn't a struct.".to_string(),
-                ))
-            }
-        }
-        (DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => {
-            // build a list of kernel fields that matches the order of the arrow fields
-            let mapped_fields = arrow_fields
-                .iter()
-                .flat_map(|f| kernel_fields.fields.get(f.name()));
-
-            // keep track of how many fields we matched up
-            let mut found_fields = 0;
-            // ensure that for the fields that we found, the types match
-            for (kernel_field, arrow_field) in mapped_fields.zip(arrow_fields) {
-                ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?;
-                found_fields += 1;
-            }
-
-            // require that we found the number of fields that we requested.
-            require!(kernel_fields.fields.len() == found_fields, {
-                let kernel_field_names = kernel_fields.fields.keys().join(", ");
-                let arrow_field_names = arrow_fields.iter().map(|f| f.name()).join(", ");
-                make_arrow_error(format!(
-                    "Missing Struct fields. Requested: {}, found: {}",
-                    kernel_field_names, arrow_field_names,
-                ))
-            });
-            Ok(())
-        }
-        _ => Err(make_arrow_error(format!(
-            "Incorrect datatype. Expected {}, got {}",
-            kernel_type, arrow_type
-        ))),
-    }
-}

 fn evaluate_expression(
     expression: &Expression,
     batch: &RecordBatch,
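Since the deleted block above only moves (the import of crate::engine::arrow_utils::ensure_data_types added earlier in this file points at its new home), its contract is unchanged. Below is a sketch of an in-crate unit test, assuming the relocated function keeps the signature shown in the deleted block and is visible within the crate:

```rust
#[cfg(test)]
mod tests {
    use arrow_schema::DataType as ArrowDataType;

    // Assumed new home per the import added in this PR.
    use crate::engine::arrow_utils::ensure_data_types;
    use crate::schema::{DataType, PrimitiveType};

    #[test]
    fn primitives_match_loosely_but_mismatches_error() {
        let kernel_long = DataType::Primitive(PrimitiveType::Long);
        // In this version, any arrow primitive satisfies a kernel primitive
        // (the first match arm above).
        assert!(ensure_data_types(&kernel_long, &ArrowDataType::Int64).is_ok());
        // Utf8 is not primitive in arrow, so a kernel long against it falls
        // through to the "Incorrect datatype" arm and errors.
        assert!(ensure_data_types(&kernel_long, &ArrowDataType::Utf8).is_err());
    }
}
```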