diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index 415da272c59..feef774decd 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -1,20 +1,52 @@ -/// Example demonstrating how to write to parquet in parallel. +//! Example demonstrating how to write to parquet in parallel. +use std::collections::VecDeque; use std::sync::Arc; use rayon::prelude::*; use arrow2::{ - array::*, datatypes::PhysicalType, error::Result, io::parquet::write::*, + array::*, + datatypes::PhysicalType, + error::{ArrowError, Result}, + io::parquet::write::*, record_batch::RecordBatch, }; -fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> { +struct Bla { + columns: VecDeque<CompressedPage>, + current: Option<CompressedPage>, +} + +impl Bla { + pub fn new(columns: VecDeque<CompressedPage>) -> Self { + Self { + columns, + current: None, + } + } +} + +impl FallibleStreamingIterator for Bla { + type Item = CompressedPage; + type Error = ArrowError; + + fn advance(&mut self) -> Result<()> { + self.current = self.columns.pop_front(); + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + self.current.as_ref() + } +} + +fn parallel_write(path: &str, batches: &[RecordBatch]) -> Result<()> { let options = WriteOptions { write_statistics: true, compression: Compression::Snappy, version: Version::V2, }; - let encodings = batch.schema().fields().par_iter().map(|field| { + let encodings = batches[0].schema().fields().par_iter().map(|field| { match field.data_type().to_physical_type() { // let's be fancy and use delta-encoding for binary fields PhysicalType::Binary @@ -26,30 +58,32 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> { } }); - let parquet_schema = to_parquet_schema(batch.schema())?; - - // write batch to pages; parallelized by rayon - let columns = batch - .columns() - .par_iter() - .zip(parquet_schema.columns().to_vec().into_par_iter()) - .zip(encodings) - .map(|((array, descriptor), encoding)| { 
- // create encoded and compressed pages this column - let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?; - encoded_pages - .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into())) - .collect::<Result<Vec<_>>>() - }) - .collect::<Result<Vec<Vec<CompressedPage>>>>()?; + let parquet_schema = to_parquet_schema(batches[0].schema())?; + + let a = parquet_schema.clone(); + let row_groups = batches.iter().map(|batch| { + // write batch to pages; parallelized by rayon + let columns = batch + .columns() + .par_iter() + .zip(a.columns().to_vec().into_par_iter()) + .zip(encodings.clone()) + .map(|((array, descriptor), encoding)| { + // create encoded and compressed pages this column + let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?; + encoded_pages + .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into())) + .collect::<Result<VecDeque<_>>>() + }) + .collect::<Result<Vec<VecDeque<CompressedPage>>>>()?; - // create the iterator over groups (one in this case) - // (for more batches, create the iterator from them here) - let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| { - Ok(DynStreamingIterator::new( - fallible_streaming_iterator::convert(column.iter().map(Ok)), - )) - })))); + let row_group = DynIter::new( + columns + .into_iter() + .map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))), + ); + Ok(row_group) + }); // Create a new empty file let mut file = std::fs::File::create(path)?; @@ -58,7 +92,7 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> { let _file_size = write_file( &mut file, row_groups, - batch.schema(), + batches[0].schema(), parquet_schema, options, None, @@ -88,7 +122,7 @@ fn create_batch(size: usize) -> Result<RecordBatch> { } fn main() -> Result<()> { - let batch = create_batch(10_000_000)?; + let batch = create_batch(5_000_000)?; - parallel_write("example.parquet", &batch) + parallel_write("example.parquet", &[batch.clone(), batch]) }