Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ArrayIterator and ArrayStream #327

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bzip2 = { workspace = true }
csv = { workspace = true }
enum-iterator = { workspace = true }
fs_extra = { workspace = true }
futures-util = { workspace = true }
humansize = { workspace = true }
itertools = { workspace = true }
lance = { version = "0.10.16", features = [] }
Expand Down
5 changes: 3 additions & 2 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::compress::Compressor;
use vortex::compute::take::take;
use vortex::stream::ArrayStreamExt;
use vortex::{Array, IntoArray, ToArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_ipc::array_stream::ArrayStreamExt;
use vortex_ipc::io::TokioVortexRead;
use vortex_ipc::writer::StreamWriter;
use vortex_ipc::MessageReader;
Expand All @@ -44,9 +44,10 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
msgs.array_stream_from_messages(&CTX)
.await
.unwrap()
.collect()
.into_chunked()
.await
})
.map(|a| a.into_array())
}

pub fn rewrite_parquet_as_vortex<W: Write>(
Expand Down
3 changes: 3 additions & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ arrow-schema = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
futures-util = { workspace = true }
humansize = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
num-traits = { workspace = true }
num_enum = { workspace = true }
paste = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
vortex-buffer = { path = "../vortex-buffer" }
vortex-dtype = { path = "../vortex-dtype", features = ["flatbuffers", "serde"] }
Expand All @@ -49,6 +51,7 @@ build-vortex = { path = "../build-vortex" }

[dev-dependencies]
criterion = { workspace = true }
tokio = { workspace = true }

[[bench]]
name = "search_sorted"
Expand Down
9 changes: 6 additions & 3 deletions vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::array::primitive::PrimitiveArray;
use crate::compute::scalar_at::scalar_at;
use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn};
use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::validity::Validity::NonNullable;
use crate::validity::{ArrayValidity, LogicalValidity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
Expand Down Expand Up @@ -85,12 +86,14 @@ impl ChunkedArray {
let index_in_chunk = index - chunk_start;
(index_chunk, index_in_chunk)
}
}

impl<'a> ChunkedArray {
pub fn chunks(&'a self) -> impl Iterator<Item = Array> + '_ {
pub fn chunks(&self) -> impl Iterator<Item = Array> + '_ {
(0..self.nchunks()).map(|c| self.chunk(c).unwrap())
}

pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().map(Ok))
}
}

impl FromIterator<Array> for ChunkedArray {
Expand Down
36 changes: 36 additions & 0 deletions vortex-array/src/iter/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::iter::ArrayIterator;
use crate::Array;

/// Pairs an iterator of array chunks with a fixed [`DType`] so that the pair can be used as an
/// [`ArrayIterator`].
pub struct ArrayIteratorAdapter<I> {
// DType reported by this adapter's `ArrayIterator::dtype` implementation.
dtype: DType,
// The wrapped iterator; `Iterator::next` on the adapter forwards to it.
inner: I,
}

impl<I> ArrayIteratorAdapter<I> {
/// Wraps `inner`, recording `dtype` as the DType this adapter reports.
///
/// No check is made here that the chunks yielded by `inner` actually have this DType.
pub fn new(dtype: DType, inner: I) -> Self {
Self { dtype, inner }
}
}

/// Iteration is delegated entirely to the wrapped iterator.
impl<I> Iterator for ArrayIteratorAdapter<I>
where
    I: Iterator<Item = VortexResult<Array>>,
{
    type Item = VortexResult<Array>;

    /// Yields the next chunk (or error) produced by the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    /// Forwards the wrapped iterator's bounds.
    ///
    /// Without this override the adapter would report the default `(0, None)`, hiding any
    /// length information the wrapped iterator has from consumers such as `collect`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

/// Makes the adapter usable wherever an `ArrayIterator` is expected.
impl<I> ArrayIterator for ArrayIteratorAdapter<I>
where
I: Iterator<Item = VortexResult<Array>>,
{
/// Returns the DType that was supplied to `ArrayIteratorAdapter::new`.
fn dtype(&self) -> &DType {
&self.dtype
}
}
23 changes: 23 additions & 0 deletions vortex-array/src/iter/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use itertools::Itertools;
use vortex_error::VortexResult;

use crate::array::chunked::ChunkedArray;
use crate::iter::ArrayIterator;
use crate::stream::{ArrayStream, ArrayStreamAdapter};

/// Extension methods for converting an [`ArrayIterator`] into other chunk containers.
pub trait ArrayIteratorExt: ArrayIterator {
    /// Converts this iterator into an [`ArrayStream`] that reports the same DType.
    fn into_stream(self) -> impl ArrayStream
    where
        Self: Sized,
    {
        // The DType must be cloned before `self` is moved into the stream.
        let dtype = self.dtype().clone();
        let chunks = futures_util::stream::iter(self);
        ArrayStreamAdapter::new(dtype, chunks)
    }

    /// Drains this iterator and builds a [`ChunkedArray`] from the collected chunks.
    ///
    /// # Errors
    ///
    /// Returns the error if the iterator yields one, or if `ChunkedArray::try_new` rejects the
    /// collected chunks.
    fn try_into_chunked(self) -> VortexResult<ChunkedArray>
    where
        Self: Sized,
    {
        // The DType must be cloned before `self` is consumed by `try_collect`.
        let dtype = self.dtype().clone();
        let chunks = self.try_collect()?;
        ChunkedArray::try_new(chunks, dtype)
    }
}
14 changes: 14 additions & 0 deletions vortex-array/src/iter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
mod adapter;
mod ext;
pub use adapter::*;
pub use ext::*;
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::Array;

/// An iterator of array chunks along with a DType.
/// Analogous to Arrow's RecordBatchReader.
pub trait ArrayIterator: Iterator<Item = VortexResult<Array>> {
/// Returns the DType associated with this iterator.
///
/// NOTE(review): presumably every yielded chunk has this DType; the trait itself does not
/// enforce that -- confirm with implementors.
fn dtype(&self) -> &DType;
}
2 changes: 2 additions & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ mod data;
pub mod encoding;
mod flatten;
mod implementation;
pub mod iter;
mod metadata;
mod sampling;
pub mod stats;
pub mod stream;
mod tree;
mod typed;
pub mod validity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::task::Poll;

use futures_util::Stream;
use pin_project::pin_project;
use vortex::Array;
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::array_stream::ArrayStream;
use crate::stream::ArrayStream;
use crate::Array;

/// An adapter for a stream of array chunks to implement an ArrayStream.
#[pin_project]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
use std::future::Future;

use futures_util::TryFutureExt;
use futures_util::TryStreamExt;
use vortex::array::chunked::ChunkedArray;
use vortex::{Array, IntoArray};
use vortex_error::VortexResult;

use crate::array_stream::take_rows::TakeRows;
use crate::array_stream::ArrayStream;
use crate::array_stream::ArrayStreamAdapter;
use crate::array::chunked::ChunkedArray;
use crate::stream::take_rows::TakeRows;
use crate::stream::ArrayStream;
use crate::stream::ArrayStreamAdapter;
use crate::Array;

pub trait ArrayStreamExt: ArrayStream {
fn collect(self) -> impl Future<Output = VortexResult<Array>>
where
Self: Sized,
{
self.collect_chunked()
.map_ok(|chunked| chunked.into_array())
}

fn collect_chunked(self) -> impl Future<Output = VortexResult<ChunkedArray>>
fn into_chunked(self) -> impl Future<Output = VortexResult<ChunkedArray>>
where
Self: Sized,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ pub use adapter::*;
pub use ext::*;
use futures_util::Stream;
pub use take_rows::*;
use vortex::Array;
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::Array;

mod adapter;
mod ext;
mod take_rows;
Expand Down
105 changes: 105 additions & 0 deletions vortex-array/src/stream/take_rows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::{ready, Stream};
use pin_project::pin_project;
use vortex_dtype::match_each_integer_ptype;
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::Scalar;

use crate::compute::scalar_subtract::subtract_scalar;
use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
use crate::compute::slice::slice;
use crate::compute::take::take;
use crate::stats::{ArrayStatistics, Stat};
use crate::stream::ArrayStream;
use crate::IntoArray;
use crate::{Array, ArrayDType};

/// A stream that selects rows from the batches of an inner [`ArrayStream`] using a borrowed
/// array of row indices.
///
/// The indices are compared against a running row offset, so they address rows across all the
/// batches the inner stream yields rather than within a single batch. `TakeRows::try_new`
/// validates the indices.
#[pin_project]
pub struct TakeRows<'idx, R: ArrayStream> {
// The inner stream of batches; pinned because it is polled in place.
#[pin]
reader: R,
// Row indices to take, borrowed for the lifetime of this stream.
indices: &'idx Array,
// Total length of the batches read from `reader` so far; starts at 0.
row_offset: usize,
}

impl<'idx, R: ArrayStream> TakeRows<'idx, R> {
    /// Creates a `TakeRows` over `reader` that will select the rows named by `indices`.
    ///
    /// An empty `indices` array is accepted without further checks.
    ///
    /// # Errors
    ///
    /// For non-empty `indices`, returns an error when, checked in this order, the indices
    /// are not known to be sorted, may contain nulls, are not integers, or are signed and may
    /// contain a negative value. A statistic that cannot be computed is treated as a failed check.
    #[allow(dead_code)]
    pub fn try_new(reader: R, indices: &'idx Array) -> VortexResult<Self> {
        let take_rows = Self {
            reader,
            indices,
            row_offset: 0,
        };

        // Nothing to validate when no rows are requested.
        if indices.is_empty() {
            return Ok(take_rows);
        }

        let is_sorted = indices.statistics().compute_is_sorted().unwrap_or(false);
        if !is_sorted {
            vortex_bail!("Indices must be sorted to take from IPC stream");
        }

        // An unavailable null count is treated as "may contain nulls".
        let may_have_nulls = indices
            .statistics()
            .compute_null_count()
            .map_or(true, |null_count| null_count > 0);
        if may_have_nulls {
            vortex_bail!("Indices must not contain nulls");
        }

        if !indices.dtype().is_int() {
            vortex_bail!("Indices must be integers");
        }

        // Only signed types can hold negatives, so the minimum is computed only for them.
        if indices.dtype().is_signed_int() {
            let may_be_negative = indices
                .statistics()
                .compute_as_cast::<i64>(Stat::Min)
                .map_or(true, |min| min < 0);
            if may_be_negative {
                vortex_bail!("Indices must be positive");
            }
        }

        Ok(take_rows)
    }
}

// Stream implementation: reads batches from the inner reader and, for each batch that contains
// at least one requested row, yields the result of `take` on that batch.
impl<'idx, R: ArrayStream> Stream for TakeRows<'idx, R> {
type Item = VortexResult<Array>;

/// Polls the inner reader until a batch containing requested rows is found.
///
/// Returns `Ready(None)` immediately (without polling the reader) when the index array is
/// empty, and `Ready(None)` once the reader is exhausted. Batches that contain none of the
/// requested rows are skipped, but still advance the running row offset.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if this.indices.is_empty() {
return Poll::Ready(None);
}

// `?` on the poll result returns a reader error to the caller as a ready item, and `ready!`
// returns `Pending` when the reader has no batch available yet.
while let Some(batch) = ready!(this.reader.as_mut().poll_next(cx)?) {
// Row number (across the whole stream) of the first row in this batch.
let curr_offset = *this.row_offset;
// `left..right` is the range of positions in `indices` that fall inside this batch.
// NOTE(review): presumably `SearchSortedSide::Left` gives the first position whose value is
// >= the target, making the range half-open over [curr_offset, curr_offset + batch.len()).
let left = search_sorted::<usize>(this.indices, curr_offset, SearchSortedSide::Left)?
.to_index();
let right = search_sorted::<usize>(
this.indices,
curr_offset + batch.len(),
SearchSortedSide::Left,
)?
.to_index();

// Advance before the `continue` below so that skipped batches are still counted.
*this.row_offset += batch.len();

// No requested rows in this batch; move on to the next one.
if left == right {
continue;
}

// TODO(ngates): this is probably too heavy to run on the event loop. We should spawn
// onto a worker pool.
let indices_for_batch = slice(this.indices, left, right)?.flatten_primitive()?;
// Rebase the stream-wide indices so they are relative to the start of this batch.
// NOTE(review): `curr_offset as $T` is a truncating cast; it looks safe because at least one
// index >= curr_offset is stored in type $T at this point -- confirm.
let shifted_arr = match_each_integer_ptype!(indices_for_batch.ptype(), |$T| {
subtract_scalar(&indices_for_batch.into_array(), &Scalar::from(curr_offset as $T))?
});
// Turn the `VortexResult<Array>` from `take` into the `Option<VortexResult<Array>>` item.
return Poll::Ready(take(&batch, &shifted_arr).map(Some).transpose());
}

// The inner reader is exhausted.
Poll::Ready(None)
}
}
2 changes: 1 addition & 1 deletion vortex-ipc/benches/ipc_array_reader_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures_util::{pin_mut, TryStreamExt};
use itertools::Itertools;
use vortex::array::primitive::PrimitiveArray;
use vortex::stream::ArrayStreamExt;
use vortex::{Context, IntoArray};
use vortex_dtype::Nullability;
use vortex_dtype::{DType, PType};
use vortex_ipc::array_stream::ArrayStreamExt;
use vortex_ipc::io::FuturesVortexRead;
use vortex_ipc::writer::StreamWriter;
use vortex_ipc::MessageReader;
Expand Down
Loading