Skip to content

Commit

Permalink
Improve module documentation for parquet crate (#1253)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Feb 2, 2022
1 parent f055fb0 commit 3ddb156
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 73 deletions.
115 changes: 58 additions & 57 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,64 @@
// specific language governing permissions and limitations
// under the License.

//! Contains asynchronous APIs for reading parquet files into
//! arrow [`RecordBatch`]
//! Provides `async` API for reading parquet files as
//! [`RecordBatch`]es
//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! use arrow::record_batch::RecordBatch;
//! use arrow::util::pretty::pretty_format_batches;
//! use futures::TryStreamExt;
//! use tokio::fs::File;
//!
//! use parquet::arrow::ParquetRecordBatchStreamBuilder;
//!
//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
//! # let formatted = pretty_format_batches(batches).unwrap().to_string();
//! # let actual_lines: Vec<_> = formatted.trim().lines().collect();
//! # assert_eq!(
//! # &actual_lines, expected_lines,
//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
//! # expected_lines, actual_lines
//! # );
//! # }
//!
//! let testdata = arrow::util::test_util::parquet_test_data();
//! let path = format!("{}/alltypes_plain.parquet", testdata);
//! let file = tokio::fs::File::open(path).await.unwrap();
//!
//! let builder = ParquetRecordBatchStreamBuilder::new(file)
//! .await
//! .unwrap()
//! .with_projection(vec![1, 2, 6])
//! .with_batch_size(3);
//!
//! let stream = builder.build().unwrap();
//!
//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
//! assert_eq!(results.len(), 3);
//!
//! assert_batches_eq(
//! &results,
//! &[
//! "+----------+-------------+-----------+",
//! "| bool_col | tinyint_col | float_col |",
//! "+----------+-------------+-----------+",
//! "| true | 0 | 0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0 |",
//! "| false | 1 | 1.1 |",
//! "| true | 0 | 0 |",
//! "| false | 1 | 1.1 |",
//! "+----------+-------------+-----------+",
//! ],
//! );
//! # }
//! ```
use std::collections::VecDeque;
use std::fmt::Formatter;
Expand Down Expand Up @@ -425,58 +481,3 @@ impl PageIterator for ColumnChunkIterator {
Ok(self.column_schema.clone())
}
}

#[cfg(test)]
mod tests {
use arrow::util::pretty::pretty_format_batches;
use futures::TryStreamExt;
use tokio::fs::File;

use super::*;

fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
let formatted = pretty_format_batches(batches).unwrap().to_string();
let actual_lines: Vec<_> = formatted.trim().lines().collect();
assert_eq!(
&actual_lines, expected_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
}

#[tokio::test]
async fn test_parquet_stream() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_plain.parquet", testdata);
let file = File::open(path).await.unwrap();

let builder = ParquetRecordBatchStreamBuilder::new(file)
.await
.unwrap()
.with_projection(vec![1, 2, 6])
.with_batch_size(3);

let stream = builder.build().unwrap();

let results = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(results.len(), 3);

assert_batches_eq(
&results,
&[
"+----------+-------------+-----------+",
"| bool_col | tinyint_col | float_col |",
"+----------+-------------+-----------+",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"+----------+-------------+-----------+",
],
);
}
}
31 changes: 15 additions & 16 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,39 @@
// specific language governing permissions and limitations
// under the License.

//! Provides API for reading/writing Arrow
//! [RecordBatch](arrow::record_batch::RecordBatch)es and
//! [Array](arrow::array::Arrays) to/from Parquet Files.
//!
//! [Apache Arrow](http://arrow.apache.org/) is a cross-language development platform for
//! in-memory data.
//!
//! This mod provides API for converting between arrow and parquet.
//!
//!# Example of writing Arrow record batch to Parquet file
//!
//!```rust
//! use arrow::array::Int32Array;
//! use arrow::datatypes::{DataType, Field, Schema};
//! use arrow::array::{Int32Array, ArrayRef};
//! use arrow::record_batch::RecordBatch;
//! use parquet::arrow::arrow_writer::ArrowWriter;
//! use parquet::file::properties::WriterProperties;
//! use std::fs::File;
//! use std::sync::Arc;
//! let ids = Int32Array::from(vec![1, 2, 3, 4]);
//! let vals = Int32Array::from(vec![5, 6, 7, 8]);
//! let schema = Arc::new(Schema::new(vec![
//! Field::new("id", DataType::Int32, false),
//! Field::new("val", DataType::Int32, false),
//! ]));
//! let batch = RecordBatch::try_from_iter(vec![
//! ("id", Arc::new(ids) as ArrayRef),
//! ("val", Arc::new(vals) as ArrayRef),
//! ]).unwrap();
//!
//! let file = File::create("data.parquet").unwrap();
//!
//! let batch =
//! RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids), Arc::new(vals)]).unwrap();
//! let batches = vec![batch];
//!
//! // Default writer properties
//! let props = WriterProperties::builder().build();
//!
//! let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props)).unwrap();
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
//!
//! for batch in batches {
//! writer.write(&batch).expect("Writing batch");
//! }
//! writer.write(&batch).expect("Writing batch");
//!
//! // writer must be closed to write footer
//! writer.close().unwrap();
//! ```
//!
Expand Down Expand Up @@ -134,6 +131,8 @@ experimental_mod!(schema);
pub use self::arrow_reader::ArrowReader;
pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;

pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@
// specific language governing permissions and limitations
// under the License.

//! This crate contains the official Native Rust implementation of
//! [Apache Parquet](https://parquet.apache.org/), part of
//! the [Apache Arrow](https://arrow.apache.org/) project.
//!
//! # Getting Started
//! Start with some examples:
//!
//! 1. [mod@file] for reading and writing parquet files using the
//! [ColumnReader](column::reader::ColumnReader) API.
//!
//! 2. [arrow] for reading and writing parquet files to Arrow
//! `RecordBatch`es
//!
//! 3. [arrow::async_reader] for `async` reading and writing parquet
//! files to Arrow `RecordBatch`es (requires the `async` feature).
#![allow(incomplete_features)]
#![allow(dead_code)]
#![allow(non_camel_case_types)]
Expand Down

0 comments on commit 3ddb156

Please sign in to comment.