Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Naive interleaved filtering and data reading #918

Merged
merged 7 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions vortex-dtype/src/serde/flatbuffers/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ pub fn resolve_field_references<'a, 'b: 'a>(
}

/// Deserialize flatbuffer schema selecting only columns defined by projection
pub fn deserialize_and_project(fb: fb::DType<'_>, projection: &[Field]) -> VortexResult<DType> {
let fb_struct = fb
pub fn deserialize_and_project(
fb_dtype: fb::DType<'_>,
projection: &[Field],
) -> VortexResult<DType> {
let fb_struct = fb_dtype
.type__as_struct_()
.ok_or_else(|| vortex_err!("The top-level type should be a struct"))?;
let nullability = fb_struct.nullable().into();
Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use vortex::array::StructArray;
use vortex::{Array, IntoArray};
use vortex_error::{vortex_err, VortexResult};

use crate::layouts::read::{Layout, ReadResult};
use crate::layouts::read::{LayoutReader, ReadResult};

#[derive(Debug)]
pub struct BatchReader {
names: Arc<[Arc<str>]>,
children: Vec<Box<dyn Layout>>,
children: Vec<Box<dyn LayoutReader>>,
arrays: Vec<Option<Array>>,
}

impl BatchReader {
pub fn new(names: Arc<[Arc<str>]>, children: Vec<Box<dyn Layout>>) -> Self {
pub fn new(names: Arc<[Arc<str>]>, children: Vec<Box<dyn LayoutReader>>) -> Self {
let arrays = vec![None; children.len()];
Self {
names,
Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use vortex::compute::slice;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_error::VortexResult;

use crate::layouts::read::{Layout, ReadResult};
use crate::layouts::read::{LayoutReader, ReadResult};

#[derive(Debug)]
pub struct BufferedReader {
layouts: VecDeque<Box<dyn Layout>>,
layouts: VecDeque<Box<dyn LayoutReader>>,
arrays: VecDeque<Array>,
batch_size: usize,
}

impl BufferedReader {
pub fn new(layouts: VecDeque<Box<dyn Layout>>, batch_size: usize) -> Self {
pub fn new(layouts: VecDeque<Box<dyn LayoutReader>>, batch_size: usize) -> Self {
Self {
layouts,
arrays: Default::default(),
Expand Down
77 changes: 38 additions & 39 deletions vortex-serde/src/layouts/read/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::sync::{Arc, RwLock};

use ahash::HashSet;
use bytes::BytesMut;
use vortex::{Array, ArrayDType};
use vortex_dtype::field::Field;
use vortex_error::{vortex_bail, VortexResult};
use vortex_error::{vortex_bail, VortexExpect, VortexResult};
use vortex_schema::projection::Projection;
use vortex_schema::Schema;

use crate::io::VortexReadAt;
use crate::layouts::read::cache::{LayoutMessageCache, RelativeLayoutCache};
Expand Down Expand Up @@ -74,65 +71,67 @@ impl<R: VortexReadAt> LayoutReaderBuilder<R> {
let footer = self.read_footer().await?;
let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

// TODO(robert): Don't leak filter references into read projection
let (read_projection, result_projection) = if let Some(filter_columns) = self
let filter_projection = self
.row_filter
.as_ref()
.map(|f| f.references())
.filter(|refs| !refs.is_empty())
// This is necessary to have globally addressed columns in the relative cache,
// there is probably a better of doing that, but this works for now and the API isn't very externally-useful.
.map(|refs| footer.resolve_references(&refs.into_iter().collect::<Vec<_>>()))
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
.transpose()?
{
match self.projection.unwrap_or_default() {
Projection::All => (Projection::All, Projection::All),
Projection::Flat(mut v) => {
let original_len = v.len();
let existing_fields: HashSet<Field> = v.iter().cloned().collect();
v.extend(
filter_columns
.into_iter()
.filter(|f| !existing_fields.contains(f)),
);
(
Projection::Flat(v),
Projection::Flat((0..original_len).map(Field::from).collect()),
)
}
}
} else {
(self.projection.unwrap_or_default(), Projection::All)
};
.map(Projection::from);

let read_projection = self.projection.unwrap_or_default();

let projected_dtype = match &read_projection {
let projected_dtype = match read_projection {
Projection::All => footer.dtype()?,
Projection::Flat(projection) => footer.projected_dtype(projection)?,
Projection::Flat(ref projection) => footer.projected_dtype(projection)?,
};

let filter = self.row_filter.map(|f| {
let schema = Schema::new(projected_dtype.clone());
f.reorder(&schema)
});
let eval_dtype = filter_projection
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
.as_ref()
.map(|p| match p {
Projection::All => footer.dtype(),
Projection::Flat(fields) => footer.projected_dtype(fields),
})
.transpose()?;

let scan = Scan {
filter,
filter: self.row_filter.clone(),
batch_size,
projection: read_projection,
indices: self.indices,
};

let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let layouts_cache =
RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone());

let layout = footer.layout(scan.clone(), layouts_cache)?;
let data_reader = footer.layout(
scan.clone(),
RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone()),
)?;
let filter_reader = eval_dtype
.map(|dtype| {
footer.layout(
Scan {
filter: self.row_filter,
batch_size,
projection: filter_projection.vortex_expect(
"If we have eval dtype, we must also have a filter and projection",
),
indices: None,
},
RelativeLayoutCache::new(message_cache.clone(), dtype),
)
})
.transpose()?;

Ok(LayoutBatchStream::new(
self.reader,
layout,
data_reader,
filter_reader,
message_cache,
projected_dtype,
scan,
result_projection,
))
}

Expand Down
6 changes: 3 additions & 3 deletions vortex-serde/src/layouts/read/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_flatbuffers::footer::LayoutVariant;

use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::layouts::{ChunkedLayoutSpec, ColumnLayoutSpec, FlatLayout};
use crate::layouts::read::{Layout, Scan};
use crate::layouts::read::{LayoutReader, Scan};

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct LayoutId(pub u16);
Expand All @@ -25,7 +25,7 @@ pub trait LayoutSpec: Debug + Send + Sync {
scan: Scan,
layout_reader: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn Layout>;
) -> Box<dyn LayoutReader>;
}

pub type LayoutSpecRef = &'static dyn LayoutSpec;
Expand Down Expand Up @@ -73,7 +73,7 @@ impl LayoutDeserializer {
fb_loc: usize,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn Layout>> {
) -> VortexResult<Box<dyn LayoutReader>> {
let fb_layout = unsafe {
let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
fb::Layout::init_from_table(tab)
Expand Down
24 changes: 19 additions & 5 deletions vortex-serde/src/layouts/read/filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::sync::Arc;

use vortex::array::BoolArray;
use vortex::compute::and;
use vortex::{Array, IntoArray};
use vortex::stats::ArrayStatistics;
use vortex::validity::Validity;
use vortex::{Array, IntoArray, IntoArrayVariant};
use vortex_dtype::field::{Field, FieldPath};
use vortex_error::VortexResult;
use vortex_expr::{expr_is_filter, split_conjunction, VortexExpr};
Expand All @@ -18,8 +20,8 @@ pub struct RowFilter {
}

impl RowFilter {
pub fn new(filter: Arc<dyn VortexExpr>) -> Self {
let conjunction = split_conjunction(&filter)
pub fn new(expr: Arc<dyn VortexExpr>) -> Self {
let conjunction = split_conjunction(&expr)
.into_iter()
.filter(expr_is_filter)
.collect();
Expand All @@ -33,6 +35,19 @@ impl RowFilter {
for expr in self.conjunction.iter() {
let new_mask = expr.evaluate(target)?;
mask = and(new_mask, mask)?;

if mask
.clone()
.into_bool()?
.statistics()
.compute_true_count()
.unwrap_or_default()
== 0
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
{
return Ok(
BoolArray::from_vec(vec![false; target.len()], Validity::AllValid).into_array(),
);
}
}

Ok(mask)
Expand All @@ -42,8 +57,7 @@ impl RowFilter {
pub fn references(&self) -> HashSet<Field> {
let mut set = HashSet::new();
for expr in self.conjunction.iter() {
let references = expr.references();
set.extend(references.iter().cloned());
set.extend(expr.references().iter().cloned());
}

set
Expand Down
8 changes: 4 additions & 4 deletions vortex-serde/src/layouts/read/footer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use bytes::Bytes;
use flatbuffers::root;
use vortex_dtype::field::Field;
use vortex_dtype::flatbuffers::{deserialize_and_project, resolve_field_references};
use vortex_dtype::flatbuffers::deserialize_and_project;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_flatbuffers::{message as fb, ReadFlatBuffer};
use vortex_schema::Schema;

use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::context::LayoutDeserializer;
use crate::layouts::read::{Layout, Scan, FILE_POSTSCRIPT_SIZE};
use crate::layouts::read::{LayoutReader, Scan, FILE_POSTSCRIPT_SIZE};
use crate::messages::IPCDType;
use crate::FLATBUFFER_SIZE_LENGTH;

Expand Down Expand Up @@ -57,7 +57,7 @@ impl Footer {
&self,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn Layout>> {
) -> VortexResult<Box<dyn LayoutReader>> {
let start_offset = self.leftovers_layout_offset();
let end_offset = self.leftovers.len() - FILE_POSTSCRIPT_SIZE;
let footer_bytes = self
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Footer {
let fb_struct = dtype
.type__as_struct_()
.ok_or_else(|| vortex_err!("The top-level type should be a struct"))?;
resolve_field_references(fb_struct, projection)
vortex_dtype::flatbuffers::resolve_field_references(fb_struct, projection)
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
.map(|idx| idx.map(Field::from))
.collect::<VortexResult<Vec<_>>>()
}
Expand Down
14 changes: 7 additions & 7 deletions vortex-serde/src/layouts/read/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::layouts::read::batch::BatchReader;
use crate::layouts::read::buffered::BufferedReader;
use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::context::{LayoutDeserializer, LayoutId, LayoutSpec};
use crate::layouts::read::{Layout, ReadResult, Scan};
use crate::layouts::read::{LayoutReader, ReadResult, Scan};
use crate::stream_writer::ByteRange;
use crate::ArrayBufferReader;

Expand Down Expand Up @@ -44,7 +44,7 @@ impl FlatLayout {
}
}

impl Layout for FlatLayout {
impl LayoutReader for FlatLayout {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match self.state {
FlatLayoutState::Init => {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl LayoutSpec for ColumnLayoutSpec {
scan: Scan,
layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn Layout> {
) -> Box<dyn LayoutReader> {
Box::new(ColumnLayout::new(
fb_bytes,
fb_loc,
Expand Down Expand Up @@ -160,7 +160,7 @@ impl ColumnLayout {
idx: usize,
children: Vector<ForwardsUOffset<fb::Layout>>,
dtype: DType,
) -> VortexResult<Box<dyn Layout>> {
) -> VortexResult<Box<dyn LayoutReader>> {
let layout = children.get(idx);

// TODO: Figure out complex nested schema projections
Expand All @@ -176,7 +176,7 @@ impl ColumnLayout {
}
}

impl Layout for ColumnLayout {
impl LayoutReader for ColumnLayout {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match &mut self.state {
ColumnLayoutState::Init => {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl LayoutSpec for ChunkedLayoutSpec {
scan: Scan,
layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn Layout> {
) -> Box<dyn LayoutReader> {
Box::new(ChunkedLayout::new(
fb_bytes,
fb_loc,
Expand Down Expand Up @@ -296,7 +296,7 @@ impl ChunkedLayout {
}
}

impl Layout for ChunkedLayout {
impl LayoutReader for ChunkedLayout {
fn read_next(&mut self) -> VortexResult<Option<ReadResult>> {
match &mut self.state {
ChunkedLayoutState::Init => {
Expand Down
2 changes: 1 addition & 1 deletion vortex-serde/src/layouts/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum ReadResult {
Batch(Array),
}

pub trait Layout: Debug + Send {
pub trait LayoutReader: Debug + Send {
/// Reads the data from the underlying layout
///
/// The layout can either return a batch data, i.e. an Array or ask for more layout messages to
Expand Down
Loading
Loading