-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support timestamps and steps of less than a day for range/generate_series #12400
Changes from 3 commits
e9fc1be
2596358
ab2a913
b2004bd
8be72ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,19 +18,31 @@ | |||||
//! [`ScalarUDFImpl`] definitions for range and gen_series functions. | ||||||
|
||||||
use crate::utils::make_scalar_function; | ||||||
use arrow::array::{Array, ArrayRef, Date32Builder, Int64Array, ListArray, ListBuilder}; | ||||||
use arrow::array::{Array, ArrayRef, Int64Array, ListArray, ListBuilder}; | ||||||
use arrow::datatypes::{DataType, Field}; | ||||||
use arrow_array::types::{Date32Type, IntervalMonthDayNanoType}; | ||||||
use arrow_array::NullArray; | ||||||
use arrow_array::builder::{Date32Builder, TimestampNanosecondBuilder}; | ||||||
use arrow_array::temporal_conversions::as_datetime_with_timezone; | ||||||
use arrow_array::timezone::Tz; | ||||||
use arrow_array::types::{ | ||||||
Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType as TSNT, | ||||||
}; | ||||||
use arrow_array::{NullArray, TimestampNanosecondArray}; | ||||||
use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; | ||||||
use arrow_schema::DataType::*; | ||||||
use arrow_schema::IntervalUnit::MonthDayNano; | ||||||
use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array}; | ||||||
use datafusion_common::{exec_err, not_impl_datafusion_err, Result}; | ||||||
use arrow_schema::TimeUnit::Nanosecond; | ||||||
use datafusion_common::cast::{ | ||||||
as_date32_array, as_int64_array, as_interval_mdn_array, as_timestamp_nanosecond_array, | ||||||
}; | ||||||
use datafusion_common::{ | ||||||
exec_err, internal_err, not_impl_datafusion_err, DataFusionError, Result, | ||||||
}; | ||||||
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; | ||||||
use itertools::Itertools; | ||||||
use std::any::Any; | ||||||
use std::cmp::Ordering; | ||||||
use std::iter::from_fn; | ||||||
use std::str::FromStr; | ||||||
use std::sync::Arc; | ||||||
|
||||||
make_udf_expr_and_func!( | ||||||
|
@@ -78,7 +90,7 @@ impl ScalarUDFImpl for Range { | |||||
UInt16 => Ok(Int64), | ||||||
UInt32 => Ok(Int64), | ||||||
UInt64 => Ok(Int64), | ||||||
Timestamp(_, _) => Ok(Date32), | ||||||
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())), | ||||||
Date32 => Ok(Date32), | ||||||
Date64 => Ok(Date32), | ||||||
Utf8 => Ok(Date32), | ||||||
|
@@ -109,8 +121,11 @@ impl ScalarUDFImpl for Range { | |||||
match args[0].data_type() { | ||||||
Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args), | ||||||
Date32 => make_scalar_function(|args| gen_range_date(args, false))(args), | ||||||
_ => { | ||||||
exec_err!("unsupported type for range") | ||||||
Timestamp(_, _) => { | ||||||
make_scalar_function(|args| gen_range_timestamp(args, false))(args) | ||||||
} | ||||||
dt => { | ||||||
exec_err!("unsupported type for range. Expected Int64, Date32 or Timestamp, got: {dt}") | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -165,7 +180,7 @@ impl ScalarUDFImpl for GenSeries { | |||||
UInt16 => Ok(Int64), | ||||||
UInt32 => Ok(Int64), | ||||||
UInt64 => Ok(Int64), | ||||||
Timestamp(_, _) => Ok(Date32), | ||||||
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())), | ||||||
Date32 => Ok(Date32), | ||||||
Date64 => Ok(Date32), | ||||||
Utf8 => Ok(Date32), | ||||||
|
@@ -196,9 +211,12 @@ impl ScalarUDFImpl for GenSeries { | |||||
match args[0].data_type() { | ||||||
Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args), | ||||||
Date32 => make_scalar_function(|args| gen_range_date(args, true))(args), | ||||||
Timestamp(_, _) => { | ||||||
make_scalar_function(|args| gen_range_timestamp(args, true))(args) | ||||||
} | ||||||
dt => { | ||||||
exec_err!( | ||||||
"unsupported type for gen_series. Expected Int64 or Date32, got: {}", | ||||||
"unsupported type for gen_series. Expected Int64, Date32 or Timestamp, got: {}", | ||||||
dt | ||||||
) | ||||||
} | ||||||
|
@@ -394,3 +412,136 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> { | |||||
|
||||||
Ok(arr) | ||||||
} | ||||||
|
||||||
fn gen_range_timestamp(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if args.len() != 3 { | ||||||
return exec_err!( | ||||||
"arguments length must be 3 for {}", | ||||||
if include_upper { | ||||||
"generate_series" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} else { | ||||||
"range" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
); | ||||||
} | ||||||
|
||||||
// coerce_types fn should coerce all types to Timestamp(Nanosecond, tz) | ||||||
let (start_arr, start_tz_opt) = cast_timestamp_arg(&args[0], include_upper)?; | ||||||
let (stop_arr, stop_tz_opt) = cast_timestamp_arg(&args[1], include_upper)?; | ||||||
let step_arr = as_interval_mdn_array(&args[2])?; | ||||||
let start_tz = parse_tz(start_tz_opt)?; | ||||||
let stop_tz = parse_tz(stop_tz_opt)?; | ||||||
|
||||||
// values are timestamps | ||||||
let values_builder = if let Some(start_tz_str) = start_tz_opt { | ||||||
TimestampNanosecondBuilder::new().with_timezone(&**start_tz_str) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that looks somewhat weird, do we really need so many indirections here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As it was written, yes, it was required. I changed it up to use map_or_else to accomplish the same thing. |
||||||
} else { | ||||||
TimestampNanosecondBuilder::new() | ||||||
}; | ||||||
let mut list_builder = ListBuilder::new(values_builder); | ||||||
|
||||||
for idx in 0..start_arr.len() { | ||||||
if start_arr.is_null(idx) || stop_arr.is_null(idx) || step_arr.is_null(idx) { | ||||||
list_builder.append_null(); | ||||||
continue; | ||||||
} | ||||||
|
||||||
let start = start_arr.value(idx); | ||||||
let stop = stop_arr.value(idx); | ||||||
let step = step_arr.value(idx); | ||||||
|
||||||
let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step); | ||||||
if months == 0 && days == 0 && ns == 0 { | ||||||
return exec_err!( | ||||||
"Interval argument to {} must not be 0", | ||||||
if include_upper { | ||||||
"generate_series" | ||||||
Omega359 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} else { | ||||||
"range" | ||||||
} | ||||||
); | ||||||
} | ||||||
|
||||||
let neg = TSNT::add_month_day_nano(start, step, start_tz) | ||||||
.ok_or(DataFusionError::Execution( | ||||||
Omega359 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
"Cannot generate timestamp range where start + step overflows" | ||||||
.to_string(), | ||||||
))? | ||||||
.cmp(&start) | ||||||
== Ordering::Less; | ||||||
|
||||||
let stop_dt = as_datetime_with_timezone::<TSNT>(stop, stop_tz).ok_or( | ||||||
DataFusionError::Execution(format!( | ||||||
Omega359 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
"Cannot generate timestamp for stop: {}: {:?}", | ||||||
stop, stop_tz | ||||||
)), | ||||||
)?; | ||||||
|
||||||
let mut current = start; | ||||||
let mut current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz).ok_or( | ||||||
DataFusionError::Execution(format!( | ||||||
"Cannot generate timestamp for start: {}: {:?}", | ||||||
Omega359 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
current, start_tz | ||||||
)), | ||||||
)?; | ||||||
|
||||||
let values = from_fn(|| { | ||||||
if (include_upper | ||||||
&& ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt))) | ||||||
|| (!include_upper | ||||||
&& ((neg && current_dt <= stop_dt) | ||||||
|| (!neg && current_dt >= stop_dt))) | ||||||
{ | ||||||
return None; | ||||||
} | ||||||
|
||||||
let prev_current = current; | ||||||
|
||||||
if let Some(ts) = TSNT::add_month_day_nano(current, step, start_tz) { | ||||||
current = ts; | ||||||
current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz)?; | ||||||
|
||||||
Some(Some(prev_current)) | ||||||
} else { | ||||||
// we failed to parse the timestamp here so terminate the series | ||||||
None | ||||||
} | ||||||
}); | ||||||
|
||||||
list_builder.append_value(values); | ||||||
} | ||||||
|
||||||
let arr = Arc::new(list_builder.finish()); | ||||||
|
||||||
Ok(arr) | ||||||
} | ||||||
|
||||||
fn cast_timestamp_arg( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: You could potentially use And then call I personally found the use of the word There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My programming roots are showing through :) I can take a look at that |
||||||
arg: &ArrayRef, | ||||||
include_upper: bool, | ||||||
) -> Result<(&TimestampNanosecondArray, &Option<Arc<str>>)> { | ||||||
match arg.data_type() { | ||||||
Timestamp(Nanosecond, tz_opt) => { | ||||||
Ok((as_timestamp_nanosecond_array(arg)?, tz_opt)) | ||||||
} | ||||||
_ => { | ||||||
internal_err!( | ||||||
"Unexpected argument type for {} : {}", | ||||||
if include_upper { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this error block is third time showing up, perhaps we can factor it out |
||||||
"generate_series" | ||||||
} else { | ||||||
"range" | ||||||
}, | ||||||
arg.data_type() | ||||||
) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
fn parse_tz(tz: &Option<Arc<str>>) -> Result<Tz> { | ||||||
let tz = tz.as_ref().map_or_else(|| "+00", |s| s); | ||||||
|
||||||
Tz::from_str(tz).map_err(|op| { | ||||||
DataFusionError::Execution(format!("failed on timezone {tz}: {:?}", op)) | ||||||
}) | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably factor out this block to avoid making the same modification twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is so much duplication in these UDFs. I think it's best to push the cleanup of that to a separate PR and keep this one as small as possible.