Skip to content

Commit

Permalink
Merge branch 'cubesql-3-04-2022' into m-d-ns-intervals-mul
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-str committed Jun 18, 2024
2 parents a842c10 + e61a28a commit e413c5f
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 53 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,9 @@ impl ScalarValue {
DataType::Interval(IntervalUnit::YearMonth) => {
build_array_primitive!(IntervalYearMonthArray, IntervalYearMonth)
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano)
}
DataType::List(fields) if fields.data_type() == &DataType::Int8 => {
build_array_list_primitive!(Int8Type, Int8, i8)
}
Expand Down
42 changes: 38 additions & 4 deletions datafusion/core/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use ahash::RandomState;

use arrow::{
array::{
ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray,
UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder,
ArrayData, ArrayRef, BooleanArray, IntervalDayTimeArray,
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeStringArray,
PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampSecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder,
UInt64Builder,
},
compute,
datatypes::{UInt32Type, UInt64Type},
datatypes::{IntervalUnit, UInt32Type, UInt64Type},
};
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
Expand Down Expand Up @@ -925,6 +927,38 @@ fn equal_rows(
)
}
},
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
equal_rows_elem!(
IntervalYearMonthArray,
l,
r,
left,
right,
null_equals_null
)
}
IntervalUnit::DayTime => {
equal_rows_elem!(
IntervalDayTimeArray,
l,
r,
left,
right,
null_equals_null
)
}
IntervalUnit::MonthDayNano => {
equal_rows_elem!(
IntervalMonthDayNanoArray,
l,
r,
left,
right,
null_equals_null
)
}
},
DataType::Utf8 => {
equal_rows_elem!(StringArray, l, r, left, right, null_equals_null)
}
Expand Down
35 changes: 33 additions & 2 deletions datafusion/core/src/physical_plan/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use ahash::{CallHasher, RandomState};
use arrow::array::{
Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DecimalArray,
DictionaryArray, Float32Array, Float64Array, GenericListArray, Int16Array,
Int32Array, Int64Array, Int8Array, LargeStringArray, OffsetSizeTrait, StringArray,
Int32Array, Int64Array, Int8Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
IntervalYearMonthArray, LargeStringArray, OffsetSizeTrait, StringArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type,
Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
Int8Type, IntervalUnit, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use std::sync::Arc;

Expand Down Expand Up @@ -469,6 +470,36 @@ pub fn create_hashes<'a>(
multi_col
);
}
DataType::Interval(IntervalUnit::YearMonth) => {
hash_array_primitive!(
IntervalYearMonthArray,
col,
i32,
hashes_buffer,
random_state,
multi_col
);
}
DataType::Interval(IntervalUnit::DayTime) => {
hash_array_primitive!(
IntervalDayTimeArray,
col,
i64,
hashes_buffer,
random_state,
multi_col
);
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
hash_array_primitive!(
IntervalMonthDayNanoArray,
col,
i128,
hashes_buffer,
random_state,
multi_col
);
}
DataType::Date32 => {
hash_array_primitive!(
Date32Array,
Expand Down
145 changes: 114 additions & 31 deletions datafusion/cube_ext/src/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

use arrow::array::{Array, Float64Array, Int32Array, Int32Builder, PrimitiveArray};
use arrow::compute::kernels::arity::unary;
use arrow::datatypes::{ArrowNumericType, ArrowTemporalType, DataType, TimeUnit};
use arrow::datatypes::{
ArrowNumericType, ArrowPrimitiveType, ArrowTemporalType, DataType, Date32Type,
Date64Type, Float64Type, IntervalDayTimeType, IntervalMonthDayNanoType,
IntervalYearMonthType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow::error::{ArrowError, Result};

use chrono::format::strftime::StrftimeItems;
Expand Down Expand Up @@ -145,39 +150,117 @@ where
Ok(b.finish())
}

/// Converts a month count into seconds, treating every whole group of twelve
/// months as a 365.25-day year and every leftover month as 30 days.
///
/// Integer division and remainder both truncate toward zero, so a negative
/// month count yields the exact negation of its positive counterpart.
fn postgres_months_epoch(n: i32) -> f64 {
    const SECONDS_PER_DAY: f64 = 86400_f64;
    // Both products are whole numbers, so the constants below are exact.
    const SECONDS_PER_YEAR: f64 = SECONDS_PER_DAY * 365.25;
    const SECONDS_PER_MONTH: f64 = SECONDS_PER_DAY * 30.0;

    let (whole_years, leftover_months) = (n / 12, n % 12);
    f64::from(whole_years) * SECONDS_PER_YEAR
        + f64::from(leftover_months) * SECONDS_PER_MONTH
}

/// Arrow primitive types whose arrays can be converted to an epoch value.
///
/// The implementations in this file produce a number of seconds as `f64`:
/// timestamps and dates are scaled from their storage unit, and intervals are
/// converted from their month/day/sub-day components.
pub trait Epochable: ArrowPrimitiveType + Sized {
/// Maps every element of `array` to its epoch value and returns the results
/// as a `Float64` array.
fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type>;
}

impl Epochable for TimestampSecondType {
    /// Second-resolution timestamps are already expressed in seconds, so each
    /// value is only widened to `f64`.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        unary(array, |seconds| seconds as f64)
    }
}

impl Epochable for TimestampMillisecondType {
    /// Scales millisecond timestamps down to (fractional) seconds.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        const MILLIS_PER_SECOND: f64 = 1_000.0;
        unary(array, |millis| millis as f64 / MILLIS_PER_SECOND)
    }
}

impl Epochable for TimestampMicrosecondType {
    /// Scales microsecond timestamps down to (fractional) seconds.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        const MICROS_PER_SECOND: f64 = 1_000_000.0;
        unary(array, |micros| micros as f64 / MICROS_PER_SECOND)
    }
}

impl Epochable for TimestampNanosecondType {
    /// Scales nanosecond timestamps down to (fractional) seconds.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
        unary(array, |nanos| nanos as f64 / NANOS_PER_SECOND)
    }
}

impl Epochable for Date32Type {
    /// Multiplies each stored day count by the number of seconds in a day.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        const SECONDS_PER_DAY: f64 = 86400.0;
        unary(array, |day_count| day_count as f64 * SECONDS_PER_DAY)
    }
}

impl Epochable for Date64Type {
    /// Divides each stored value by one thousand to obtain seconds.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        const MILLIS_PER_SECOND: f64 = 1_000.0;
        unary(array, |millis| millis as f64 / MILLIS_PER_SECOND)
    }
}

impl Epochable for IntervalYearMonthType {
    /// Each element is a month count; the conversion to seconds is delegated
    /// to `postgres_months_epoch`.
    fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type> {
        let months_to_seconds = postgres_months_epoch;
        unary(array, months_to_seconds)
    }
}

impl Epochable for IntervalDayTimeType {
    /// Converts day-time intervals to seconds.
    ///
    /// The packed `i64` is split into a sign and a magnitude; the upper 32
    /// bits of the magnitude are read as days and the lower 32 bits as
    /// milliseconds. The result is `sign * (days * 86400 + millis / 1000)`.
    fn get_epoch(
        array: &PrimitiveArray<IntervalDayTimeType>,
    ) -> PrimitiveArray<Float64Type> {
        unary(array, |n| {
            // Implemented based on scalar_timestamp_add_interval_day_time
            let sign = n.signum();
            // `abs()` overflows for i64::MIN and panics when overflow checks
            // are enabled. `wrapping_abs()` leaves i64::MIN unchanged in every
            // build profile; the bitmask on `days` below then yields 2^31 days
            // (but TODO: what is Postgres' exact behavior?)
            let n = n.wrapping_abs();

            let days: i64 = (n >> 32) & 0xFFFF_FFFF;
            let millis: i64 = n & 0xFFFF_FFFF;

            let seconds_in_a_day = 86400.0;
            sign as f64 * ((days as f64) * seconds_in_a_day + (millis as f64) / 1_000.0)
        })
    }
}

impl Epochable for IntervalMonthDayNanoType {
    /// Converts month-day-nanosecond intervals to seconds.
    ///
    /// The packed `i128` is read as three fields: months in bits 96..128,
    /// days in bits 64..96 and nanoseconds in bits 0..64. Months go through
    /// `postgres_months_epoch`, days are multiplied by 86400 and nanoseconds
    /// are divided by 10^9; the three parts are summed.
    fn get_epoch(
        array: &PrimitiveArray<IntervalMonthDayNanoType>,
    ) -> PrimitiveArray<Float64Type> {
        unary(array, |n| {
            let seconds_in_a_day = 86400_f64;
            let n: i128 = n;
            // Truncating casts keep the low bits of the shifted value and
            // reinterpret them as a signed integer. Masking alone would turn
            // a negative day or nanosecond field into a large positive number
            // (e.g. days = -1 would be read as 4294967295).
            let month = (n >> 96) as i32;
            let day = (n >> 64) as i32;
            let nano = n as i64;
            let month_epoch: f64 = postgres_months_epoch(month);
            month_epoch
                + (day as f64) * seconds_in_a_day
                + (nano as f64) / 1_000_000_000.0
        })
    }
}

/// Converts every element of `array` to its epoch value in seconds.
///
/// The per-type conversion is supplied by the `Epochable` implementation of
/// `T`, so any array type that implements `Epochable` is accepted and no
/// runtime dispatch on the data type is needed. The call itself cannot fail;
/// the `Result` return type is kept for callers.
pub fn epoch<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
    T: Epochable,
{
    Ok(T::get_epoch(array))
}

Expand Down
Loading

0 comments on commit e413c5f

Please sign in to comment.