From 76c96eda9f12fcd1a4b6e3afa2cbd9989b36ef4f Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 4 Dec 2021 09:02:07 +0000 Subject: [PATCH] Added example to read parquet in parallel with rayon. --- examples/parquet_read_parallel.rs | 6 +- examples/parquet_read_parallel/Cargo.toml | 8 +++ examples/parquet_read_parallel/src/main.rs | 82 ++++++++++++++++++++++ examples/parquet_read_record.rs | 5 +- 4 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 examples/parquet_read_parallel/Cargo.toml create mode 100644 examples/parquet_read_parallel/src/main.rs diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index 38f180d4739..fd474201c19 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -107,9 +107,9 @@ fn main() -> Result<()> { let args: Vec<String> = env::args().collect(); let file_path = &args[1]; + let start = SystemTime::now(); let batch = parallel_read(file_path, 0)?; - for array in batch.columns() { - println!("{}", array) - } + assert!(batch.num_rows() > 0); + println!("took: {} ms", start.elapsed().unwrap().as_millis()); Ok(()) } diff --git a/examples/parquet_read_parallel/Cargo.toml b/examples/parquet_read_parallel/Cargo.toml new file mode 100644 index 00000000000..89eca0435d3 --- /dev/null +++ b/examples/parquet_read_parallel/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "parquet_read_parallel" +version = "0.1.0" +edition = "2018" + +[dependencies] +arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] } +rayon = { version = "1", default-features = false } diff --git a/examples/parquet_read_parallel/src/main.rs b/examples/parquet_read_parallel/src/main.rs new file mode 100644 index 00000000000..aa12fcd5fb9 --- /dev/null +++ b/examples/parquet_read_parallel/src/main.rs @@ -0,0 +1,82 @@ +//! 
Example demonstrating how to read from parquet in parallel using rayon +use std::fs::File; +use std::io::BufReader; +use std::sync::Arc; +use std::time::SystemTime; + +use rayon::prelude::*; + +use arrow2::{ + error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator, + record_batch::RecordBatch, +}; + +fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> { + let mut file = BufReader::new(File::open(path)?); + let file_metadata = read::read_metadata(&mut file)?; + let arrow_schema = Arc::new(read::get_schema(&file_metadata)?); + + // IO-bounded + let columns = file_metadata + .schema() + .fields() + .iter() + .enumerate() + .map(|(field_i, field)| { + let start = SystemTime::now(); + println!("read start - field: {}", field_i); + let mut columns = read::get_column_iterator( + &mut file, + &file_metadata, + row_group, + field_i, + None, + vec![], + ); + + let mut column_chunks = vec![]; + while let read::State::Some(mut new_iter) = columns.advance().unwrap() { + if let Some((pages, metadata)) = new_iter.get() { + let pages = pages.collect::<Vec<_>>(); + + column_chunks.push((pages, metadata.clone())); + } + columns = new_iter; + } + println!( + "read end - {:?}: {} {}", + start.elapsed().unwrap(), + field_i, + row_group + ); + (field_i, field.clone(), column_chunks) + }) + .collect::<Vec<_>>(); + + // CPU-bounded + let columns = columns + .into_par_iter() + .map(|(field_i, parquet_field, column_chunks)| { + let columns = read::ReadColumnIterator::new(parquet_field, column_chunks); + let field = &arrow_schema.fields()[field_i]; + + read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into()) + }) + .collect::<Result<Vec<_>>>()?; + + RecordBatch::try_new(arrow_schema, columns) +} + +fn main() -> Result<()> { + use std::env; + let args: Vec<String> = env::args().collect(); + let file_path = &args[1]; + let row_group = args[2].parse::<usize>().unwrap(); + + let start = SystemTime::now(); + let batch = parallel_read(file_path, row_group)?; + assert!(batch.num_rows() > 0); + 
println!("took: {} ms", start.elapsed().unwrap().as_millis()); + + Ok(()) +} diff --git a/examples/parquet_read_record.rs b/examples/parquet_read_record.rs index 9c5467ad791..6f9e1db4d6f 100644 --- a/examples/parquet_read_record.rs +++ b/examples/parquet_read_record.rs @@ -1,4 +1,5 @@ use std::fs::File; +use std::time::SystemTime; use arrow2::error::Result; use arrow2::io::parquet::read; @@ -12,9 +13,11 @@ fn main() -> Result<()> { let reader = File::open(file_path)?; let reader = read::RecordReader::try_new(reader, None, None, None, None)?; + let start = SystemTime::now(); for maybe_batch in reader { let batch = maybe_batch?; - println!("{:?}", batch); + assert!(batch.num_rows() > 0); } + println!("took: {} ms", start.elapsed().unwrap().as_millis()); Ok(()) }