Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Made GroupFilter optional in parquet'sRecordReader and added method to set it. #386

Merged
merged 2 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions benches/read_parquet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io::Read;
use std::sync::Arc;
use std::{fs, io::Cursor, path::PathBuf};

use criterion::{criterion_group, criterion_main, Criterion};
Expand All @@ -20,8 +19,7 @@ fn to_buffer(size: usize) -> Vec<u8> {
fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
let file = Cursor::new(buffer);

let reader =
read::RecordReader::try_new(file, Some(vec![column]), None, Arc::new(|_, _| true), None)?;
let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;

for maybe_batch in reader {
let batch = maybe_batch?;
Expand Down
3 changes: 1 addition & 2 deletions examples/parquet_read_record.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fs::File;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::parquet::read;
Expand All @@ -11,7 +10,7 @@ fn main() -> Result<()> {
let file_path = &args[1];

let reader = File::open(file_path)?;
let reader = read::RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;
let reader = read::RecordReader::try_new(reader, None, None, None, None)?;

for maybe_batch in reader {
let batch = maybe_batch?;
Expand Down
16 changes: 11 additions & 5 deletions src/io/parquet/read/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct RecordReader<R: Read + Seek> {
indices: Rc<Vec<usize>>,
buffer: Vec<u8>,
decompress_buffer: Vec<u8>,
groups_filter: GroupFilter,
groups_filter: Option<GroupFilter>,
pages_filter: Option<PageFilter>,
metadata: Rc<FileMetaData>,
current_group: usize,
Expand All @@ -36,7 +36,7 @@ impl<R: Read + Seek> RecordReader<R> {
mut reader: R,
projection: Option<Vec<usize>>,
limit: Option<usize>,
groups_filter: GroupFilter,
groups_filter: Option<GroupFilter>,
pages_filter: Option<PageFilter>,
) -> Result<Self> {
let metadata = read_metadata(&mut reader)?;
Expand Down Expand Up @@ -89,6 +89,10 @@ impl<R: Read + Seek> RecordReader<R> {
pub fn schema(&self) -> &Arc<Schema> {
&self.schema
}

pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
self.groups_filter = Some(groups_filter);
}
}

impl<R: Read + Seek> Iterator for RecordReader<R> {
Expand All @@ -109,9 +113,11 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
let row_group = self.current_group;
let metadata = self.metadata.clone();
let group = &metadata.row_groups[row_group];
if !(self.groups_filter)(row_group, group) {
self.current_group += 1;
return self.next();
if let Some(groups_filter) = self.groups_filter.as_ref() {
if !(groups_filter)(row_group, group) {
self.current_group += 1;
return self.next();
}
}
let columns_meta = group.columns();

Expand Down
10 changes: 2 additions & 8 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@ pub fn read_column<R: Read + Seek>(
) -> Result<ArrayStats> {
let metadata = read_metadata(&mut reader)?;

let mut reader = RecordReader::try_new(
reader,
Some(vec![column]),
None,
Arc::new(|_, _| true),
None,
)?;
let mut reader = RecordReader::try_new(reader, Some(vec![column]), None, None, None)?;

let statistics = metadata.row_groups[row_group]
.column(column)
Expand Down Expand Up @@ -432,7 +426,7 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>

fn integration_read(data: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
let reader = Cursor::new(data);
let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;
let reader = RecordReader::try_new(reader, None, None, None, None)?;
let schema = reader.schema().clone();
let batches = reader.collect::<Result<Vec<_>>>()?;

Expand Down
3 changes: 1 addition & 2 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fs::File;
use std::sync::Arc;

use arrow2::array::*;
use arrow2::error::Result;
Expand Down Expand Up @@ -223,7 +222,7 @@ fn all_types() -> Result<()> {
let path = "testing/parquet-testing/data/alltypes_plain.parquet";
let reader = std::fs::File::open(path)?;

let reader = RecordReader::try_new(reader, None, None, Arc::new(|_, _| true), None)?;
let reader = RecordReader::try_new(reader, None, None, None, None)?;

let batches = reader.collect::<Result<Vec<_>>>()?;
assert_eq!(batches.len(), 1);
Expand Down