diff --git a/Cargo.lock b/Cargo.lock index f2c509d7a..fe71e3e83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -977,6 +977,7 @@ dependencies = [ "uuid", "vortex-alp", "vortex-array", + "vortex-datetime-parts", "vortex-dict", "vortex-dtype", "vortex-error", @@ -5206,6 +5207,19 @@ dependencies = [ "walkdir", ] +[[package]] +name = "vortex-datetime-parts" +version = "0.1.0" +dependencies = [ + "linkme", + "log", + "serde", + "vortex-array", + "vortex-dtype", + "vortex-error", + "vortex-scalar", +] + [[package]] name = "vortex-dict" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 9dadc4176..cfe57d0ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,7 @@ members = [ "vortex-alloc", "vortex-alp", "vortex-array", - # TODO(ngates): bring back with new Extension API - # "vortex-datetime-parts", + "vortex-datetime-parts", "vortex-dict", "vortex-error", "vortex-fastlanes", diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 5345f3f1e..be0ccbc04 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -41,7 +41,7 @@ tokio = { workspace = true } uuid = { workspace = true } vortex-alp = { path = "../vortex-alp" } vortex-array = { path = "../vortex-array" } -# vortex-datetime-parts = { path = "../vortex-datetime-parts" } +vortex-datetime-parts = { path = "../vortex-datetime-parts" } vortex-dict = { path = "../vortex-dict" } vortex-error = { path = "../vortex-error", features = ["parquet"] } vortex-fastlanes = { path = "../vortex-fastlanes" } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index a45019007..d8621fb82 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -18,6 +18,7 @@ use vortex::compress::{CompressConfig, CompressCtx}; use vortex::encoding::{EncodingRef, VORTEX_ENCODINGS}; use vortex::{IntoArray, OwnedArray, ToArrayData}; use vortex_alp::ALPEncoding; +use vortex_datetime_parts::DateTimePartsEncoding; use vortex_dict::DictEncoding; use vortex_dtype::DType; use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; @@ -113,7 +114,7 @@ pub fn enumerate_arrays() -> Vec { &DictEncoding, &BitPackedEncoding, &FoREncoding, - // &DateTimePartsEncoding, + &DateTimePartsEncoding, // &DeltaEncoding, Blows up the search space too much. &REEEncoding, &RoaringBoolEncoding, diff --git a/vortex-array/src/array/datetime/localdatetime.rs b/vortex-array/src/array/datetime/localdatetime.rs index de83c0bf3..35318f939 100644 --- a/vortex-array/src/array/datetime/localdatetime.rs +++ b/vortex-array/src/array/datetime/localdatetime.rs @@ -32,7 +32,7 @@ impl LocalDateTime { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LocalDateTimeMetadata { - timestamp_dtype: DType, + timestamps_dtype: DType, } impl LocalDateTimeArray<'_> { @@ -40,7 +40,7 @@ impl LocalDateTimeArray<'_> { Self::try_from_parts( LocalDateTime::dtype(time_unit, timestamps.dtype().nullability()), LocalDateTimeMetadata { - timestamp_dtype: timestamps.dtype().clone(), + timestamps_dtype: timestamps.dtype().clone(), }, [timestamps.into_array_data()].into(), Default::default(), @@ -61,10 +61,10 @@ impl LocalDateTimeArray<'_> { TimeUnit::try_from(byte[0]).expect("Invalid time unit") } - pub fn timestamp(&self) -> Array { + pub fn timestamps(&self) -> Array { self.array() - .child(0, &self.metadata().timestamp_dtype) - .expect("Missing timestamp array") + .child(0, &self.metadata().timestamps_dtype) + .expect("Missing timestamps array") } } @@ -77,7 +77,7 @@ impl ArrayCompute for LocalDateTimeArray<'_> { impl AsArrowArray for LocalDateTimeArray<'_> { fn as_arrow(&self) -> VortexResult { // A LocalDateTime maps to an Arrow Timestamp array with no timezone. - let timestamps = cast(&self.timestamp(), PType::I64.into())?.flatten_primitive()?; + let timestamps = cast(&self.timestamps(), PType::I64.into())?.flatten_primitive()?; let validity = timestamps.logical_validity().to_null_buffer()?; let buffer = timestamps.scalar_buffer::(); @@ -103,17 +103,17 @@ impl ArrayFlatten for LocalDateTimeArray<'_> { impl ArrayValidity for LocalDateTimeArray<'_> { fn is_valid(&self, index: usize) -> bool { - self.timestamp().with_dyn(|a| a.is_valid(index)) + self.timestamps().with_dyn(|a| a.is_valid(index)) } fn logical_validity(&self) -> LogicalValidity { - self.timestamp().with_dyn(|a| a.logical_validity()) + self.timestamps().with_dyn(|a| a.logical_validity()) } } impl AcceptArrayVisitor for LocalDateTimeArray<'_> { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { - visitor.visit_child("timestamp", &self.timestamp()) + visitor.visit_child("timestamps", &self.timestamps()) } } @@ -123,7 +123,7 @@ impl ArrayStatisticsCompute for LocalDateTimeArray<'_> { impl ArrayTrait for LocalDateTimeArray<'_> { fn len(&self) -> usize { - self.timestamp().len() + self.timestamps().len() } } diff --git a/vortex-datetime-parts/src/array.rs b/vortex-datetime-parts/src/array.rs index 2791fb946..6ab8a450f 100644 --- a/vortex-datetime-parts/src/array.rs +++ b/vortex-datetime-parts/src/array.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use vortex::stats::ArrayStatisticsCompute; -use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; +use vortex::validity::{ArrayValidity, LogicalValidity}; use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor}; use vortex::{impl_encoding, ArrayDType, ArrayFlatten, ToArrayData}; use vortex_error::vortex_bail; @@ -9,10 +9,11 @@ impl_encoding!("vortex.datetimeparts", DateTimeParts); #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DateTimePartsMetadata { + // Validity lives in the days array + // TODO(ngates): we should actually model this with a Tuple array when we have one. days_dtype: DType, seconds_dtype: DType, subseconds_dtype: DType, - validity: ValidityMetadata, } impl DateTimePartsArray<'_> { @@ -21,7 +22,6 @@ impl DateTimePartsArray<'_> { days: Array, seconds: Array, subsecond: Array, - validity: Validity, ) -> VortexResult { if !days.dtype().is_int() { vortex_bail!(MismatchedTypes: "any integer", days.dtype()); @@ -43,26 +43,19 @@ impl DateTimePartsArray<'_> { ); } - let mut children = Vec::with_capacity(4); - children.extend([ - days.to_array_data(), - seconds.to_array_data(), - subsecond.to_array_data(), - ]); - let validity_metadata = validity.to_metadata(length)?; - if let Some(validity) = validity.into_array_data() { - children.push(validity); - } - Self::try_from_parts( dtype, DateTimePartsMetadata { days_dtype: days.dtype().clone(), seconds_dtype: seconds.dtype().clone(), subseconds_dtype: subsecond.dtype().clone(), - validity: validity_metadata, }, - children.into(), + [ + days.to_array_data(), + seconds.to_array_data(), + subsecond.to_array_data(), + ] + .into(), StatsSet::new(), ) } @@ -84,12 +77,6 @@ impl DateTimePartsArray<'_> { .child(2, &self.metadata().subseconds_dtype) .expect("Missing subsecond array") } - - pub fn validity(&self) -> Validity { - self.metadata() - .validity - .to_validity(self.array().child(3, &Validity::DTYPE)) - } } impl ArrayFlatten for DateTimePartsArray<'_> { @@ -97,17 +84,18 @@ impl ArrayFlatten for DateTimePartsArray<'_> { where Self: 'a, { + // TODO(ngates): flatten into vortex.localdatetime or appropriate per dtype todo!() } } impl ArrayValidity for DateTimePartsArray<'_> { fn is_valid(&self, index: usize) -> bool { - self.validity().is_valid(index) + self.days().with_dyn(|a| a.is_valid(index)) } fn logical_validity(&self) -> LogicalValidity { - self.validity().to_logical(self.len()) + self.days().with_dyn(|a| a.logical_validity()) } } diff --git a/vortex-datetime-parts/src/compress.rs b/vortex-datetime-parts/src/compress.rs index 35d56ccc0..cbd14a5f8 100644 --- a/vortex-datetime-parts/src/compress.rs +++ b/vortex-datetime-parts/src/compress.rs @@ -1,9 +1,8 @@ -use vortex::array::composite::{Composite, CompositeArray}; -use vortex::array::datetime::{LocalDateTimeArray, LocalDateTimeExtension, TimeUnit}; +use vortex::array::datetime::{LocalDateTimeArray, TimeUnit}; use vortex::array::primitive::PrimitiveArray; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; use vortex::compute::cast::cast; -use vortex::{Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, OwnedArray}; +use vortex::{Array, ArrayDType, ArrayTrait, IntoArray, OwnedArray}; use vortex_dtype::PType; use vortex_error::VortexResult; @@ -15,16 +14,10 @@ impl EncodingCompression for DateTimePartsEncoding { array: &Array, _config: &CompressConfig, ) -> Option<&dyn EncodingCompression> { - if array.encoding().id() != Composite::ID { - return None; + if LocalDateTimeArray::try_from(array).is_ok() { + return Some(self); } - - let composite = CompositeArray::try_from(array).unwrap(); - if !matches!(composite.id(), LocalDateTimeExtension::ID) { - return None; - } - - Some(self) + None } fn compress( @@ -33,17 +26,11 @@ impl EncodingCompression for DateTimePartsEncoding { like: Option<&Array>, ctx: CompressCtx, ) -> VortexResult { - let array = CompositeArray::try_from(array)?; - match array.id() { - LocalDateTimeExtension::ID => compress_localdatetime( - array - .as_typed() - .expect("Can only compress LocalDateTimeArray"), - like.map(|l| DateTimePartsArray::try_from(l).unwrap()), - ctx, - ), - _ => panic!("Unsupported composite ID {}", array.id()), - } + compress_localdatetime( + LocalDateTimeArray::try_from(array)?, + like.map(|l| DateTimePartsArray::try_from(l).unwrap()), + ctx, + ) } } @@ -52,30 +39,30 @@ fn compress_localdatetime( like: Option, ctx: CompressCtx, ) -> VortexResult { - let underlying = cast(array.underlying(), PType::I64.into())?.flatten_primitive()?; + let timestamps = cast(&array.timestamps(), PType::I64.into())?.flatten_primitive()?; - let divisor = match array.underlying_metadata().time_unit() { + let divisor = match array.time_unit() { TimeUnit::Ns => 1_000_000_000, TimeUnit::Us => 1_000_000, TimeUnit::Ms => 1_000, TimeUnit::S => 1, }; - let length = underlying.len(); + let length = timestamps.len(); let mut days = Vec::with_capacity(length); let mut seconds = Vec::with_capacity(length); let mut subsecond = Vec::with_capacity(length); - for &t in underlying.typed_data::().iter() { + for &t in timestamps.typed_data::().iter() { days.push(t / (86_400 * divisor)); seconds.push((t % (86_400 * divisor)) / divisor); subsecond.push((t % (86_400 * divisor)) % divisor); } Ok(DateTimePartsArray::try_new( - LocalDateTimeExtension::dtype(underlying.dtype().nullability()), + array.dtype().clone(), ctx.named("days").compress( - &PrimitiveArray::from(days).into_array(), + &PrimitiveArray::from_vec(days, timestamps.validity()).into_array(), like.as_ref().map(|l| l.days()).as_ref(), )?, ctx.named("seconds").compress( @@ -86,7 +73,6 @@ fn compress_localdatetime( &PrimitiveArray::from(subsecond).into_array(), like.as_ref().map(|l| l.subsecond()).as_ref(), )?, - ctx.compress_validity(underlying.validity())?, )? .into_array()) } diff --git a/vortex-datetime-parts/src/compute.rs b/vortex-datetime-parts/src/compute.rs index 8a4b1c9de..29ee25208 100644 --- a/vortex-datetime-parts/src/compute.rs +++ b/vortex-datetime-parts/src/compute.rs @@ -23,7 +23,6 @@ impl TakeFn for DateTimePartsArray<'_> { take(&self.days(), indices)?, take(&self.seconds(), indices)?, take(&self.subsecond(), indices)?, - self.validity(), )? .into_array()) } @@ -36,7 +35,6 @@ impl SliceFn for DateTimePartsArray<'_> { slice(&self.days(), start, stop)?, slice(&self.seconds(), start, stop)?, slice(&self.subsecond(), start, stop)?, - self.validity().slice(start, stop)?, )? .into_array()) }