Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Made the parallel parquet write example operate over multiple batches #544

Merged
merged 1 commit into from
Nov 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 64 additions & 30 deletions examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
/// Example demonstrating how to write to parquet in parallel.
//! Example demonstrating how to write to parquet in parallel.
use std::collections::VecDeque;
use std::sync::Arc;

use rayon::prelude::*;

use arrow2::{
array::*, datatypes::PhysicalType, error::Result, io::parquet::write::*,
array::*,
datatypes::PhysicalType,
error::{ArrowError, Result},
io::parquet::write::*,
record_batch::RecordBatch,
};

fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
/// A streaming source of compressed parquet pages for a single column.
///
/// Holds the column's pages in a queue and hands them out one at a time
/// through the `FallibleStreamingIterator` implementation below.
struct Bla {
    // Remaining pages, consumed front-to-back.
    columns: VecDeque<CompressedPage>,
    // The page most recently popped by `advance`, borrowed out via `get`.
    current: Option<CompressedPage>,
}

impl Bla {
    /// Wraps a queue of compressed pages; no page is loaded until the
    /// first `advance` call.
    pub fn new(columns: VecDeque<CompressedPage>) -> Self {
        Bla {
            current: None,
            columns,
        }
    }
}

impl FallibleStreamingIterator for Bla {
    type Item = CompressedPage;
    type Error = ArrowError;

    /// Pop the next page from the queue into `current`.
    /// Popping from an in-memory queue cannot fail, so this always returns `Ok`.
    fn advance(&mut self) -> Result<()> {
        let next = self.columns.pop_front();
        self.current = next;
        Ok(())
    }

    /// Borrow the page loaded by the most recent `advance`, if any.
    fn get(&self) -> Option<&Self::Item> {
        match &self.current {
            Some(page) => Some(page),
            None => None,
        }
    }
}

fn parallel_write(path: &str, batches: &[RecordBatch]) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
compression: Compression::Snappy,
version: Version::V2,
};
let encodings = batch.schema().fields().par_iter().map(|field| {
let encodings = batches[0].schema().fields().par_iter().map(|field| {
match field.data_type().to_physical_type() {
// let's be fancy and use delta-encoding for binary fields
PhysicalType::Binary
Expand All @@ -26,30 +58,32 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
}
});

let parquet_schema = to_parquet_schema(batch.schema())?;

// write batch to pages; parallelized by rayon
let columns = batch
.columns()
.par_iter()
.zip(parquet_schema.columns().to_vec().into_par_iter())
.zip(encodings)
.map(|((array, descriptor), encoding)| {
// create encoded and compressed pages this column
let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
encoded_pages
.map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
.collect::<Result<Vec<_>>>()
})
.collect::<Result<Vec<Vec<CompressedPage>>>>()?;
let parquet_schema = to_parquet_schema(batches[0].schema())?;

let a = parquet_schema.clone();
let row_groups = batches.iter().map(|batch| {
// write batch to pages; parallelized by rayon
let columns = batch
.columns()
.par_iter()
.zip(a.columns().to_vec().into_par_iter())
.zip(encodings.clone())
.map(|((array, descriptor), encoding)| {
// create encoded and compressed pages this column
let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?;
encoded_pages
.map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into()))
.collect::<Result<VecDeque<_>>>()
})
.collect::<Result<Vec<VecDeque<CompressedPage>>>>()?;

// create the iterator over groups (one in this case)
// (for more batches, create the iterator from them here)
let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| {
Ok(DynStreamingIterator::new(
fallible_streaming_iterator::convert(column.iter().map(Ok)),
))
}))));
let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
Ok(row_group)
});

// Create a new empty file
let mut file = std::fs::File::create(path)?;
Expand All @@ -58,7 +92,7 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> {
let _file_size = write_file(
&mut file,
row_groups,
batch.schema(),
batches[0].schema(),
parquet_schema,
options,
None,
Expand Down Expand Up @@ -88,7 +122,7 @@ fn create_batch(size: usize) -> Result<RecordBatch> {
}

fn main() -> Result<()> {
let batch = create_batch(10_000_000)?;
let batch = create_batch(5_000_000)?;

parallel_write("example.parquet", &batch)
parallel_write("example.parquet", &[batch.clone(), batch])
}