Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added example to read parquet in parallel with rayon #658

Merged
merged 1 commit into from
Dec 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/parquet_read_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
let file_path = &args[1];

let start = SystemTime::now();
let batch = parallel_read(file_path, 0)?;
for array in batch.columns() {
println!("{}", array)
}
assert!(batch.num_rows() > 0);
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
8 changes: 8 additions & 0 deletions examples/parquet_read_parallel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Manifest of the standalone example that reads parquet in parallel with rayon.
[package]
# Named after the example's directory (`examples/parquet_read_parallel`).
name = "parquet_read_parallel"
version = "0.1.0"
edition = "2018"

[dependencies]
# arrow2 is taken from the repository root, with only the parquet IO features enabled.
arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
rayon = { version = "1", default-features = false }
82 changes: 82 additions & 0 deletions examples/parquet_read_parallel/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Example demonstrating how to read from parquet in parallel using rayon
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use std::time::SystemTime;

use rayon::prelude::*;

use arrow2::{
error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
};

/// Reads row group `row_group` of the parquet file at `path` into a [`RecordBatch`].
///
/// The work is split into two phases:
/// 1. for every field of the parquet schema, the pages of its column chunks are read
///    from the file one field after another and buffered in memory (IO-bounded);
/// 2. the buffered pages of each field are converted to an array in parallel with
///    rayon (CPU-bounded).
///
/// Progress and per-field read timings are printed to stdout.
///
/// # Errors
/// Returns an error if the file cannot be opened, if its metadata or schema cannot be
/// read, if advancing the column iterator fails, if a column cannot be converted to an
/// array, or if the record batch cannot be assembled.
///
/// # Panics
/// Panics if the arrow schema has fewer fields than the parquet schema, because the
/// arrow field is looked up by index.
fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
    let mut file = BufReader::new(File::open(path)?);
    let file_metadata = read::read_metadata(&mut file)?;
    let arrow_schema = Arc::new(read::get_schema(&file_metadata)?);

    // IO-bounded
    let columns = file_metadata
        .schema()
        .fields()
        .iter()
        .enumerate()
        .map(|(field_i, field)| -> Result<_> {
            // `Instant` is monotonic, so measuring the elapsed time cannot fail.
            let start = std::time::Instant::now();
            println!("read start - field: {}", field_i);
            let mut columns = read::get_column_iterator(
                &mut file,
                &file_metadata,
                row_group,
                field_i,
                None,
                vec![],
            );

            let mut column_chunks = vec![];
            // Propagate failures to the caller instead of panicking mid-read.
            while let read::State::Some(mut new_iter) = columns.advance()? {
                if let Some((pages, metadata)) = new_iter.get() {
                    // Buffer the pages so that no reference to `file` is needed
                    // in the parallel phase below.
                    let pages = pages.collect::<Vec<_>>();

                    column_chunks.push((pages, metadata.clone()));
                }
                columns = new_iter;
            }
            println!(
                "read end - {:?}: {} {}",
                start.elapsed(),
                field_i,
                row_group
            );
            Ok((field_i, field.clone(), column_chunks))
        })
        .collect::<Result<Vec<_>>>()?;

    // CPU-bounded
    let columns = columns
        .into_par_iter()
        .map(|(field_i, parquet_field, column_chunks)| {
            let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
            let field = &arrow_schema.fields()[field_i];

            read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into())
        })
        .collect::<Result<Vec<_>>>()?;

    RecordBatch::try_new(arrow_schema, columns)
}

/// Reads one row group of a parquet file in parallel and prints how long it took.
///
/// Usage: `<program> <file_path> <row_group>`, where `row_group` is the index of the
/// row group to read. When an argument is missing or `row_group` is not a valid
/// `usize`, a usage message is printed to stderr and the process exits with status 2.
///
/// # Panics
/// Panics if the row group that was read contains no rows.
fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    // Validate the command line up front so that a missing or malformed argument
    // yields a usage message rather than an index-out-of-bounds or parse panic.
    let parsed = match (args.get(1), args.get(2).map(|x| x.parse::<usize>())) {
        (Some(file_path), Some(Ok(row_group))) => Some((file_path, row_group)),
        _ => None,
    };
    let (file_path, row_group) = match parsed {
        Some(parsed) => parsed,
        None => {
            let program = args.first().map(String::as_str).unwrap_or("<program>");
            eprintln!("usage: {} <file_path> <row_group>", program);
            std::process::exit(2);
        }
    };

    // `Instant` is monotonic, so measuring the elapsed time cannot fail.
    let start = std::time::Instant::now();
    let batch = parallel_read(file_path, row_group)?;
    assert!(batch.num_rows() > 0);
    println!("took: {} ms", start.elapsed().as_millis());

    Ok(())
}
5 changes: 4 additions & 1 deletion examples/parquet_read_record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Result;
use arrow2::io::parquet::read;
Expand All @@ -12,9 +13,11 @@ fn main() -> Result<()> {
let reader = File::open(file_path)?;
let reader = read::RecordReader::try_new(reader, None, None, None, None)?;

let start = SystemTime::now();
for maybe_batch in reader {
let batch = maybe_batch?;
println!("{:?}", batch);
assert!(batch.num_rows() > 0);
}
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}