diff --git a/examples/parquet_write_parallel/.gitignore b/examples/parquet_write_parallel/.gitignore
new file mode 100644
index 00000000000..4bed5da93fb
--- /dev/null
+++ b/examples/parquet_write_parallel/.gitignore
@@ -0,0 +1 @@
+*.parquet
diff --git a/examples/parquet_write_parallel/Cargo.toml b/examples/parquet_write_parallel/Cargo.toml
new file mode 100644
index 00000000000..89eca0435d3
--- /dev/null
+++ b/examples/parquet_write_parallel/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "parquet_write_parallel"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
+rayon = { version = "1", default-features = false }
diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs
new file mode 100644
index 00000000000..204de4e835e
--- /dev/null
+++ b/examples/parquet_write_parallel/src/main.rs
@@ -0,0 +1,96 @@
+/// Example demonstrating how to write to parquet in parallel.
+use std::sync::Arc;
+
+use rayon::prelude::*;
+
+use arrow2::{
+    array::*, datatypes::PhysicalType, error::Result, io::parquet::write::*,
+    record_batch::RecordBatch,
+};
+
+fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
+    let options = WriteOptions {
+        write_statistics: true,
+        compression: Compression::Snappy,
+        version: Version::V2,
+    };
+
+    let encodings = batch.schema().fields().par_iter().map(|field| {
+        match field.data_type().to_physical_type() {
+            // let's be fancy and use delta-encoding for binary fields
+            PhysicalType::Binary
+            | PhysicalType::LargeBinary
+            | PhysicalType::Utf8
+            | PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
+            // the remaining types are plain-encoded
+            _ => Encoding::Plain,
+        }
+    });
+
+    let parquet_schema = to_parquet_schema(batch.schema())?;
+
+    // write the batch to pages; parallelized by rayon
+    let columns = batch
+        .columns()
+        .par_iter()
+        .zip(parquet_schema.columns().to_vec().into_par_iter())
+        .zip(encodings)
+        .map(|((array, descriptor), encoding)| {
+            let array = array.clone();
+
+            // create the encoded and compressed pages of this column
+            Ok(array_to_pages(array, descriptor, options, encoding)?.collect::<Vec<_>>())
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // create the iterator over groups (one in this case)
+    // (for more batches, create the iterator from them here)
+    let row_groups = std::iter::once(Result::Ok(DynIter::new(
+        columns
+            .into_iter()
+            .map(|column| Ok(DynIter::new(column.into_iter()))),
+    )));
+
+    // Create a new empty file
+    let mut file = std::fs::File::create(path)?;
+
+    // Write the file.
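+    // (the pages were already encoded and compressed in parallel above;
+    // this step only streams them to the file, and is thus single-threaded)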
+    let _file_size = write_file(
+        &mut file,
+        row_groups,
+        batch.schema(),
+        parquet_schema,
+        options,
+        None,
+    )?;
+
+    Ok(())
+}
+
+fn create_batch(size: usize) -> Result<RecordBatch> {
+    let c1: Int32Array = (0..size)
+        .map(|x| if x % 9 == 0 { None } else { Some(x as i32) })
+        .collect();
+    let c2: Utf8Array<i32> = (0..size)
+        .map(|x| {
+            if x % 8 == 0 {
+                None
+            } else {
+                Some(x.to_string())
+            }
+        })
+        .collect();
+
+    RecordBatch::try_from_iter([
+        ("c1", Arc::new(c1) as Arc<dyn Array>),
+        ("c2", Arc::new(c2) as Arc<dyn Array>),
+    ])
+}
+
+fn main() -> Result<()> {
+    let batch = create_batch(10_000_000)?;
+
+    parallel_write("example.parquet", &batch)
+}
diff --git a/guide/src/io/parquet_write.md b/guide/src/io/parquet_write.md
index 08e27ff3d5a..4e9cf75036e 100644
--- a/guide/src/io/parquet_write.md
+++ b/guide/src/io/parquet_write.md
@@ -11,6 +11,8 @@ First, some notation:
 * `column chunk`: composed of multiple pages (similar of an `Array`)
 * `row group`: a group of columns with the same length (similar of a `RecordBatch` in Arrow)
 
+## Single-threaded writing
+
 Here is an example of how to write a single column chunk into a single row group:
 
 ```rust
@@ -23,3 +25,35 @@ assumes that a `RecordBatch` is mapped to a single row group with a single page
 ```rust
 {{#include ../../../examples/parquet_write_record.rs}}
 ```
+
+## Multi-threaded writing
+
+As a user of this crate, you will need to decide how you would like to
+parallelize, and whether order is important. Below you can find an example
+where we use [`rayon`](https://crates.io/crates/rayon) to perform the heavy
+lifting of encoding and compression.
+This operation is [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel)
+and results in a speedup of up to the minimum of the number of cores and the
+number of columns in the record.
+
+```rust
+{{#include ../../../examples/parquet_write_parallel/src/main.rs}}
+```
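+
+For multiple batches, the same pattern generalizes: encode each batch's
+columns in parallel as above and yield one row group per batch. A minimal
+sketch, assuming a hypothetical helper `encode_columns` that performs the
+rayon-based encoding shown in `parallel_write`:
+
+```rust
+let row_groups = batches.into_iter().map(|batch| {
+    // `encode_columns` is a hypothetical helper returning the encoded and
+    // compressed pages of each column, like `columns` in `parallel_write`
+    let columns = encode_columns(&batch)?;
+    Ok(DynIter::new(
+        columns
+            .into_iter()
+            .map(|column| Ok(DynIter::new(column.into_iter()))),
+    ))
+});
+```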