Skip to content

Commit

Permalink
Async parquet reader (apache#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 11, 2022
1 parent b724fa8 commit 2767084
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 19 deletions.
12 changes: 8 additions & 4 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <[email protected]>"]
keywords = [ "arrow", "parquet", "hadoop" ]
keywords = ["arrow", "parquet", "hadoop"]
readme = "README.md"
build = "build.rs"
edition = "2021"
Expand All @@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
futures = { version = "0.3", optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }

[dev-dependencies]
criterion = "0.3"
Expand All @@ -62,16 +64,18 @@ cli = ["serde_json", "base64", "clap"]
test_common = []
# Experimental, unstable functionality primarily used for testing
experimental = []
# Experimental, async API
async = ["futures", "tokio"]

[[ bin ]]
[[bin]]
name = "parquet-read"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-schema"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-rowcount"
required-features = ["cli"]

Expand Down
15 changes: 5 additions & 10 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use arrow::datatypes::{
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
Time32MillisecondType as ArrowTime32MillisecondType,
SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
Expand Down Expand Up @@ -1237,7 +1237,7 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
arrow_schema: SchemaRef,
column_indices: T,
row_groups: Box<dyn RowGroupCollection>,
) -> Result<Box<dyn ArrayReader>>
Expand Down Expand Up @@ -1277,13 +1277,8 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(
Arc::new(proj),
Arc::new(arrow_schema),
Arc::new(leaves),
row_groups,
)
.build_array_reader()
ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
.build_array_reader()
}

/// Used to build array reader.
Expand Down Expand Up @@ -2774,7 +2769,7 @@ mod tests {

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
Arc::new(arrow_schema),
vec![0usize].into_iter(),
Box::new(file_reader),
)
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
Arc::new(self.get_schema()?),
column_indices,
Box::new(self.file_reader.clone()),
)?;
Expand Down
Loading

0 comments on commit 2767084

Please sign in to comment.