Skip to content

Commit

Permalink
feat: Add support for Timestamp data types in data page statistics. (#11123)
Browse files Browse the repository at this point in the history

* feat: Add support for Timestamp data types in data page statistics.

* Simplify array creation using with_timezone_opt.

---------

Co-authored-by: Eric Fredine <[email protected]>
  • Loading branch information
efredine and Eric Fredine authored Jun 26, 2024
1 parent 7adc940 commit dd56dbe
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 41 deletions.
38 changes: 13 additions & 25 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,32 +354,11 @@ macro_rules! get_statistics {
))),
DataType::Timestamp(unit, timezone) =>{
let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());

Ok(match unit {
- TimeUnit::Second => {
- Arc::new(match timezone {
- Some(tz) => TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()),
- None => TimestampSecondArray::from_iter(iter),
- })
- }
- TimeUnit::Millisecond => {
- Arc::new(match timezone {
- Some(tz) => TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()),
- None => TimestampMillisecondArray::from_iter(iter),
- })
- }
- TimeUnit::Microsecond => {
- Arc::new(match timezone {
- Some(tz) => TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()),
- None => TimestampMicrosecondArray::from_iter(iter),
- })
- }
- TimeUnit::Nanosecond => {
- Arc::new(match timezone {
- Some(tz) => TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()),
- None => TimestampNanosecondArray::from_iter(iter),
- })
- }
+ TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
})
},
DataType::Time32(unit) => {
Expand Down Expand Up @@ -713,6 +692,15 @@ macro_rules! get_data_page_statistics {
)),
Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
+ Some(DataType::Timestamp(unit, timezone)) => {
+ let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
+ Ok(match unit {
+ TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
+ })
+ },
_ => unimplemented!()
}
}
Expand Down
32 changes: 16 additions & 16 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ async fn test_timestamp() {
// row counts are [5, 5, 5, 5]
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "nanos",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand Down Expand Up @@ -776,7 +776,7 @@ async fn test_timestamp() {
// row counts are [5, 5, 5, 5]
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "nanos_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -798,7 +798,7 @@ async fn test_timestamp() {
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "micros",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand Down Expand Up @@ -827,7 +827,7 @@ async fn test_timestamp() {
// row counts are [5, 5, 5, 5]
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "micros_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -849,7 +849,7 @@ async fn test_timestamp() {
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "millis",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand Down Expand Up @@ -878,7 +878,7 @@ async fn test_timestamp() {
// row counts are [5, 5, 5, 5]
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "millis_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -900,7 +900,7 @@ async fn test_timestamp() {
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "seconds",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand Down Expand Up @@ -929,7 +929,7 @@ async fn test_timestamp() {
// row counts are [5, 5, 5, 5]
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
column_name: "seconds_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();
}
Expand Down Expand Up @@ -975,7 +975,7 @@ async fn test_timestamp_diff_rg_sizes() {
// row counts are [8, 8, 4]
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "nanos",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1002,7 +1002,7 @@ async fn test_timestamp_diff_rg_sizes() {
// row counts are [8, 8, 4]
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "nanos_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1022,7 +1022,7 @@ async fn test_timestamp_diff_rg_sizes() {
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "micros",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1049,7 +1049,7 @@ async fn test_timestamp_diff_rg_sizes() {
// row counts are [8, 8, 4]
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "micros_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1069,7 +1069,7 @@ async fn test_timestamp_diff_rg_sizes() {
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "millis",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1096,7 +1096,7 @@ async fn test_timestamp_diff_rg_sizes() {
// row counts are [8, 8, 4]
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "millis_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1116,7 +1116,7 @@ async fn test_timestamp_diff_rg_sizes() {
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "seconds",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();

Expand All @@ -1143,7 +1143,7 @@ async fn test_timestamp_diff_rg_sizes() {
// row counts are [8, 8, 4]
expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])),
column_name: "seconds_timezoned",
- check: Check::RowGroup,
+ check: Check::Both,
}
.run();
}
Expand Down

0 comments on commit dd56dbe

Please sign in to comment.