Added example showing parallel writes to parquet. (#436)
jorgecarleitao authored Sep 23, 2021
1 parent 93c56d7 commit 688e979
Showing 4 changed files with 118 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/parquet_write_parallel/.gitignore
@@ -0,0 +1 @@
*.parquet
8 changes: 8 additions & 0 deletions examples/parquet_write_parallel/Cargo.toml
@@ -0,0 +1,8 @@
[package]
name = "parquet_write_parallel"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
rayon = { version = "1", default-features = false }
93 changes: 93 additions & 0 deletions examples/parquet_write_parallel/src/main.rs
@@ -0,0 +1,93 @@
//! Example demonstrating how to write to parquet in parallel.
use std::sync::Arc;

use rayon::prelude::*;

use arrow2::{
    array::*, datatypes::PhysicalType, error::Result, io::parquet::write::*,
    record_batch::RecordBatch,
};

fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Snappy,
        version: Version::V2,
    };
    let encodings = batch.schema().fields().par_iter().map(|field| {
        match field.data_type().to_physical_type() {
            // let's be fancy and use delta-encoding for binary fields
            PhysicalType::Binary
            | PhysicalType::LargeBinary
            | PhysicalType::Utf8
            | PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
            // remaining is plain
            _ => Encoding::Plain,
        }
    });

    let parquet_schema = to_parquet_schema(batch.schema())?;

    // write batch to pages; parallelized by rayon
    let columns = batch
        .columns()
        .par_iter()
        .zip(parquet_schema.columns().to_vec().into_par_iter())
        .zip(encodings)
        .map(|((array, descriptor), encoding)| {
            let array = array.clone();

            // create the encoded and compressed pages for this column
            Ok(array_to_pages(array, descriptor, options, encoding)?.collect::<Vec<_>>())
        })
        .collect::<Result<Vec<_>>>()?;

    // create the iterator over groups (one in this case)
    // (for more batches, create the iterator from them here)
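    // the nesting is: an iterator of row groups, each an iterator of columns,
    // each an iterator of (compressed) pages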
    let row_groups = std::iter::once(Result::Ok(DynIter::new(
        columns
            .into_iter()
            .map(|column| Ok(DynIter::new(column.into_iter()))),
    )));

    // Create a new empty file
    let mut file = std::fs::File::create(path)?;

    // Write the file.
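    // (`write_file` returns the number of bytes written, hence the `_file_size` binding)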
    let _file_size = write_file(
        &mut file,
        row_groups,
        batch.schema(),
        parquet_schema,
        options,
        None,
    )?;

    Ok(())
}

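/// Creates a `RecordBatch` with two nullable columns: an `Int32Array` (`c1`, null at
/// every index divisible by 9) and a `Utf8Array` (`c2`, null at every index divisible by 8).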
fn create_batch(size: usize) -> Result<RecordBatch> {
    let c1: Int32Array = (0..size)
        .map(|x| if x % 9 == 0 { None } else { Some(x as i32) })
        .collect();
    let c2: Utf8Array<i32> = (0..size)
        .map(|x| {
            if x % 8 == 0 {
                None
            } else {
                Some(x.to_string())
            }
        })
        .collect();

    RecordBatch::try_from_iter([
        ("c1", Arc::new(c1) as Arc<dyn Array>),
        ("c2", Arc::new(c2) as Arc<dyn Array>),
    ])
}

fn main() -> Result<()> {
    let batch = create_batch(10_000_000)?;

    parallel_write("example.parquet", &batch)
}
16 changes: 16 additions & 0 deletions guide/src/io/parquet_write.md
@@ -11,6 +11,8 @@ First, some notation:
* `column chunk`: composed of multiple pages (similar to an `Array`)
* `row group`: a group of columns with the same length (similar to a `RecordBatch` in Arrow)

## Single-threaded

Here is an example of how to write a single column chunk into a single row group:

```rust
@@ -23,3 +25,17 @@ assumes that a `RecordBatch` is mapped to a single row group with a single page
```rust
{{#include ../../../examples/parquet_write_record.rs}}
```

## Multi-threaded writing

As a user of this crate, you will need to decide how you would like to parallelize,
and whether order is important. Below you can find an example where we
use [`rayon`](https://crates.io/crates/rayon) to perform the heavy lifting of
encoding and compression.
This operation is [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel)
and yields a speedup of up to the minimum of the number of cores and the
number of columns in the record batch (e.g. with 8 cores and 2 columns,
expect at most a 2x speedup from this scheme).

```rust
{{#include ../../../examples/parquet_write_parallel/src/main.rs}}
```
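
The example above encodes and compresses a single batch and writes it as a
single row group. The comment in the example ("for more batches, create the
iterator from them here") hints at the generalization: build one
columns-to-pages iterator per batch. Below is a minimal sketch of that idea,
not part of this commit; it assumes `batches` is non-empty and that all
batches share the same schema, and it uses plain encoding throughout. The
name `parallel_write_many` is illustrative, not an API of this crate.

```rust
use rayon::prelude::*;

use arrow2::{error::Result, io::parquet::write::*, record_batch::RecordBatch};

/// Sketch: writes each batch as its own row group.
/// Assumes `batches` is non-empty and all batches share the same schema.
fn parallel_write_many(path: &str, batches: &[RecordBatch]) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Snappy,
        version: Version::V2,
    };
    let schema = batches[0].schema();
    let parquet_schema = to_parquet_schema(schema)?;

    // encode and compress all batches in parallel; each batch becomes one row group
    let encoded = batches
        .par_iter()
        .map(|batch| {
            batch
                .columns()
                .par_iter()
                .zip(parquet_schema.columns().to_vec().into_par_iter())
                .map(|(array, descriptor)| {
                    Ok(array_to_pages(array.clone(), descriptor, options, Encoding::Plain)?
                        .collect::<Vec<_>>())
                })
                .collect::<Result<Vec<_>>>()
        })
        .collect::<Result<Vec<_>>>()?;

    // nest the encoded pages as: row groups -> columns -> pages
    let row_groups = encoded.into_iter().map(|columns| {
        Result::Ok(DynIter::new(
            columns
                .into_iter()
                .map(|column| Ok(DynIter::new(column.into_iter()))),
        ))
    });

    let mut file = std::fs::File::create(path)?;
    write_file(&mut file, row_groups, schema, parquet_schema, options, None)?;

    Ok(())
}
```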
