Skip to content

Commit

Permalink
Add identity projection to the file reader (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS authored Aug 1, 2024
1 parent bf5fe0a commit c153bd3
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 32 deletions.
15 changes: 10 additions & 5 deletions vortex-serde/src/file/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,18 @@ impl StructLayout {

#[allow(dead_code)]
pub(crate) fn project(&self, projection: &Projection) -> StructLayout {
let mut new_children = VecDeque::with_capacity(projection.indices().len());
match projection {
Projection::All => self.clone(),
Projection::Partial(indices) => {
let mut new_children = VecDeque::with_capacity(indices.len());

for &idx in projection.indices() {
new_children.push_back(self.children[idx].clone());
}
for &idx in indices.iter() {
new_children.push_back(self.children[idx].clone());
}

StructLayout::new(new_children)
StructLayout::new(new_children)
}
}
}
}

Expand Down
29 changes: 14 additions & 15 deletions vortex-serde/src/file/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod projections;
pub mod schema;

const DEFAULT_BATCH_SIZE: usize = 65536;
const DEFAULT_PROJECTION: Projection = Projection::All;

pub struct VortexBatchReaderBuilder<R> {
reader: R,
Expand Down Expand Up @@ -102,14 +103,15 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
};

let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
let projection = self.projection.unwrap_or(DEFAULT_PROJECTION);

VortexBatchStream::try_new(
self.reader,
layout,
dtype,
self.row_filter.unwrap_or_default(),
batch_size,
self.projection,
projection,
self.take_indices,
)
}
Expand Down Expand Up @@ -170,8 +172,7 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {

pub struct VortexBatchStream<R> {
dtype: StructDType,
// TODO(robert): Have identity projection
projection: Option<Projection>,
projection: Projection,
take_indices: Option<Array>,
row_filter: RowFilter,
batch_reader: Option<BatchReader<R>>,
Expand All @@ -190,7 +191,7 @@ impl<R: VortexReadAt> VortexBatchStream<R> {
dtype: StructDType,
row_filter: RowFilter,
batch_size: usize,
projection: Option<Projection>,
projection: Projection,
take_indices: Option<Array>,
) -> VortexResult<Self> {
let schema = Schema(dtype.clone());
Expand Down Expand Up @@ -297,17 +298,15 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexBatchStream<R> {
}

batch = filter(&batch, &current_predicate)?;
let projected = self
.projection
.as_ref()
.map(|p| {
StructArray::try_from(batch.clone())
.unwrap()
.project(p.indices())
.unwrap()
.into_array()
})
.unwrap_or(batch);

let projected = match &self.projection {
Projection::All => batch,
Projection::Partial(indices) => StructArray::try_from(batch.clone())
.unwrap()
.project(indices.as_ref())
.unwrap()
.into_array(),
};

return Poll::Ready(Some(Ok(projected)));
}
Expand Down
18 changes: 7 additions & 11 deletions vortex-serde/src/file/reader/projections.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
#[derive(Clone)]
pub struct Projection {
indices: Vec<usize>,
#[derive(Clone, Default)]
pub enum Projection {
#[default]
All,
Partial(Vec<usize>),
}

impl Projection {
pub fn new(indices: impl AsRef<[usize]>) -> Self {
Projection {
indices: Vec::from(indices.as_ref()),
}
}

pub fn indices(&self) -> &[usize] {
self.indices.as_ref()
Self::Partial(Vec::from(indices.as_ref()))
}
}

impl From<Vec<usize>> for Projection {
fn from(indices: Vec<usize>) -> Self {
Self { indices }
Self::Partial(indices)
}
}
5 changes: 4 additions & 1 deletion vortex-serde/src/file/reader/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ impl Schema {
}

pub fn project(&self, projection: Projection) -> VortexResult<Self> {
self.0.project(projection.indices()).map(Self)
match projection {
Projection::All => Ok(self.clone()),
Projection::Partial(indicies) => self.0.project(indicies.as_ref()).map(Self),
}
}
}

0 comments on commit c153bd3

Please sign in to comment.