fix: querying on date partitions (fixes #1445) #1594

Merged · 17 commits · Sep 5, 2023
15 changes: 6 additions & 9 deletions rust/src/delta_arrow.rs
@@ -506,16 +506,13 @@ pub(crate) fn delta_log_schema_for_table(
         ]
     ];
     static ref REMOVE_FIELDS: Vec<ArrowField> = arrow_defs![
-        path:Utf8,
-        deletionTimestamp:Int64,
-        dataChange:Boolean,
-        extendedFileMetadata:Boolean
-    ];
-    static ref REMOVE_EXTENDED_FILE_METADATA_FIELDS: Vec<ArrowField> = arrow_defs![
-        size:Int64,
-        partitionValues,
-        tags
+        path: Utf8,
+        deletionTimestamp: Int64,
+        dataChange: Boolean,
+        extendedFileMetadata: Boolean
     ];
+    static ref REMOVE_EXTENDED_FILE_METADATA_FIELDS: Vec<ArrowField> =
+        arrow_defs![size: Int64, partitionValues, tags];
 };
 
 // create add fields according to the specific data table schema
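For readers unfamiliar with the macro: arrow_defs! builds the list of Arrow fields describing checkpoint "remove" actions. As a rough sketch only (the real macro is defined in delta_arrow.rs, and the nullability flag here is an assumption), the reformatted REMOVE_FIELDS block expands to roughly:

use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};

// Hypothetical expansion of arrow_defs![...] (nullability assumed to be true):
fn remove_fields_expanded() -> Vec<ArrowField> {
    vec![
        ArrowField::new("path", ArrowDataType::Utf8, true),
        ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
        ArrowField::new("dataChange", ArrowDataType::Boolean, true),
        ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
    ]
}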
13 changes: 9 additions & 4 deletions rust/src/operations/transaction/state.rs
@@ -49,10 +49,15 @@ impl DeltaTableState {
                 let field = ArrowField::try_from(f)?;
                 let corrected = if wrap_partitions {
                     match field.data_type() {
-                        // Dictionary encoding boolean types does not yield benefits
-                        // https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
-                        DataType::Boolean => field.data_type().clone(),
-                        _ => wrap_partition_type_in_dict(field.data_type().clone()),
+                        // Only dictionary-encode types that may be large
+                        // https://github.com/apache/arrow-datafusion/pull/5545
+                        DataType::Utf8
+                        | DataType::LargeUtf8
+                        | DataType::Binary
+                        | DataType::LargeBinary => {
+                            wrap_partition_type_in_dict(field.data_type().clone())
+                        }
+                        _ => field.data_type().clone(),
                     }
                 } else {
                     field.data_type().clone()
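This hunk is the heart of the fix. For context, a minimal sketch of what wrap_partition_type_in_dict plausibly does (the exact definition lives elsewhere in delta-rs; the UInt16 key type is inferred from the Dictionary types appearing in the tests below):

use arrow_schema::DataType;

// Sketch (assumed): wrap a partition column's value type in a dictionary,
// so the single per-file partition value is stored once and referenced by key.
fn wrap_partition_type_in_dict(t: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(t))
}

With this change, only variable-width string and binary partition columns get the wrapper; fixed-width types such as Int32 and Date32 keep their native representation, which is what the updated expected_schema and the new date-partition test below rely on.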
116 changes: 105 additions & 11 deletions rust/tests/integration_datafusion.rs
@@ -14,6 +14,7 @@ use arrow::datatypes::{
     DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
 };
 use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field};
 use datafusion::assert_batches_sorted_eq;
 use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::datasource::TableProvider;
@@ -34,10 +35,12 @@ use deltalake::action::SaveMode;
 use deltalake::delta_datafusion::{DeltaPhysicalCodec, DeltaScan};
 use deltalake::operations::create::CreateBuilder;
 use deltalake::storage::DeltaObjectStore;
+use deltalake::writer::{DeltaWriter, RecordBatchWriter};
 use deltalake::{
     operations::{write::WriteBuilder, DeltaOps},
-    DeltaTable, Schema,
+    DeltaTable, DeltaTableError, Schema, SchemaDataType, SchemaField,
 };
+use std::error::Error;
 
 mod common;

@@ -572,7 +575,7 @@ mod local {
             if column == "decimal" || column == "date" || column == "binary" {
                 continue;
             }
-            println!("Test Column: {} value: {}", column, file1_value);
+            println!("[Unwrapped] Test Column: {} value: {}", column, file1_value);
 
             // Equality
             let e = col(column).eq(file1_value.clone());
@@ -648,7 +651,11 @@ mod local {
             // binary fails since arrow does not implement a natural order
             // The current Datafusion pruning implementation does not work for binary columns since they do not have a natural order. See #1214
             // Timestamp and date are disabled since the hive path contains illegal Windows values. see #1215
-            if column == "float32"
+            if column == "int64"
+                || column == "int32"
+                || column == "int16"
+                || column == "int8"
+                || column == "float32"
                 || column == "float64"
                 || column == "decimal"
                 || column == "binary"
@@ -658,6 +665,8 @@
                 continue;
             }
 
+            println!("[Wrapped] Test Column: {} value: {}", column, file1_value);
+
             let partitions = vec![column.to_owned()];
             let batch = create_all_types_batch(3, 0, 0);
             let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;
@@ -775,14 +784,7 @@
 
         let expected_schema = ArrowSchema::new(vec![
             ArrowField::new("c3", ArrowDataType::Int32, true),
-            ArrowField::new(
-                "c1",
-                ArrowDataType::Dictionary(
-                    Box::new(ArrowDataType::UInt16),
-                    Box::new(ArrowDataType::Int32),
-                ),
-                false,
-            ),
+            ArrowField::new("c1", ArrowDataType::Int32, false),
             ArrowField::new(
                 "c2",
                 ArrowDataType::Dictionary(
@@ -1023,3 +1025,95 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {

    Ok(())
}

mod date_partitions {
    use super::*;

    async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
        let columns = vec![
            SchemaField::new(
                "id".to_owned(),
                SchemaDataType::primitive("integer".to_owned()),
                false,
                HashMap::new(),
            ),
            SchemaField::new(
                "date".to_owned(),
                SchemaDataType::primitive("date".to_owned()),
                false,
                HashMap::new(),
            ),
        ];

        let tmp_dir = tempdir::TempDir::new("opt_table").unwrap();
        let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
        let dt = DeltaOps::try_from_uri(table_uri)
            .await?
            .create()
            .with_columns(columns)
            .with_partition_columns(["date"])
            .await?;

        Ok(dt)
    }

    fn get_batch(ids: Vec<i32>, dates: Vec<i32>) -> Result<RecordBatch, Box<dyn Error>> {
        let ids_array: PrimitiveArray<arrow_array::types::Int32Type> = Int32Array::from(ids);
        let date_array = Date32Array::from(dates);

        Ok(RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("id", DataType::Int32, false),
                Field::new("date", DataType::Date32, false),
            ])),
            vec![Arc::new(ids_array), Arc::new(date_array)],
        )?)
    }

    async fn write(
        writer: &mut RecordBatchWriter,
        table: &mut DeltaTable,
        batch: RecordBatch,
    ) -> Result<(), DeltaTableError> {
        writer.write(batch).await?;
        writer.flush_and_commit(table).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_issue_1445_date_partition() -> Result<()> {
        let ctx = SessionContext::new();
        let mut dt = setup_test().await.unwrap();
        let mut writer = RecordBatchWriter::for_table(&dt)?;
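        // 19517 is a Date32 value, i.e. days since the Unix epoch
        // (1970-01-01); it decodes to 2023-06-09.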
        write(
            &mut writer,
            &mut dt,
            get_batch(vec![2], vec![19517]).unwrap(),
        )
        .await?;
        ctx.register_table("t", Arc::new(dt))?;
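        // DataFusion coerces the Utf8 literal below to Date32 for the
        // comparison; before this fix, dictionary-wrapping the Date32
        // partition column broke such predicates (issue #1445).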

        let batches = ctx
            .sql(
                r#"SELECT *
            FROM t
            WHERE date > '2023-06-07'
            "#,
            )
            .await?
            .collect()
            .await?;

        let expected = vec![
            "+----+------------+",
            "| id | date       |",
            "+----+------------+",
            "| 2  | 2023-06-09 |",
            "+----+------------+",
        ];

        assert_batches_sorted_eq!(&expected, &batches);

        Ok(())
    }
}