fix: querying on date partitions (fixes #1445) #1481

Closed · wants to merge 10 commits
13 changes: 9 additions & 4 deletions rust/src/operations/transaction/state.rs

@@ -49,10 +49,15 @@ impl DeltaTableState {
        let field = ArrowField::try_from(f)?;
        let corrected = if wrap_partitions {
            match field.data_type() {
-               // Dictionary encoding boolean types does not yield benefits
-               // https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
-               DataType::Boolean => field.data_type().clone(),
-               _ => wrap_partition_type_in_dict(field.data_type().clone()),
+               // Only dictionary-encode types that may be large
+               // https://github.com/apache/arrow-datafusion/pull/5545
+               DataType::Utf8
+               | DataType::LargeUtf8
+               | DataType::Binary
+               | DataType::LargeBinary => {
+                   wrap_partition_type_in_dict(field.data_type().clone())
+               }
+               _ => field.data_type().clone(),
            }
        } else {
            field.data_type().clone()
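In effect, partition columns now keep their native Arrow type unless they are variable-width. A minimal standalone sketch of the corrected mapping, assuming `wrap_partition_type_in_dict` wraps the inner type as `Dictionary(UInt16, inner)` (an assumption; the helper's actual key type in delta-rs may differ):

use arrow_schema::DataType;

// Hypothetical standalone version of the helper, for illustration only;
// the dictionary key type is an assumption.
fn wrap_partition_type_in_dict(inner: DataType) -> DataType {
    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(inner))
}

// Mirrors the corrected match: only variable-width string/binary partition
// types get dictionary-encoded; fixed-width types such as Date32 pass
// through unchanged, so a filter like `date > '2023-06-07'` compares
// against a plain Date32 column.
fn corrected_partition_type(t: &DataType) -> DataType {
    match t {
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
            wrap_partition_type_in_dict(t.clone())
        }
        _ => t.clone(),
    }
}

Dictionary encoding pays off for repeated strings, since each distinct value is stored once; for fixed-width types it adds indirection without saving space and, per #1445, interfered with querying on date partitions.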
103 changes: 97 additions & 6 deletions rust/tests/datafusion_test.rs

@@ -1,14 +1,11 @@
#![cfg(feature = "datafusion")]

-use std::collections::{HashMap, HashSet};
-use std::path::PathBuf;
-use std::sync::Arc;

use arrow::array::*;
use arrow::datatypes::{
    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
};
use arrow::record_batch::RecordBatch;
+use arrow_schema::{DataType, Field};
use common::datafusion::context_with_delta_table_factory;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::TableProvider;
@@ -23,6 +20,12 @@ use datafusion_expr::Expr;
use datafusion_proto::bytes::{
    physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec,
};
+use deltalake::writer::{DeltaWriter, RecordBatchWriter};
+use deltalake::{DeltaTableError, SchemaDataType, SchemaField};
+use std::collections::{HashMap, HashSet};
+use std::error::Error;
+use std::path::PathBuf;
+use std::sync::Arc;
+use url::Url;

use deltalake::action::SaveMode;
@@ -813,7 +816,7 @@ async fn test_datafusion_scan_timestamps() -> Result<()> {
#[tokio::test]
async fn test_issue_1292_datafusion_sql_projection() -> Result<()> {
    let ctx = SessionContext::new();
-   let table = deltalake::open_table("./tests/data/http_requests")
+   let table = deltalake::open_table("./tests/data/issue_1292")
        .await
        .unwrap();
    ctx.register_table("http_requests", Arc::new(table))?;
@@ -844,7 +847,7 @@ async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> {
#[tokio::test]
async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> {
    let ctx = SessionContext::new();
-   let table = deltalake::open_table("./tests/data/http_requests")
+   let table = deltalake::open_table("./tests/data/issue_1291")
@wjones127 (Collaborator) commented on Jun 20, 2023:

Instead of committing a table to the repo, could we write one in a temp directory? I think we want to reserve committing tables to ones that have to be generated by outside tools like Spark.

Collaborator:

@ChewingGlass are you willing to update this part soon?

Contributor (Author):

Fixed it.

Collaborator:

There seem to be some tests that need to be updated so they no longer expect dictionary-encoded columns. We also need to add the setup where you create a temporary directory and write the partitioned table :)
        .await
        .unwrap();
    ctx.register_table("http_requests", Arc::new(table))?;
@@ -910,3 +913,91 @@ async fn test_issue_1374() -> Result<()> {

    Ok(())
}

async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
    let columns = vec![
        SchemaField::new(
            "id".to_owned(),
            SchemaDataType::primitive("integer".to_owned()),
            false,
            HashMap::new(),
        ),
        SchemaField::new(
            "date".to_owned(),
            SchemaDataType::primitive("date".to_owned()),
            false,
            HashMap::new(),
        ),
    ];

    // `into_path` keeps the directory alive after `tmp_dir` goes out of
    // scope; a plain `TempDir` would be deleted when this function returns.
    let tmp_dir = tempdir::TempDir::new("opt_table").unwrap().into_path();
    let table_uri = tmp_dir.to_str().unwrap();
    let dt = DeltaOps::try_from_uri(table_uri)
        .await?
        .create()
        .with_columns(columns)
        .with_partition_columns(["date"])
        .await?;

    Ok(dt)
}

fn get_batch(ids: Vec<i32>, dates: Vec<i32>) -> Result<RecordBatch, Box<dyn Error>> {
    let ids_array = Int32Array::from(ids);
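    // Date32 stores days since the Unix epoch (1970-01-01).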
    let date_array = Date32Array::from(dates);

    Ok(RecordBatch::try_new(
        Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("date", DataType::Date32, false),
        ])),
        vec![Arc::new(ids_array), Arc::new(date_array)],
    )?)
}

async fn write(
    writer: &mut RecordBatchWriter,
    table: &mut DeltaTable,
    batch: RecordBatch,
) -> Result<(), DeltaTableError> {
    writer.write(batch).await?;
    writer.flush_and_commit(table).await?;
    Ok(())
}

#[tokio::test]
async fn test_issue_1445_date_partition() -> Result<()> {
    let ctx = SessionContext::new();
    let mut dt = setup_test().await.unwrap();
    let mut writer = RecordBatchWriter::for_table(&dt)?;
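    // 19517 days after the Unix epoch is 2023-06-09.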
    write(
        &mut writer,
        &mut dt,
        get_batch(vec![2], vec![19517]).unwrap(),
    )
    .await?;
    ctx.register_table("t", Arc::new(dt))?;

    let batches = ctx
        .sql(
            r#"SELECT *
            FROM t
            WHERE date > '2023-06-07'
            "#,
        )
        .await?
        .collect()
        .await?;

    let expected = vec![
        "+----+------------+",
        "| id | date       |",
        "+----+------------+",
        "| 2  | 2023-06-09 |",
        "+----+------------+",
    ];

    assert_batches_sorted_eq!(&expected, &batches);

    Ok(())
}
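Note on the assertion: the filter `date > '2023-06-07'` relies on DataFusion coercing the Utf8 literal to Date32 before comparing it with the partition column. With the `date` column now exposed as plain Date32 rather than a dictionary-wrapped type, the predicate evaluates as expected and the single 2023-06-09 row (day 19517) is returned.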