Simplified inferring arrow schema from a parquet schema (#819)
jorgecarleitao authored Feb 6, 2022
1 parent 74f65d7 commit 4fbbd90
Showing 9 changed files with 268 additions and 297 deletions.
2 changes: 1 addition & 1 deletion examples/parquet_read_async.rs
@@ -25,7 +25,7 @@ async fn main() -> Result<()> {
     // this operation is usually done before reading the data, during planning.
     // This is a mix of IO and CPU-bounded tasks but both of them are O(1)
     let metadata = read::read_metadata_async(&mut reader).await?;
-    let schema = read::get_schema(&metadata)?;
+    let schema = read::infer_schema(&metadata)?;

     // This factory yields one file descriptor per column and is used to read columns concurrently.
     // They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
2 changes: 1 addition & 1 deletion examples/parquet_read_parallel/src/main.rs
@@ -11,7 +11,7 @@ use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};
 fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
     let mut file = BufReader::new(File::open(path)?);
     let metadata = read::read_metadata(&mut file)?;
-    let schema = read::get_schema(&metadata)?;
+    let schema = read::infer_schema(&metadata)?;

     // read (IO-bounded) all columns into memory (use a subset of the fields to project)
     let columns = read::read_columns(
48 changes: 48 additions & 0 deletions guide/src/io/parquet_read.md
@@ -44,3 +44,51 @@ by delegating all CPU-intensive tasks to separate threads.
This can of course be reversed; in configurations where IO is bounded (e.g. when a
network is involved), we can use multiple producers of pages, potentially divided
across file readers, and a single consumer that performs all CPU-intensive work, as
sketched below.
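
A minimal sketch of that reversed shape, using plain `std::sync::mpsc` channels and a
hypothetical `Page` payload (arrow2 exposes its own page types; this only illustrates
the threading layout, not the real decoding API):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a compressed parquet page.
struct Page(Vec<u8>);

fn main() {
    let (tx, rx) = mpsc::channel::<Page>();

    // Multiple IO-bounded producers, e.g. one per file or column chunk.
    for reader_id in 0..4u8 {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for reading compressed pages from storage.
            tx.send(Page(vec![reader_id])).unwrap();
        });
    }
    drop(tx); // close the channel once all producers are spawned

    // A single CPU-bounded consumer decompresses and deserializes pages.
    for page in rx {
        // Stand-in for decompression + deserialization into Arrow arrays.
        let _bytes = page.0.len();
    }
}
```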

## Apache Arrow <-> Apache Parquet

Arrow and Parquet are two different formats that declare different physical and logical types.
When reading Parquet, we must _infer_ the types into which we read the data.
This inference is based on Parquet's physical, logical and converted types.
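
A minimal sketch of where this inference happens (the file path is a placeholder):

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // "data.parquet" is a placeholder path.
    let mut file = File::open("data.parquet")?;

    // Read the parquet footer (metadata) and infer the Arrow schema
    // from the parquet schema, following the mapping below.
    let metadata = read::read_metadata(&mut file)?;
    let schema = read::infer_schema(&metadata)?;

    println!("{:#?}", schema.fields);
    Ok(())
}
```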

When a logical type is defined, we use it as follows:

| `Parquet` | `Parquet logical` | `DataType` |
| ----------------- | ----------------- | ------------- |
| Int32 | Int8 | Int8 |
| Int32 | Int16 | Int16 |
| Int32 | Int32 | Int32 |
| Int32 | UInt8 | UInt8 |
| Int32 | UInt16 | UInt16 |
| Int32 | UInt32 | UInt32 |
| Int32 | Decimal | Decimal |
| Int32 | Date | Date32 |
| Int32 | Time(ms) | Time32(ms) |
| Int64 | Int64 | Int64 |
| Int64 | UInt64 | UInt64 |
| Int64 | Time(us) | Time64(us) |
| Int64 | Time(ns) | Time64(ns) |
| Int64 | Timestamp(\_) | Timestamp(\_) |
| Int64 | Decimal | Decimal |
| ByteArray | Utf8 | Utf8 |
| ByteArray | JSON | Binary |
| ByteArray | BSON | Binary |
| ByteArray | ENUM | Binary |
| ByteArray | Decimal | Decimal |
| FixedLenByteArray | Decimal | Decimal |

When a logical type is not defined but a converted type is, we apply the
equivalent conversion as above, mutatis mutandis.

When neither is defined, we fall back to the physical representation:

| `Parquet` | `DataType` |
| ----------------- | --------------- |
| Boolean | Boolean |
| Int32 | Int32 |
| Int64 | Int64 |
| Int96 | Timestamp(ns) |
| Float | Float32 |
| Double | Float64 |
| ByteArray | Binary |
| FixedLenByteArray | FixedSizeBinary |
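
As an illustrative (not prescriptive) sketch, downstream code can branch on the
inferred `DataType` to decide how to handle each column:

```rust
use arrow2::datatypes::DataType;

// Illustrative dispatch on the inferred type of a column.
fn describe(data_type: &DataType) -> &'static str {
    match data_type {
        // Int96 has no logical type and falls back to Timestamp(ns).
        DataType::Timestamp(_, _) => "decoded as a timestamp array",
        // ByteArray without a logical or converted type becomes Binary.
        DataType::Binary => "decoded as opaque bytes",
        DataType::FixedSizeBinary(_) => "decoded as fixed-size bytes",
        // Everything else maps directly to its physical equivalent.
        _ => "decoded as the physical equivalent",
    }
}
```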
4 changes: 2 additions & 2 deletions src/io/parquet/read/file.rs
@@ -10,7 +10,7 @@ use crate::{
     error::{ArrowError, Result},
 };

-use super::{get_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
+use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};

 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;

@@ -47,7 +47,7 @@ impl<R: Read + Seek> FileReader<R> {
     ) -> Result<Self> {
         let metadata = read_metadata(&mut reader)?;

-        let schema = get_schema(&metadata)?;
+        let schema = infer_schema(&metadata)?;

         let schema_metadata = schema.metadata;
         let fields: Vec<Field> = if let Some(projection) = &projection {
2 changes: 1 addition & 1 deletion src/io/parquet/read/mod.rs
@@ -54,7 +54,7 @@ mod utils;
 pub use file::{FileReader, RowGroupReader};
 pub use row_group::*;
 pub(crate) use schema::is_type_nullable;
-pub use schema::{get_schema, FileMetaData};
+pub use schema::{infer_schema, FileMetaData};

 use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
 use deserialize::page_iter_to_arrays;