Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DateTimeParts #284

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ members = [
"vortex-alloc",
"vortex-alp",
"vortex-array",
# TODO(ngates): bring back with new Extension API
# "vortex-datetime-parts",
"vortex-datetime-parts",
"vortex-dict",
"vortex-error",
"vortex-fastlanes",
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ tokio = { workspace = true }
uuid = { workspace = true }
vortex-alp = { path = "../vortex-alp" }
vortex-array = { path = "../vortex-array" }
# vortex-datetime-parts = { path = "../vortex-datetime-parts" }
vortex-datetime-parts = { path = "../vortex-datetime-parts" }
vortex-dict = { path = "../vortex-dict" }
vortex-error = { path = "../vortex-error", features = ["parquet"] }
vortex-fastlanes = { path = "../vortex-fastlanes" }
Expand Down
3 changes: 2 additions & 1 deletion bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use vortex::compress::{CompressConfig, CompressCtx};
use vortex::encoding::{EncodingRef, VORTEX_ENCODINGS};
use vortex::{IntoArray, OwnedArray, ToArrayData};
use vortex_alp::ALPEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
use vortex_dtype::DType;
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
Expand Down Expand Up @@ -113,7 +114,7 @@ pub fn enumerate_arrays() -> Vec<EncodingRef> {
&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
// &DateTimePartsEncoding,
&DateTimePartsEncoding,
// &DeltaEncoding, Blows up the search space too much.
&REEEncoding,
&RoaringBoolEncoding,
Expand Down
20 changes: 10 additions & 10 deletions vortex-array/src/array/datetime/localdatetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ impl LocalDateTime {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalDateTimeMetadata {
timestamp_dtype: DType,
timestamps_dtype: DType,
}

impl LocalDateTimeArray<'_> {
pub fn new(time_unit: TimeUnit, timestamps: Array) -> Self {
Self::try_from_parts(
LocalDateTime::dtype(time_unit, timestamps.dtype().nullability()),
LocalDateTimeMetadata {
timestamp_dtype: timestamps.dtype().clone(),
timestamps_dtype: timestamps.dtype().clone(),
},
[timestamps.into_array_data()].into(),
Default::default(),
Expand All @@ -61,10 +61,10 @@ impl LocalDateTimeArray<'_> {
TimeUnit::try_from(byte[0]).expect("Invalid time unit")
}

pub fn timestamp(&self) -> Array {
pub fn timestamps(&self) -> Array {
self.array()
.child(0, &self.metadata().timestamp_dtype)
.expect("Missing timestamp array")
.child(0, &self.metadata().timestamps_dtype)
.expect("Missing timestamps array")
}
}

Expand All @@ -77,7 +77,7 @@ impl ArrayCompute for LocalDateTimeArray<'_> {
impl AsArrowArray for LocalDateTimeArray<'_> {
fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
// A LocalDateTime maps to an Arrow Timestamp array with no timezone.
let timestamps = cast(&self.timestamp(), PType::I64.into())?.flatten_primitive()?;
let timestamps = cast(&self.timestamps(), PType::I64.into())?.flatten_primitive()?;
let validity = timestamps.logical_validity().to_null_buffer()?;
let buffer = timestamps.scalar_buffer::<i64>();

Expand All @@ -103,17 +103,17 @@ impl ArrayFlatten for LocalDateTimeArray<'_> {

impl ArrayValidity for LocalDateTimeArray<'_> {
fn is_valid(&self, index: usize) -> bool {
self.timestamp().with_dyn(|a| a.is_valid(index))
self.timestamps().with_dyn(|a| a.is_valid(index))
}

fn logical_validity(&self) -> LogicalValidity {
self.timestamp().with_dyn(|a| a.logical_validity())
self.timestamps().with_dyn(|a| a.logical_validity())
}
}

impl AcceptArrayVisitor for LocalDateTimeArray<'_> {
fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("timestamp", &self.timestamp())
visitor.visit_child("timestamps", &self.timestamps())
}
}

Expand All @@ -123,7 +123,7 @@ impl ArrayStatisticsCompute for LocalDateTimeArray<'_> {

impl ArrayTrait for LocalDateTimeArray<'_> {
fn len(&self) -> usize {
self.timestamp().len()
self.timestamps().len()
}
}

Expand Down
36 changes: 12 additions & 24 deletions vortex-datetime-parts/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use vortex::stats::ArrayStatisticsCompute;
use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{impl_encoding, ArrayDType, ArrayFlatten, ToArrayData};
use vortex_error::vortex_bail;
Expand All @@ -9,10 +9,11 @@ impl_encoding!("vortex.datetimeparts", DateTimeParts);

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DateTimePartsMetadata {
// Validity lives in the days array
// TODO(ngates): we should actually model this with a Tuple array when we have one.
days_dtype: DType,
seconds_dtype: DType,
subseconds_dtype: DType,
validity: ValidityMetadata,
}

impl DateTimePartsArray<'_> {
Expand All @@ -21,7 +22,6 @@ impl DateTimePartsArray<'_> {
days: Array,
seconds: Array,
subsecond: Array,
validity: Validity,
) -> VortexResult<Self> {
if !days.dtype().is_int() {
vortex_bail!(MismatchedTypes: "any integer", days.dtype());
Expand All @@ -43,26 +43,19 @@ impl DateTimePartsArray<'_> {
);
}

let mut children = Vec::with_capacity(4);
children.extend([
days.to_array_data(),
seconds.to_array_data(),
subsecond.to_array_data(),
]);
let validity_metadata = validity.to_metadata(length)?;
if let Some(validity) = validity.into_array_data() {
children.push(validity);
}

Self::try_from_parts(
dtype,
DateTimePartsMetadata {
days_dtype: days.dtype().clone(),
seconds_dtype: seconds.dtype().clone(),
subseconds_dtype: subsecond.dtype().clone(),
validity: validity_metadata,
},
children.into(),
[
days.to_array_data(),
seconds.to_array_data(),
subsecond.to_array_data(),
]
.into(),
StatsSet::new(),
)
}
Expand All @@ -84,30 +77,25 @@ impl DateTimePartsArray<'_> {
.child(2, &self.metadata().subseconds_dtype)
.expect("Missing subsecond array")
}

pub fn validity(&self) -> Validity {
self.metadata()
.validity
.to_validity(self.array().child(3, &Validity::DTYPE))
}
}

impl ArrayFlatten for DateTimePartsArray<'_> {
fn flatten<'a>(self) -> VortexResult<Flattened<'a>>
where
Self: 'a,
{
// TODO(ngates): flatten into vortex.localdatetime or appropriate per dtype
todo!()
}
}

impl ArrayValidity for DateTimePartsArray<'_> {
fn is_valid(&self, index: usize) -> bool {
self.validity().is_valid(index)
self.days().with_dyn(|a| a.is_valid(index))
}

fn logical_validity(&self) -> LogicalValidity {
self.validity().to_logical(self.len())
self.days().with_dyn(|a| a.logical_validity())
}
}

Expand Down
46 changes: 16 additions & 30 deletions vortex-datetime-parts/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use vortex::array::composite::{Composite, CompositeArray};
use vortex::array::datetime::{LocalDateTimeArray, LocalDateTimeExtension, TimeUnit};
use vortex::array::datetime::{LocalDateTimeArray, TimeUnit};
use vortex::array::primitive::PrimitiveArray;
use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::cast::cast;
use vortex::{Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray};
use vortex::{Array, ArrayDType, ArrayTrait, IntoArray, OwnedArray};
use vortex_dtype::PType;
use vortex_error::VortexResult;

Expand All @@ -15,16 +14,10 @@ impl EncodingCompression for DateTimePartsEncoding {
array: &Array,
_config: &CompressConfig,
) -> Option<&dyn EncodingCompression> {
if array.encoding().id() != Composite::ID {
return None;
if LocalDateTimeArray::try_from(array).is_ok() {
return Some(self);
}

let composite = CompositeArray::try_from(array).unwrap();
if !matches!(composite.id(), LocalDateTimeExtension::ID) {
return None;
}

Some(self)
None
}

fn compress(
Expand All @@ -33,17 +26,11 @@ impl EncodingCompression for DateTimePartsEncoding {
like: Option<&Array>,
ctx: CompressCtx,
) -> VortexResult<OwnedArray> {
let array = CompositeArray::try_from(array)?;
match array.id() {
LocalDateTimeExtension::ID => compress_localdatetime(
array
.as_typed()
.expect("Can only compress LocalDateTimeArray"),
like.map(|l| DateTimePartsArray::try_from(l).unwrap()),
ctx,
),
_ => panic!("Unsupported composite ID {}", array.id()),
}
compress_localdatetime(
LocalDateTimeArray::try_from(array)?,
like.map(|l| DateTimePartsArray::try_from(l).unwrap()),
ctx,
)
}
}

Expand All @@ -52,30 +39,30 @@ fn compress_localdatetime(
like: Option<DateTimePartsArray>,
ctx: CompressCtx,
) -> VortexResult<OwnedArray> {
let underlying = cast(array.underlying(), PType::I64.into())?.flatten_primitive()?;
let timestamps = cast(&array.timestamps(), PType::I64.into())?.flatten_primitive()?;

let divisor = match array.underlying_metadata().time_unit() {
let divisor = match array.time_unit() {
TimeUnit::Ns => 1_000_000_000,
TimeUnit::Us => 1_000_000,
TimeUnit::Ms => 1_000,
TimeUnit::S => 1,
};

let length = underlying.len();
let length = timestamps.len();
let mut days = Vec::with_capacity(length);
let mut seconds = Vec::with_capacity(length);
let mut subsecond = Vec::with_capacity(length);

for &t in underlying.typed_data::<i64>().iter() {
for &t in timestamps.typed_data::<i64>().iter() {
days.push(t / (86_400 * divisor));
seconds.push((t % (86_400 * divisor)) / divisor);
subsecond.push((t % (86_400 * divisor)) % divisor);
}

Ok(DateTimePartsArray::try_new(
LocalDateTimeExtension::dtype(underlying.dtype().nullability()),
array.dtype().clone(),
ctx.named("days").compress(
&PrimitiveArray::from(days).into_array(),
&PrimitiveArray::from_vec(days, timestamps.validity()).into_array(),
like.as_ref().map(|l| l.days()).as_ref(),
)?,
ctx.named("seconds").compress(
Expand All @@ -86,7 +73,6 @@ fn compress_localdatetime(
&PrimitiveArray::from(subsecond).into_array(),
like.as_ref().map(|l| l.subsecond()).as_ref(),
)?,
ctx.compress_validity(underlying.validity())?,
)?
.into_array())
}
2 changes: 0 additions & 2 deletions vortex-datetime-parts/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ impl TakeFn for DateTimePartsArray<'_> {
take(&self.days(), indices)?,
take(&self.seconds(), indices)?,
take(&self.subsecond(), indices)?,
self.validity(),
)?
.into_array())
}
Expand All @@ -36,7 +35,6 @@ impl SliceFn for DateTimePartsArray<'_> {
slice(&self.days(), start, stop)?,
slice(&self.seconds(), start, stop)?,
slice(&self.subsecond(), start, stop)?,
self.validity().slice(start, stop)?,
)?
.into_array())
}
Expand Down