diff --git a/Cargo.lock b/Cargo.lock index d8db21917..8732cb36d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,10 +457,10 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", - "vortex-ipc", "vortex-roaring", "vortex-runend", "vortex-sampling-compressor", + "vortex-serde", "xshell", ] @@ -4239,39 +4239,6 @@ dependencies = [ "flatbuffers", ] -[[package]] -name = "vortex-ipc" -version = "0.1.0" -dependencies = [ - "arrow", - "arrow-array", - "arrow-ipc", - "arrow-schema", - "arrow-select", - "build-vortex", - "bytes", - "criterion", - "flatbuffers", - "futures-executor", - "futures-util", - "itertools 0.13.0", - "monoio", - "object_store", - "pin-project", - "rand", - "simplelog", - "tokio", - "vortex-alp", - "vortex-array", - "vortex-buffer", - "vortex-dtype", - "vortex-error", - "vortex-fastlanes", - "vortex-flatbuffers", - "vortex-sampling-compressor", - "vortex-scalar", -] - [[package]] name = "vortex-roaring" version = "0.1.0" @@ -4357,6 +4324,40 @@ dependencies = [ "vortex-flatbuffers", ] +[[package]] +name = "vortex-serde" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "build-vortex", + "bytes", + "criterion", + "flatbuffers", + "futures", + "futures-executor", + "futures-util", + "itertools 0.13.0", + "monoio", + "object_store", + "pin-project", + "rand", + "simplelog", + "tokio", + "vortex-alp", + "vortex-array", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-fastlanes", + "vortex-flatbuffers", + "vortex-sampling-compressor", + "vortex-scalar", +] + [[package]] name = "vortex-zigzag" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 51336a85a..79853109c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "vortex-error", "vortex-expr", "vortex-flatbuffers", - "vortex-ipc", + "vortex-serde", "vortex-sampling-compressor", ] resolver = "2" @@ -32,6 +32,7 @@ rust-version = "1.76" [workspace.dependencies] ahash = "0.8.11" 
allocator-api2 = "0.2.16" +anyhow = "1.0" arrayref = "0.3.7" arrow = { version = "52.0.0", features = ["pyarrow"] } arrow-arith = "52.0.0" @@ -53,7 +54,6 @@ chrono = "0.4.38" criterion = { version = "0.5.1", features = ["html_reports"] } croaring = "2.0.0" csv = "1.3.0" -object_store = "0.10.1" datafusion = "40.0.0" datafusion-common = "40.0.0" datafusion-execution = "40.0.0" @@ -81,9 +81,11 @@ itertools = "0.13.0" lazy_static = "1.4.0" leb128 = "0.2.5" log = "0.4.21" +mimalloc = "0.1.42" monoio = "0.2.3" num-traits = "0.2.18" num_enum = "0.7.2" +object_store = "0.10.1" parquet = "52.0.0" paste = "1.0.14" pin-project = "1.1.5" @@ -108,8 +110,8 @@ uninit = "0.6.2" uuid = "1.8.0" walkdir = "2.5.0" worker = "0.3.0" +xshell = "0.2.6" zigzag = "0.1.0" -mimalloc = "0.1.42" [workspace.lints.rust] warnings = "deny" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index ea3baf45b..61cad8f6d 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -15,7 +15,7 @@ rust-version = { workspace = true } workspace = true [dependencies] -anyhow = "1.0" +anyhow = { workspace = true } arrow-array = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } @@ -51,11 +51,11 @@ vortex-dict = { path = "../encodings/dict" } vortex-dtype = { path = "../vortex-dtype" } vortex-error = { path = "../vortex-error", features = ["parquet"] } vortex-fastlanes = { path = "../encodings/fastlanes" } -vortex-ipc = { path = "../vortex-ipc", features = ["object_store"] } vortex-roaring = { path = "../encodings/roaring" } vortex-runend = { path = "../encodings/runend" } vortex-sampling-compressor = { path = "../vortex-sampling-compressor" } -xshell = "0.2.6" +vortex-serde = { path = "../vortex-serde", features = ["object_store"] } +xshell = { workspace = true } [dev-dependencies] criterion = { workspace = true, features = ["html_reports", "async_tokio"] } diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index 
d1bdf017b..dc07b745f 100644 --- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -14,8 +14,8 @@ use vortex::arrow::FromArrowType; use vortex::{IntoArray, ToArrayData}; use vortex_dtype::DType; use vortex_error::{VortexError, VortexResult}; -use vortex_ipc::io::TokioAdapter; -use vortex_ipc::writer::ArrayWriter; +use vortex_serde::io::TokioAdapter; +use vortex_serde::writer::ArrayWriter; use crate::idempotent; use crate::reader::BATCH_SIZE; diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index c25c94a48..bd00a4e9e 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -31,11 +31,11 @@ use vortex::{Array, IntoArray, IntoCanonical, ToArrayData}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexResult}; -use vortex_ipc::chunked_reader::ChunkedArrayReader; -use vortex_ipc::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite}; -use vortex_ipc::writer::ArrayWriter; -use vortex_ipc::MessageReader; use vortex_sampling_compressor::SamplingCompressor; +use vortex_serde::chunked_reader::ChunkedArrayReader; +use vortex_serde::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite}; +use vortex_serde::writer::ArrayWriter; +use vortex_serde::MessageReader; use crate::{COMPRESSORS, CTX}; @@ -153,7 +153,7 @@ pub async fn read_vortex_footer_format( ChunkedArrayReader::try_new( reader, CTX.clone(), - dtype, + dtype.into(), PrimitiveArray::from(footer.byte_offsets).into_array(), PrimitiveArray::from(footer.row_offsets).into_array(), ) diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index e4d67ea1a..348e42413 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use futures::executor::block_on; use vortex_buffer::io_buf::IoBuf; use vortex_error::VortexError; -use vortex_ipc::io::VortexWrite; +use vortex_serde::io::VortexWrite; use 
crate::data_downloads::{data_vortex_uncompressed, download_data}; use crate::reader::rewrite_parquet_as_vortex; diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 780274344..8893ee0c2 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -26,15 +26,11 @@ impl StructArray { self.len(), )) } -} -impl<'a> StructArray { - pub fn children(&'a self) -> impl Iterator + '_ { + pub fn children(&self) -> impl Iterator + '_ { (0..self.nfields()).map(move |idx| self.field(idx).unwrap()) } -} -impl StructArray { pub fn try_new( names: FieldNames, fields: Vec, @@ -85,11 +81,8 @@ impl StructArray { Self::try_new(FieldNames::from(names), fields, len, Validity::NonNullable) .expect("building StructArray with helper") } -} -impl StructArray { // TODO(aduffy): Add equivalent function to support field masks for nested column access. - /// Return a new StructArray with the given projection applied. /// /// Projection does not copy data arrays. Projection is defined by an ordinal array slice diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index e56dfac77..df946ba10 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -28,13 +28,7 @@ impl ArrayCompute for VarBinArray { impl ScalarAtFn for VarBinArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { - Ok(varbin_scalar( - self.bytes_at(index)? - // TODO(ngates): update to use buffer when we refactor scalars. 
- .into_vec() - .unwrap_or_else(|b| b.as_ref().to_vec()), - self.dtype(), - )) + Ok(varbin_scalar(self.bytes_at(index)?, self.dtype())) } else { Ok(Scalar::null(self.dtype().clone())) } diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index f46ed70b4..3a82467ae 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -216,12 +216,11 @@ impl<'a> FromIterator> for VarBinArray { } } -pub fn varbin_scalar(value: Vec, dtype: &DType) -> Scalar { +pub fn varbin_scalar(value: Buffer, dtype: &DType) -> Scalar { if matches!(dtype, DType::Utf8(_)) { - let str = unsafe { String::from_utf8_unchecked(value) }; - Scalar::utf8(str, dtype.nullability()) + Scalar::try_utf8(value, dtype.nullability()).unwrap() } else { - Scalar::binary(value.into(), dtype.nullability()) + Scalar::binary(value, dtype.nullability()) } } diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 834b69bfc..157f1990f 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use std::collections::HashMap; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -101,8 +102,8 @@ impl<'a> VarBinAccumulator<'a> { pub fn finish(&self, dtype: &DType) -> StatsSet { StatsSet::from(HashMap::from([ - (Stat::Min, varbin_scalar(self.min.to_vec(), dtype)), - (Stat::Max, varbin_scalar(self.max.to_vec(), dtype)), + (Stat::Min, varbin_scalar(Buffer::from(self.min), dtype)), + (Stat::Max, varbin_scalar(Buffer::from(self.max), dtype)), (Stat::RunCount, self.runs.into()), (Stat::IsSorted, self.is_sorted.into()), (Stat::IsStrictSorted, self.is_strict_sorted.into()), diff --git a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index a966a8981..9da43bec8 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ 
b/vortex-array/src/array/varbinview/compute.rs @@ -1,3 +1,4 @@ +use vortex_buffer::Buffer; use vortex_error::VortexResult; use vortex_scalar::Scalar; @@ -22,7 +23,7 @@ impl ScalarAtFn for VarBinViewArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { self.bytes_at(index) - .map(|bytes| varbin_scalar(bytes, self.dtype())) + .map(|bytes| varbin_scalar(Buffer::from(bytes), self.dtype())) } else { Ok(Scalar::null(self.dtype().clone())) } diff --git a/vortex-dtype/src/dtype.rs b/vortex-dtype/src/dtype.rs index 4c06835e8..f5add95aa 100644 --- a/vortex-dtype/src/dtype.rs +++ b/vortex-dtype/src/dtype.rs @@ -3,6 +3,7 @@ use std::hash::Hash; use std::sync::Arc; use itertools::Itertools; +use vortex_error::{vortex_bail, VortexResult}; use DType::*; use crate::nullability::Nullability; @@ -87,6 +88,13 @@ impl DType { pub fn is_boolean(&self) -> bool { matches!(self, Bool(_)) } + + pub fn as_struct(&self) -> Option<&StructDType> { + match self { + Struct(s, _) => Some(s), + _ => None, + } + } } impl Display for DType { @@ -147,6 +155,22 @@ impl StructDType { pub fn dtypes(&self) -> &Arc<[DType]> { &self.dtypes } + + pub fn project(&self, indices: &[usize]) -> VortexResult { + let mut names = vec![]; + let mut dtypes = vec![]; + + for &idx in indices.iter() { + if idx >= self.names.len() { + vortex_bail!("Projection column is out of bounds"); + } + + names.push(self.names[idx].clone()); + dtypes.push(self.dtypes[idx].clone()); + } + + Ok(StructDType::new(names.into(), dtypes)) + } } #[cfg(test)] diff --git a/vortex-ipc/src/io/read.rs b/vortex-ipc/src/io/read.rs deleted file mode 100644 index ebbf38d23..000000000 --- a/vortex-ipc/src/io/read.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::future::Future; -use std::io; -use std::io::{Cursor, Read}; - -use bytes::BytesMut; -use vortex_buffer::Buffer; -use vortex_error::vortex_err; - -pub trait VortexRead { - fn read_into(&mut self, buffer: BytesMut) -> impl Future>; -} - -pub trait VortexReadAt { - fn
read_at_into( - &self, - pos: u64, - buffer: BytesMut, - ) -> impl Future>; - - // TODO(ngates): the read implementation should be able to hint at its latency/throughput - // allowing the caller to make better decisions about how to coalesce reads. - fn performance_hint(&self) -> usize { - 0 - } -} - -impl VortexRead for BytesMut { - async fn read_into(&mut self, buffer: BytesMut) -> io::Result { - if buffer.len() > self.len() { - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - vortex_err!("unexpected eof"), - )) - } else { - Ok(self.split_to(buffer.len())) - } - } -} - -impl VortexRead for Cursor> { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { - Read::read_exact(self, buffer.as_mut())?; - Ok(buffer) - } -} - -impl VortexRead for Cursor<&[u8]> { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { - Read::read_exact(self, buffer.as_mut())?; - Ok(buffer) - } -} - -impl VortexRead for Cursor { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { - Read::read_exact(self, buffer.as_mut())?; - Ok(buffer) - } -} - -impl VortexReadAt for Vec { - fn read_at_into( - &self, - pos: u64, - buffer: BytesMut, - ) -> impl Future> { - VortexReadAt::read_at_into(self.as_slice(), pos, buffer) - } -} - -impl VortexReadAt for [u8] { - async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { - let buffer_len = buffer.len(); - buffer.copy_from_slice(&self[pos as usize..][..buffer_len]); - Ok(buffer) - } -} - -impl VortexReadAt for Buffer { - async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { - let buffer_len = buffer.len(); - buffer.copy_from_slice( - self.slice(pos as usize..pos as usize + buffer_len) - .as_slice(), - ); - Ok(buffer) - } -} diff --git a/vortex-scalar/src/utf8.rs b/vortex-scalar/src/utf8.rs index 1e02f909c..90d3bb624 100644 --- a/vortex-scalar/src/utf8.rs +++ b/vortex-scalar/src/utf8.rs @@ -29,12 +29,22 @@ impl<'a> Utf8Scalar<'a> { impl Scalar { pub fn 
utf8(str: B, nullability: Nullability) -> Self where - BufferString: From, + B: Into, { - Self { + Self::try_utf8(str, nullability).unwrap() + } + + pub fn try_utf8( + str: B, + nullability: Nullability, + ) -> Result>::Error> + where + B: TryInto, + { + Ok(Self { dtype: DType::Utf8(nullability), - value: ScalarValue::BufferString(BufferString::from(str)), - } + value: ScalarValue::BufferString(str.try_into()?), + }) } } diff --git a/vortex-ipc/Cargo.toml b/vortex-serde/Cargo.toml similarity index 94% rename from vortex-ipc/Cargo.toml rename to vortex-serde/Cargo.toml index 5443b5b1a..fff6e2101 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-serde/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "vortex-ipc" +name = "vortex-serde" version = { workspace = true } -description = "Vortex IPC" +description = "Vortex Serialisation" homepage = { workspace = true } repository = { workspace = true } authors = { workspace = true } @@ -14,6 +14,7 @@ rust-version = { workspace = true } [dependencies] bytes = { workspace = true } flatbuffers = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } monoio = { workspace = true, optional = true, features = ["bytes"] } @@ -52,6 +53,7 @@ workspace = true default = ["futures", "monoio", "tokio"] futures = ["futures-util/io"] monoio = ["dep:monoio"] +tokio = ["dep:tokio"] [[bench]] name = "ipc_take" diff --git a/vortex-ipc/README.md b/vortex-serde/README.md similarity index 100% rename from vortex-ipc/README.md rename to vortex-serde/README.md diff --git a/vortex-ipc/benches/ipc_array_reader_take.rs b/vortex-serde/benches/ipc_array_reader_take.rs similarity index 94% rename from vortex-ipc/benches/ipc_array_reader_take.rs rename to vortex-serde/benches/ipc_array_reader_take.rs index 886a26d24..28f6ed9c5 100644 --- a/vortex-ipc/benches/ipc_array_reader_take.rs +++ b/vortex-serde/benches/ipc_array_reader_take.rs @@ -10,9 +10,9 @@ use vortex::array::primitive::PrimitiveArray; 
use vortex::stream::ArrayStreamExt; use vortex::validity::Validity; use vortex::{Context, IntoArray}; -use vortex_ipc::io::FuturesAdapter; -use vortex_ipc::writer::ArrayWriter; -use vortex_ipc::MessageReader; +use vortex_serde::io::FuturesAdapter; +use vortex_serde::writer::ArrayWriter; +use vortex_serde::MessageReader; // 100 record batches, 100k rows each // take from the first 20 batches and last batch diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-serde/benches/ipc_take.rs similarity index 96% rename from vortex-ipc/benches/ipc_take.rs rename to vortex-serde/benches/ipc_take.rs index 7c50297b2..2b0cb8cc4 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-serde/benches/ipc_take.rs @@ -15,10 +15,10 @@ use vortex::array::primitive::PrimitiveArray; use vortex::compress::CompressionStrategy; use vortex::compute::take; use vortex::{Context, IntoArray}; -use vortex_ipc::io::FuturesAdapter; -use vortex_ipc::writer::ArrayWriter; -use vortex_ipc::MessageReader; use vortex_sampling_compressor::SamplingCompressor; +use vortex_serde::io::FuturesAdapter; +use vortex_serde::writer::ArrayWriter; +use vortex_serde::MessageReader; fn ipc_take(c: &mut Criterion) { let mut group = c.benchmark_group("ipc_take"); diff --git a/vortex-ipc/build.rs b/vortex-serde/build.rs similarity index 100% rename from vortex-ipc/build.rs rename to vortex-serde/build.rs diff --git a/vortex-serde/flatbuffers/footer.fbs b/vortex-serde/flatbuffers/footer.fbs new file mode 100644 index 000000000..0a7335653 --- /dev/null +++ b/vortex-serde/flatbuffers/footer.fbs @@ -0,0 +1,26 @@ +namespace vortex.footer; + +table FlatLayout { + begin: uint64; + end: uint64; +} + +table NestedLayout { + children: [Layout]; + encoding: uint16; +} + +union LayoutVariant { + FlatLayout, + NestedLayout, +} + +table Layout { + layout: LayoutVariant; +} + +table Footer { + layout: Layout; +} + +root_type Footer; \ No newline at end of file diff --git a/vortex-ipc/flatbuffers/message.fbs 
b/vortex-serde/flatbuffers/message.fbs similarity index 96% rename from vortex-ipc/flatbuffers/message.fbs rename to vortex-serde/flatbuffers/message.fbs index cfd694784..130175cce 100644 --- a/vortex-ipc/flatbuffers/message.fbs +++ b/vortex-serde/flatbuffers/message.fbs @@ -1,20 +1,20 @@ include "vortex-array/flatbuffers/array.fbs"; include "vortex-dtype/flatbuffers/dtype.fbs"; -namespace vortex.ipc; +namespace vortex.serde; enum Version: uint8 { V0 = 0, } -table Schema { - dtype: vortex.dtype.DType; -} - enum Compression: uint8 { None = 0, } +table Schema { + dtype: vortex.dtype.DType; +} + struct Buffer { offset: uint64; padding: uint16; diff --git a/vortex-ipc/src/chunked_reader/mod.rs b/vortex-serde/src/chunked_reader/mod.rs similarity index 65% rename from vortex-ipc/src/chunked_reader/mod.rs rename to vortex-serde/src/chunked_reader/mod.rs index 0c987400d..3d845ae25 100644 --- a/vortex-ipc/src/chunked_reader/mod.rs +++ b/vortex-serde/src/chunked_reader/mod.rs @@ -1,10 +1,14 @@ +use std::io::Cursor; use std::sync::Arc; +use vortex::compute::unary::scalar_at; +use vortex::stream::ArrayStream; use vortex::{Array, Context}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use crate::io::VortexReadAt; +use crate::stream_reader::StreamArrayReader; mod take_rows; @@ -12,7 +16,7 @@ mod take_rows; pub struct ChunkedArrayReader { read: R, context: Arc, - dtype: DType, + dtype: Arc, // One row per chunk + 1 row for the end of the last chunk. 
byte_offsets: Array, @@ -23,7 +27,7 @@ impl ChunkedArrayReader { pub fn try_new( read: R, context: Arc, - dtype: DType, + dtype: Arc, byte_offsets: Array, row_offsets: Array, ) -> VortexResult { @@ -47,4 +51,14 @@ impl ChunkedArrayReader { } Ok(()) } + + pub async fn array_stream(&mut self) -> impl ArrayStream + '_ { + let mut cursor = Cursor::new(&self.read); + cursor.set_position(u64::try_from(&scalar_at(&self.byte_offsets, 0).unwrap()).unwrap()); + StreamArrayReader::try_new(cursor, self.context.clone()) + .await + .unwrap() + .with_dtype(self.dtype.clone()) + .into_array_stream() + } } diff --git a/vortex-ipc/src/chunked_reader/take_rows.rs b/vortex-serde/src/chunked_reader/take_rows.rs similarity index 89% rename from vortex-ipc/src/chunked_reader/take_rows.rs rename to vortex-serde/src/chunked_reader/take_rows.rs index 236f68fcc..40a62b149 100644 --- a/vortex-ipc/src/chunked_reader/take_rows.rs +++ b/vortex-serde/src/chunked_reader/take_rows.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::compute::unary::{subtract_scalar, try_cast}; -use vortex::compute::{search_sorted, slice, take, SearchSortedSide}; +use vortex::compute::{search_sorted, slice, take, SearchResult, SearchSortedSide}; use vortex::stats::ArrayStatistics; use vortex::stream::{ArrayStream, ArrayStreamExt}; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; @@ -94,7 +94,7 @@ impl ChunkedArrayReader { .try_collect() .await?; - Ok(ChunkedArray::try_new(chunks, self.dtype.clone())?.into_array()) + Ok(ChunkedArray::try_new(chunks, (*self.dtype).clone())?.into_array()) } /// Coalesce reads for the given chunks. 
@@ -167,9 +167,11 @@ fn find_chunks(row_offsets: &Array, indices: &Array) -> VortexResult(); for (pos, idx) in indices.maybe_null_slice::().iter().enumerate() { - let chunk_idx = row_offsets_ref.binary_search(idx).unwrap_or_else(|x| x - 1); + let chunk_idx = match search_sorted(row_offsets.array(), *idx, SearchSortedSide::Left)? { + SearchResult::Found(i) => i, + SearchResult::NotFound(i) => i - 1, + }; chunks .entry(chunk_idx as u32) .and_modify(|chunk_indices: &mut ChunkIndices| { @@ -204,10 +206,11 @@ mod test { use std::io::Cursor; use std::sync::Arc; + use futures_executor::block_on; use itertools::Itertools; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; - use vortex::{Context, IntoArray, IntoCanonical}; + use vortex::{Context, IntoArray, IntoArrayVariant}; use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_error::VortexResult; @@ -216,29 +219,30 @@ mod test { use crate::writer::ArrayWriter; use crate::MessageReader; - async fn chunked_array() -> VortexResult>> { + fn chunked_array() -> VortexResult>> { let c = ChunkedArray::try_new( vec![PrimitiveArray::from((0i32..1000).collect_vec()).into_array(); 10], PType::I32.into(), )? 
.into_array(); - ArrayWriter::new(vec![]).write_array(c).await + block_on(async { ArrayWriter::new(vec![]).write_array(c).await }) } - #[tokio::test] + #[test] #[cfg_attr(miri, ignore)] - async fn test_take_rows() -> VortexResult<()> { - let writer = chunked_array().await?; + fn test_take_rows() -> VortexResult<()> { + let writer = chunked_array()?; let array_layout = writer.array_layouts()[0].clone(); - let row_offsets = PrimitiveArray::from(array_layout.chunks.row_offsets.clone()); let byte_offsets = PrimitiveArray::from(array_layout.chunks.byte_offsets.clone()); + let row_offsets = PrimitiveArray::from(array_layout.chunks.row_offsets.clone()); let buffer = Buffer::from(writer.into_inner()); - let mut msgs = MessageReader::try_new(Cursor::new(buffer.clone())).await?; - let dtype = msgs.read_dtype().await?; + let mut msgs = + block_on(async { MessageReader::try_new(Cursor::new(buffer.clone())).await })?; + let dtype = Arc::new(block_on(async { msgs.read_dtype().await })?); let mut reader = ChunkedArrayReader::try_new( buffer, @@ -249,11 +253,12 @@ mod test { ) .unwrap(); - let result = reader - .take_rows(&PrimitiveArray::from(vec![0u64, 10, 10_000 - 1]).into_array()) - .await? - .into_canonical()? - .into_primitive()?; + let result = block_on(async { + reader + .take_rows(&PrimitiveArray::from(vec![0u64, 10, 10_000 - 1]).into_array()) + .await + })? 
+ .into_primitive()?; assert_eq!(result.len(), 3); assert_eq!(result.maybe_null_slice::(), &[0, 10, 999]); diff --git a/vortex-serde/src/file/file_writer.rs b/vortex-serde/src/file/file_writer.rs new file mode 100644 index 000000000..fda2091db --- /dev/null +++ b/vortex-serde/src/file/file_writer.rs @@ -0,0 +1,292 @@ +use std::collections::VecDeque; +use std::mem; + +use flatbuffers::{FlatBufferBuilder, WIPOffset}; +use futures::{Stream, TryStreamExt}; +use itertools::Itertools; +use vortex::array::chunked::ChunkedArray; +use vortex::array::struct_::StructArray; +use vortex::stream::ArrayStream; +use vortex::validity::Validity; +use vortex::{Array, ArrayDType, IntoArray}; +use vortex_buffer::io_buf::IoBuf; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_flatbuffers::WriteFlatBuffer; + +use crate::file::layouts::{ChunkedLayout, FlatLayout, Layout, StructLayout}; +use crate::flatbuffers::footer as fb; +use crate::io::VortexWrite; +use crate::messages::IPCSchema; +use crate::writer::ChunkLayout; +use crate::MessageWriter; + +pub const MAGIC_BYTES: [u8; 4] = *b"VRX1"; + +pub struct FileWriter { + msgs: MessageWriter, + + dtype: Option, + column_chunks: Vec, +} + +pub struct Footer { + layout: Layout, +} + +impl Footer { + pub fn new(layout: Layout) -> Self { + Self { layout } + } +} + +impl WriteFlatBuffer for Footer { + type Target<'a> = fb::Footer<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + let layout_offset = self.layout.write_flatbuffer(fbb); + fb::Footer::create( + fbb, + &fb::FooterArgs { + layout: Some(layout_offset), + }, + ) + } +} + +impl FileWriter { + pub fn new(write: W) -> Self { + FileWriter { + msgs: MessageWriter::new(write), + dtype: None, + column_chunks: Vec::new(), + } + } + + pub async fn write_array_columns(self, array: Array) -> VortexResult { + if let Ok(chunked) = ChunkedArray::try_from(&array) { + 
self.write_array_columns_stream(chunked.array_stream()) + .await + } else { + self.write_array_columns_stream(array.into_array_stream()) + .await + } + } + + pub async fn write_array_columns_stream( + mut self, + mut array_stream: S, + ) -> VortexResult { + match self.dtype { + None => self.dtype = Some(array_stream.dtype().clone()), + Some(ref sd) => { + if sd != array_stream.dtype() { + vortex_bail!( + "Expected all arrays in the stream to have the same dtype {}, found {}", + sd, + array_stream.dtype() + ) + } + } + } + + while let Some(columns) = array_stream.try_next().await? { + let st = StructArray::try_from(&columns)?; + for (i, field) in st.children().enumerate() { + let chunk_pos = if let Ok(chunked_array) = ChunkedArray::try_from(field.clone()) { + self.write_column_chunks(chunked_array.array_stream(), i) + .await? + } else { + self.write_column_chunks(field.into_array_stream(), i) + .await? + }; + + self.merge_chunk_offsets(i, chunk_pos); + } + } + + Ok(self) + } + + async fn write_column_chunks( + &mut self, + mut stream: S, + column_idx: usize, + ) -> VortexResult + where + S: Stream> + Unpin, + { + let column_row_offset = self + .column_chunks + .get(column_idx) + .and_then(|c| c.row_offsets.last()) + .copied() + .unwrap_or(0u64); + let mut byte_offsets = vec![self.msgs.tell()]; + let mut row_offsets = vec![column_row_offset]; + + while let Some(chunk) = stream.try_next().await? 
{ + row_offsets.push( + row_offsets + .last() + .map(|off| off + chunk.len() as u64) + .expect("Row offsets should be initialized with a value"), + ); + self.msgs.write_batch(chunk).await?; + byte_offsets.push(self.msgs.tell()); + } + + Ok(ChunkLayout { + byte_offsets, + row_offsets, + }) + } + + fn merge_chunk_offsets(&mut self, column_idx: usize, chunk_pos: ChunkLayout) { + if let Some(chunk) = self.column_chunks.get_mut(column_idx) { + chunk.byte_offsets.extend(chunk_pos.byte_offsets); + chunk.row_offsets.extend(chunk_pos.row_offsets); + } else { + self.column_chunks.push(chunk_pos); + } + } + + async fn write_metadata_arrays(&mut self) -> VortexResult { + let DType::Struct(..) = self.dtype.as_ref().expect("Should have written values") else { + unreachable!("Values are a structarray") + }; + + let mut column_layouts = VecDeque::with_capacity(self.column_chunks.len()); + + for mut chunk in mem::take(&mut self.column_chunks) { + let mut chunks = VecDeque::new(); + + let len = chunk.byte_offsets.len() - 1; + let byte_counts = chunk + .byte_offsets + .iter() + .skip(1) + .zip(chunk.byte_offsets.iter()) + .map(|(a, b)| a - b) + .collect_vec(); + + chunks.extend( + chunk + .byte_offsets + .iter() + .zip(chunk.byte_offsets.iter().skip(1)) + .map(|(begin, end)| Layout::Flat(FlatLayout::new(*begin, *end))), + ); + let row_counts = chunk + .row_offsets + .iter() + .skip(1) + .zip(chunk.row_offsets.iter()) + .map(|(a, b)| a - b) + .collect_vec(); + chunk.byte_offsets.truncate(len); + chunk.row_offsets.truncate(len); + + let metadata_array = StructArray::try_new( + [ + "byte_offset".into(), + "byte_count".into(), + "row_offset".into(), + "row_count".into(), + ] + .into(), + vec![ + chunk.byte_offsets.into_array(), + byte_counts.into_array(), + chunk.row_offsets.into_array(), + row_counts.into_array(), + ], + len, + Validity::NonNullable, + )?; + + let metadata_table_begin = self.msgs.tell(); + self.msgs.write_dtype(metadata_array.dtype()).await?; + 
self.msgs.write_batch(metadata_array.into_array()).await?; + chunks.push_front(Layout::Flat(FlatLayout::new( + metadata_table_begin, + self.msgs.tell(), + ))); + column_layouts.push_back(Layout::Chunked(ChunkedLayout::new(chunks))); + } + + Ok(StructLayout::new(column_layouts)) + } + + async fn write_file_trailer(self, footer: Footer) -> VortexResult { + let schema_offset = self.msgs.tell(); + let mut w = self.msgs.into_inner(); + + let dtype_len = Self::write_flatbuffer( + &mut w, + &IPCSchema(&self.dtype.expect("Needed a schema at this point")), + ) + .await?; + let _ = Self::write_flatbuffer(&mut w, &footer).await?; + + w.write_all(schema_offset.to_le_bytes()).await?; + w.write_all((schema_offset + dtype_len).to_le_bytes()) + .await?; + w.write_all(MAGIC_BYTES).await?; + Ok(w) + } + + // TODO(robert): Remove this once messagewriter/reader can write non length prefixed messages + async fn write_flatbuffer(write: &mut W, fb: &F) -> VortexResult { + let mut fbb = FlatBufferBuilder::new(); + let fb_offset = fb.write_flatbuffer(&mut fbb); + fbb.finish_minimal(fb_offset); + + let (buffer, buffer_begin) = fbb.collapse(); + let buffer_end = buffer.len(); + let sliced_buf = buffer.slice(buffer_begin, buffer_end); + let buf_len = sliced_buf.as_slice().len() as u64; + + write.write_all(sliced_buf).await?; + Ok(buf_len) + } + + pub async fn finalize(mut self) -> VortexResult { + let top_level_layout = self.write_metadata_arrays().await?; + self.write_file_trailer(Footer::new(Layout::Struct(top_level_layout))) + .await + } +} + +#[cfg(test)] +mod tests { + use futures_executor::block_on; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::struct_::StructArray; + use vortex::array::varbin::VarBinArray; + use vortex::validity::Validity; + use vortex::IntoArray; + + use crate::file::file_writer::FileWriter; + + #[test] + fn write_columns() { + let strings = VarBinArray::from(vec!["ab", "foo", "bar", "baz"]); + let numbers = PrimitiveArray::from(vec![1u32, 2, 3, 
4]); + let st = StructArray::try_new( + ["strings".into(), "numbers".into()].into(), + vec![strings.into_array(), numbers.into_array()], + 4, + Validity::NonNullable, + ) + .unwrap(); + let buf = Vec::new(); + let mut writer = FileWriter::new(buf); + writer = block_on(async { writer.write_array_columns(st.into_array()).await }).unwrap(); + let written = block_on(async { writer.finalize().await }).unwrap(); + assert!(!written.is_empty()); + } +} diff --git a/vortex-serde/src/file/footer.rs b/vortex-serde/src/file/footer.rs new file mode 100644 index 000000000..d9222482a --- /dev/null +++ b/vortex-serde/src/file/footer.rs @@ -0,0 +1,45 @@ +use bytes::Bytes; +use flatbuffers::root; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_flatbuffers::ReadFlatBuffer; + +use crate::file::layouts::Layout; +use crate::file::FULL_FOOTER_SIZE; +use crate::messages::IPCDType; + +pub struct Footer { + pub(crate) schema_offset: u64, + /// This is actually layouts + pub(crate) footer_offset: u64, + pub(crate) leftovers: Bytes, + pub(crate) leftovers_offset: u64, +} + +impl Footer { + pub fn leftovers_footer_offset(&self) -> usize { + (self.footer_offset - self.leftovers_offset) as usize + } + + pub fn leftovers_schema_offset(&self) -> usize { + (self.schema_offset - self.leftovers_offset) as usize + } + + pub fn layout(&self) -> VortexResult { + let start_offset = self.leftovers_footer_offset(); + let end_offset = self.leftovers.len() - FULL_FOOTER_SIZE; + let layout_bytes = &self.leftovers[start_offset..end_offset]; + let fb_footer = root::(layout_bytes)?; + let fb_layout = fb_footer.layout().expect("Footer must contain a layout"); + + Layout::try_from(fb_layout) + } + + pub fn dtype(&self) -> VortexResult { + let start_offset = self.leftovers_schema_offset(); + let end_offset = self.leftovers_footer_offset(); + let dtype_bytes = &self.leftovers[start_offset..end_offset]; + + Ok(IPCDType::read_flatbuffer(&root::(dtype_bytes)?)?.0) + } +} diff --git 
a/vortex-serde/src/file/layouts.rs b/vortex-serde/src/file/layouts.rs new file mode 100644 index 000000000..3f79ea20d --- /dev/null +++ b/vortex-serde/src/file/layouts.rs @@ -0,0 +1,248 @@ +use std::collections::VecDeque; + +use flatbuffers::{FlatBufferBuilder, WIPOffset}; +use vortex_error::{VortexError, VortexResult}; +use vortex_flatbuffers::WriteFlatBuffer; + +use super::reader::projections::Projection; +use crate::flatbuffers::footer as fb; +use crate::writer::ByteRange; + +#[derive(Debug, Clone)] +pub enum Layout { + Chunked(ChunkedLayout), + Struct(StructLayout), + Flat(FlatLayout), +} + +impl Layout { + pub fn as_struct(&self) -> Option<&StructLayout> { + match self { + Self::Struct(l) => Some(l), + _ => None, + } + } + + pub fn as_struct_mut(&mut self) -> Option<&mut StructLayout> { + match self { + Self::Struct(l) => Some(l), + _ => None, + } + } + + pub fn as_flat(&self) -> Option<&FlatLayout> { + match self { + Self::Flat(l) => Some(l), + _ => None, + } + } + + pub fn as_flat_mut(&mut self) -> Option<&mut FlatLayout> { + match self { + Self::Flat(l) => Some(l), + _ => None, + } + } + + pub fn as_chunked(&self) -> Option<&ChunkedLayout> { + match self { + Self::Chunked(l) => Some(l), + _ => None, + } + } + + pub fn as_chunked_mut(&mut self) -> Option<&mut ChunkedLayout> { + match self { + Self::Chunked(l) => Some(l), + _ => None, + } + } +} + +impl WriteFlatBuffer for Layout { + type Target<'a> = fb::Layout<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + let layout_variant = match self { + Self::Chunked(l) => l.write_flatbuffer(fbb).as_union_value(), + Self::Struct(l) => l.write_flatbuffer(fbb).as_union_value(), + Self::Flat(l) => l.write_flatbuffer(fbb).as_union_value(), + }; + + let mut layout = fb::LayoutBuilder::new(fbb); + layout.add_layout_type(match self { + Self::Chunked(_) => fb::LayoutVariant::NestedLayout, + Self::Struct(_) => fb::LayoutVariant::NestedLayout, + Self::Flat(_) => 
fb::LayoutVariant::FlatLayout, + }); + layout.add_layout(layout_variant); + layout.finish() + } +} + +#[derive(Debug, Copy, Clone)] +pub struct FlatLayout { + pub(crate) range: ByteRange, +} + +impl WriteFlatBuffer for FlatLayout { + type Target<'a> = fb::FlatLayout<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + fb::FlatLayout::create( + fbb, + &fb::FlatLayoutArgs { + begin: self.range.begin, + end: self.range.end, + }, + ) + } +} + +impl FlatLayout { + pub fn new(begin: u64, end: u64) -> Self { + Self { + range: ByteRange { begin, end }, + } + } +} + +#[derive(Debug, Clone)] +pub struct ChunkedLayout { + pub(crate) children: VecDeque, +} + +impl WriteFlatBuffer for ChunkedLayout { + type Target<'a> = fb::NestedLayout<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + let child_offsets = self + .children + .iter() + .map(|c| c.write_flatbuffer(fbb)) + .collect::>(); + let child_vector = fbb.create_vector(&child_offsets); + fb::NestedLayout::create( + fbb, + &fb::NestedLayoutArgs { + children: Some(child_vector), + // TODO(robert): Make this pluggable + encoding: 1u16, + }, + ) + } +} + +impl ChunkedLayout { + pub fn new(child_ranges: VecDeque) -> Self { + Self { + children: child_ranges, + } + } + + pub fn metadata_range(&self) -> &Layout { + &self.children[0] + } +} + +// TODO(robert): Should struct layout store a schema? 
How do you pick a child by name +#[derive(Debug, Clone)] +pub struct StructLayout { + pub(crate) children: VecDeque, +} + +impl WriteFlatBuffer for StructLayout { + type Target<'a> = fb::NestedLayout<'a>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + let child_offsets = self + .children + .iter() + .map(|c| c.write_flatbuffer(fbb)) + .collect::>(); + let child_vector = fbb.create_vector(&child_offsets); + fb::NestedLayout::create( + fbb, + &fb::NestedLayoutArgs { + children: Some(child_vector), + // TODO(robert): Make this pluggable + encoding: 2u16, + }, + ) + } +} + +impl StructLayout { + pub fn new(child_ranges: VecDeque) -> Self { + Self { + children: child_ranges, + } + } + + #[allow(dead_code)] + pub(crate) fn project(&self, projection: &Projection) -> StructLayout { + let mut new_children = VecDeque::with_capacity(projection.indices().len()); + + for &idx in projection.indices() { + new_children.push_back(self.children[idx].clone()); + } + + StructLayout::new(new_children) + } +} + +impl TryFrom> for Layout { + type Error = VortexError; + + fn try_from(value: fb::NestedLayout<'_>) -> Result { + let children = value + .children() + .unwrap() + .iter() + .map(Layout::try_from) + .collect::>>()?; + match value.encoding() { + 1 => Ok(Layout::Chunked(ChunkedLayout::new(children))), + 2 => Ok(Layout::Struct(StructLayout::new(children))), + _ => unreachable!(), + } + } +} + +impl From> for FlatLayout { + fn from(value: fb::FlatLayout<'_>) -> Self { + FlatLayout::new(value.begin(), value.end()) + } +} + +impl TryFrom> for Layout { + type Error = VortexError; + + fn try_from(value: fb::FlatLayout<'_>) -> Result { + Ok(Layout::Flat(value.into())) + } +} + +impl TryFrom> for Layout { + type Error = VortexError; + + fn try_from(value: fb::Layout<'_>) -> Result { + match value.layout_type() { + fb::LayoutVariant::FlatLayout => value.layout_as_flat_layout().unwrap().try_into(), + fb::LayoutVariant::NestedLayout => 
value.layout_as_nested_layout().unwrap().try_into(), + _ => unreachable!(), + } + } +} diff --git a/vortex-serde/src/file/mod.rs b/vortex-serde/src/file/mod.rs new file mode 100644 index 000000000..fc9c02b58 --- /dev/null +++ b/vortex-serde/src/file/mod.rs @@ -0,0 +1,6 @@ +pub mod file_writer; +mod footer; +mod layouts; +pub mod reader; + +pub const FULL_FOOTER_SIZE: usize = 20; diff --git a/vortex-serde/src/file/reader/filtering.rs b/vortex-serde/src/file/reader/filtering.rs new file mode 100644 index 000000000..d66cbf499 --- /dev/null +++ b/vortex-serde/src/file/reader/filtering.rs @@ -0,0 +1,20 @@ +use vortex::Array; +use vortex_error::VortexResult; + +use super::projections::Projection; + +pub trait FilteringPredicate { + fn projection(&self) -> &Projection; + fn evaluate(&mut self, array: &Array) -> VortexResult; +} + +#[derive(Default)] +pub struct RowFilter { + pub(crate) _filters: Vec>, +} + +impl RowFilter { + pub fn new(filters: Vec>) -> Self { + Self { _filters: filters } + } +} diff --git a/vortex-serde/src/file/reader/mod.rs b/vortex-serde/src/file/reader/mod.rs new file mode 100644 index 000000000..a03225d98 --- /dev/null +++ b/vortex-serde/src/file/reader/mod.rs @@ -0,0 +1,461 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use bytes::{Bytes, BytesMut}; +use filtering::RowFilter; +use futures::future::BoxFuture; +use futures::{ready, FutureExt, Stream}; +use projections::Projection; +use schema::Schema; +use vortex::array::constant::ConstantArray; +use vortex::array::struct_::StructArray; +use vortex::compute::unary::subtract_scalar; +use vortex::compute::{and, filter, search_sorted, slice, take, SearchSortedSide}; +use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; +use vortex_dtype::{match_each_integer_ptype, DType, StructDType}; +use vortex_error::{vortex_bail, VortexError, VortexResult}; +use vortex_scalar::Scalar; + +use super::layouts::{Layout, StructLayout}; +use crate::file::file_writer::MAGIC_BYTES; 
+use crate::file::footer::Footer; +use crate::io::VortexReadAt; +use crate::{ArrayBufferReader, ReadResult}; + +pub mod filtering; +pub mod projections; +pub mod schema; + +pub struct VortexBatchReaderBuilder { + reader: R, + projection: Option, + len: Option, + take_indices: Option, + row_filter: Option, +} + +impl VortexBatchReaderBuilder { + // Recommended read-size according to the AWS performance guide + const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024; + const FOOTER_TRAILER_SIZE: usize = 20; + + pub fn new(reader: R) -> Self { + Self { + reader, + projection: None, + row_filter: None, + len: None, + take_indices: None, + } + } + + pub fn with_length(mut self, len: u64) -> Self { + self.len = Some(len); + self + } + + pub fn with_projection(mut self, projection: Projection) -> Self { + self.projection = Some(projection); + self + } + + pub fn with_take_indices(mut self, array: Array) -> Self { + // TODO(#441): Allow providing boolean masks + assert!( + array.dtype().is_int(), + "Mask arrays have to be integer arrays" + ); + self.take_indices = Some(array); + self + } + + pub fn with_row_filter(mut self, row_filter: RowFilter) -> Self { + self.row_filter = Some(row_filter); + self + } + + pub async fn build(mut self) -> VortexResult> { + let footer = self.read_footer().await?; + + // TODO(adamg): We probably want to unify everything that is going on here into a single type and implementation + let layout = if let Layout::Struct(s) = footer.layout()? { + s + } else { + vortex_bail!("Top level layout must be a 'StructLayout'"); + }; + let dtype = if let DType::Struct(s, _) = footer.dtype()? 
{ + s + } else { + vortex_bail!("Top level dtype must be a 'StructDType'"); + }; + + Ok(VortexBatchStream { + layout, + dtype, + projection: self.projection, + take_indices: self.take_indices, + row_filter: self.row_filter.unwrap_or_default(), + reader: Some(self.reader), + metadata_layouts: None, + state: StreamingState::default(), + context: Default::default(), + current_offset: 0, + }) + } + + async fn len(&self) -> usize { + let len = match self.len { + Some(l) => l, + None => self.reader.size().await, + }; + + len as usize + } + + pub async fn read_footer(&mut self) -> VortexResult