diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 468e7f61bca4..baceacbf307a 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
 homepage = "https://github.com/apache/arrow-rs"
 repository = "https://github.com/apache/arrow-rs"
 authors = ["Apache Arrow <dev@arrow.apache.org>"]
-keywords = [ "arrow", "parquet", "hadoop" ]
+keywords = ["arrow", "parquet", "hadoop"]
 readme = "README.md"
 build = "build.rs"
 edition = "2021"
@@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
 rand = "0.8"
+futures = { version = "0.3", optional = true }
+tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }
 
 [dev-dependencies]
 criterion = "0.3"
@@ -62,16 +64,18 @@ cli = ["serde_json", "base64", "clap"]
 test_common = []
 # Experimental, unstable functionality primarily used for testing
 experimental = []
+# Experimental, async API
+async = ["futures", "tokio"]
 
-[[ bin ]]
+[[bin]]
 name = "parquet-read"
 required-features = ["cli"]
 
-[[ bin ]]
+[[bin]]
 name = "parquet-schema"
 required-features = ["cli"]
 
-[[ bin ]]
+[[bin]]
 name = "parquet-rowcount"
 required-features = ["cli"]
 
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index aff04273f9f5..ee4f48ac15a8 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -41,7 +41,7 @@ use arrow::datatypes::{
     Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
     Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
     Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
-    Time32MillisecondType as ArrowTime32MillisecondType,
+    SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
     Time32SecondType as ArrowTime32SecondType,
     Time64MicrosecondType as ArrowTime64MicrosecondType,
     Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
@@ -1237,7 +1237,7 @@ impl ArrayReader for StructArrayReader {
 /// Create array reader from parquet schema, column indices, and parquet file reader.
 pub fn build_array_reader<T>(
     parquet_schema: SchemaDescPtr,
-    arrow_schema: Schema,
+    arrow_schema: SchemaRef,
     column_indices: T,
     row_groups: Box<dyn RowGroupCollection>,
 ) -> Result<Box<dyn ArrayReader>>
@@ -1277,13 +1277,8 @@ where
         fields: filtered_root_fields,
     };
 
-    ArrayReaderBuilder::new(
-        Arc::new(proj),
-        Arc::new(arrow_schema),
-        Arc::new(leaves),
-        row_groups,
-    )
-    .build_array_reader()
+    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
+        .build_array_reader()
 }
 
 /// Used to build array reader.
@@ -2774,7 +2769,7 @@ mod tests {
 
         let array_reader = build_array_reader(
             file_reader.metadata().file_metadata().schema_descr_ptr(),
-            arrow_schema,
+            Arc::new(arrow_schema),
             vec![0usize].into_iter(),
             Box::new(file_reader),
         )
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 0dccb01c67eb..92212a02452a 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -142,7 +142,7 @@ impl ArrowReader for ParquetFileArrowReader {
                 .metadata()
                 .file_metadata()
                 .schema_descr_ptr(),
-            self.get_schema()?,
+            Arc::new(self.get_schema()?),
             column_indices,
             Box::new(self.file_reader.clone()),
         )?;
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
new file mode 100644
index 000000000000..a3edc8e6ae3b
--- /dev/null
+++ b/parquet/src/arrow/async_reader.rs
@@ -0,0 +1,312 @@
+use std::fmt::Formatter;
+use std::io::{Cursor, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use byteorder::{ByteOrder, LittleEndian};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::parquet_to_arrow_schema;
+use crate::basic::Compression;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::{ParquetError, Result};
+use crate::file::footer::parse_metadata_buffer;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::reader::SerializedPageReader;
+use crate::file::PARQUET_MAGIC;
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+
+/// A [`Stream`] of [`RecordBatch`] for a parquet file
+pub struct ParquetRecordBatchStream<T> {
+    metadata: Arc<ParquetMetaData>,
+    schema: SchemaRef,
+
+    batch_size: usize,
+
+    /// The next row group to read
+    next_row_group_idx: usize,
+
+    /// This is an option so it can be moved into a future
+    input: Option<T>,
+
+    state: StreamState<T>,
+}
+
+enum StreamState<T> {
+    Init,
+    /// Decoding a batch
+    Decoding(ParquetRecordBatchReader),
+    /// Reading data from input
+    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    /// Error
+    Error,
+}
+
+impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchStream")
+            .field("metadata", &self.metadata)
+            .field("schema", &self.schema)
+            .finish()
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStream<T> {
+    /// Create a new [`ParquetRecordBatchStream`] from the provided [`File`]
+    pub async fn new(mut input: T, batch_size: usize) -> Result<Self> {
+        let metadata = Arc::new(read_footer(&mut input).await?);
+
+        let schema = Arc::new(parquet_to_arrow_schema(
+            metadata.file_metadata().schema_descr(),
+            metadata.file_metadata().key_value_metadata(),
+        )?);
+
+        Ok(Self {
+            input: Some(input),
+            metadata,
+            schema,
+            batch_size,
+            next_row_group_idx: 0,
+            state: StreamState::Init,
+        })
+    }
+
+    /// Returns the [`SchemaRef`] for this parquet file
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
+    for ParquetRecordBatchStream<T>
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            match &mut self.state {
+                StreamState::Decoding(batch_reader) => match batch_reader.next() {
+                    Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
+                    Some(Err(e)) => {
+                        self.state = StreamState::Error;
+                        return Poll::Ready(Some(Err(ParquetError::ArrowError(
+                            e.to_string(),
+                        ))));
+                    }
+                    None => self.state = StreamState::Init,
+                },
+                StreamState::Init => {
+                    if self.next_row_group_idx == self.metadata.num_row_groups() {
+                        return Poll::Ready(None);
+                    }
+
+                    let row_group_idx = self.next_row_group_idx;
+                    self.next_row_group_idx += 1;
+
+                    let metadata = self.metadata.clone();
+                    let mut input = match self.input.take() {
+                        Some(input) => input,
+                        None => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(general_err!(
+                                "input stream lost"
+                            ))));
+                        }
+                    };
+
+                    self.state = StreamState::Reading(
+                        async move {
+                            let row_group_metadata = metadata.row_group(row_group_idx);
+                            let mut column_chunks =
+                                Vec::with_capacity(row_group_metadata.num_columns());
+
+                            for column in row_group_metadata.columns() {
+                                let (start, length) = column.byte_range();
+                                let end = start + length;
+
+                                input.seek(SeekFrom::Start(start)).await?;
+
+                                let mut buffer = vec![0_u8; (end - start) as usize];
+                                input.read_exact(buffer.as_mut_slice()).await?;
+
+                                column_chunks.push(InMemoryColumnChunk {
+                                    num_values: column.num_values(),
+                                    compression: column.compression(),
+                                    physical_type: column.column_type(),
+                                    data: ByteBufferPtr::new(buffer),
+                                })
+                            }
+
+                            Ok((
+                                input,
+                                InMemoryRowGroup {
+                                    schema: metadata.file_metadata().schema_descr_ptr(),
+                                    column_chunks,
+                                },
+                            ))
+                        }
+                        .boxed(),
+                    )
+                }
+                StreamState::Reading(f) => {
+                    let result = futures::ready!(f.poll_unpin(cx));
+                    self.state = StreamState::Init;
+
+                    let row_group: Box<dyn RowGroupCollection> = match result {
+                        Ok((input, row_group)) => {
+                            self.input = Some(input);
+                            Box::new(row_group)
+                        }
+                        Err(e) => {
+                            self.state = StreamState::Error;
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                    };
+
+                    let parquet_schema = self.metadata.file_metadata().schema_descr_ptr();
+                    let column_indices = 0..parquet_schema.num_columns();
+
+                    let array_reader = build_array_reader(
+                        parquet_schema,
+                        self.schema.clone(),
+                        column_indices,
+                        row_group,
+                    )?;
+
+                    let batch_reader =
+                        ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
+                            .expect("reader");
+
+                    self.state = StreamState::Decoding(batch_reader)
+                }
+                StreamState::Error => return Poll::Pending,
+            }
+        }
+    }
+}
+
+async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
+    input: &mut T,
+) -> Result<ParquetMetaData> {
+    input.seek(SeekFrom::End(-8)).await?;
+
+    let mut buf = [0_u8; 8];
+    input.read_exact(&mut buf).await?;
+
+    if &buf[4..] != PARQUET_MAGIC {
+        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
+    }
+
+    let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
+    if metadata_len < 0 {
+        return Err(general_err!(
+            "Invalid Parquet file. Metadata length is less than zero ({})",
+            metadata_len
+        ));
+    }
+
+    input.seek(SeekFrom::End(-8 - metadata_len)).await?;
+
+    let mut buf = Vec::with_capacity(metadata_len as usize + 8);
+    input.read_to_end(&mut buf).await?;
+
+    parse_metadata_buffer(&mut Cursor::new(buf))
+}
+
+struct InMemoryRowGroup {
+    schema: SchemaDescPtr,
+    column_chunks: Vec<InMemoryColumnChunk>,
+}
+
+impl RowGroupCollection for InMemoryRowGroup {
+    fn schema(&self) -> Result<SchemaDescPtr> {
+        Ok(self.schema.clone())
+    }
+
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        let page_reader = self.column_chunks[i].pages();
+
+        Ok(Box::new(ColumnChunkIterator {
+            schema: self.schema.clone(),
+            column_schema: self.schema.columns()[i].clone(),
+            reader: Some(page_reader),
+        }))
+    }
+}
+
+struct ColumnChunkMetadata {}
+
+struct InMemoryColumnChunk {
+    num_values: i64,
+    compression: Compression,
+    physical_type: crate::basic::Type,
+    data: ByteBufferPtr,
+}
+
+impl InMemoryColumnChunk {
+    fn pages(&self) -> Result<Box<dyn PageReader>> {
+        let page_reader = SerializedPageReader::new(
+            Cursor::new(self.data.clone()),
+            self.num_values,
+            self.compression,
+            self.physical_type,
+        )?;
+
+        Ok(Box::new(page_reader))
+    }
+}
+
+struct ColumnChunkIterator {
+    schema: SchemaDescPtr,
+    column_schema: ColumnDescPtr,
+    reader: Option<Result<Box<dyn PageReader>>>,
+}
+
+impl Iterator for ColumnChunkIterator {
+    type Item = Result<Box<dyn PageReader>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.reader.take()
+    }
+}
+
+impl PageIterator for ColumnChunkIterator {
+    fn schema(&mut self) -> Result<SchemaDescPtr> {
+        Ok(self.schema.clone())
+    }
+
+    fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+        Ok(self.column_schema.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::TryStreamExt;
+    use tokio::fs::File;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_parquet_stream() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/nested_structs.rust.parquet", testdata);
+        let file = File::open(path).await.unwrap();
+
+        let stream = ParquetRecordBatchStream::new(file, 1024).await.unwrap();
+        let results = stream.try_collect::<Vec<_>>().await.unwrap();
+        println!("{:?}", results);
+    }
+}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 57ad5b1d4292..97bf58c46c9c 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -122,6 +122,10 @@ experimental_mod!(array_reader);
 experimental_mod!(arrow_array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
+
+#[cfg(feature = "async")]
+pub mod async_reader;
+
 experimental_mod!(converter);
 pub(in crate::arrow) mod levels;
 pub(in crate::arrow) mod record_reader;
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index 2e572944868b..ae2e9d4f822a 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -78,7 +78,6 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
     if footer_metadata_len > file_size as usize {
         return Err(general_err!(
             "Invalid Parquet file. Metadata start is less than zero ({})",
@@ -87,16 +86,21 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
+pub(crate) fn parse_metadata_buffer<T: Read>(
+    metadata_read: &mut T,
+) -> Result<ParquetMetaData> {
     // TODO: row group filtering
     let mut prot = TCompactInputProtocol::new(metadata_read);
     let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index 47d8258694d7..abd6ac62af13 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -104,7 +104,7 @@ pub mod statistics;
 pub mod writer;
 
 const FOOTER_SIZE: usize = 8;
-const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
+pub(crate) const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
 
 /// The number of bytes read at the end of the parquet file on first read
 const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024;
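Taken together, the changes above let a Parquet file be read end to end without blocking a thread: the footer is parsed once in `ParquetRecordBatchStream::new`, then each row group is fetched and decoded lazily as the stream is polled. The snippet below is a minimal usage sketch, not part of the diff: it assumes the crate is compiled with the `async` (and `arrow`) features, uses `data.parquet` as a placeholder path, and otherwise mirrors the `test_parquet_stream` test added in `async_reader.rs`.

```rust
use futures::TryStreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use tokio::fs::File;

// current_thread flavor only needs the "rt" + "macros" tokio features listed in Cargo.toml
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file with tokio so reads and seeks are asynchronous
    let file = File::open("data.parquet").await?;

    // Reading the footer happens eagerly inside `new`; decoding is deferred
    let mut stream = ParquetRecordBatchStream::new(file, 1024).await?;
    println!("schema: {:?}", stream.schema());

    // Pull record batches one at a time; each row group is loaded on demand
    while let Some(batch) = stream.try_next().await? {
        println!("read batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```

Because the stream only buffers one row group's column chunks plus the batch currently being decoded, peak memory stays bounded by the largest row group rather than the whole file.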