Add option to skip decoding arrow metadata from parquet (apache#1459)
Fix inference from null logical type (apache#1557)

Replace some `&Option<T>` with `Option<&T>` (apache#1556)
tustvold committed Apr 13, 2022
1 parent c9549bb commit 019cd07
Showing 5 changed files with 178 additions and 32 deletions.
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader.rs
@@ -2084,7 +2084,7 @@ mod tests {
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();

-let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), &None).unwrap();
+let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();

let file = tempfile::tempfile().unwrap();
let props = WriterProperties::builder()
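The call-site change above reflects the companion cleanup apache#1556: `parquet_to_arrow_schema` now takes `Option<&Vec<KeyValue>>` instead of `&Option<Vec<KeyValue>>`, so a literal `&None` becomes `None`. A minimal migration sketch for callers that own their key-value metadata; the helper name is illustrative, not part of this commit:

```rust
use arrow::datatypes::Schema;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::errors::Result;
use parquet::file::metadata::KeyValue;
use parquet::schema::types::SchemaDescriptor;

/// Hypothetical caller that owns an Option<Vec<KeyValue>>: Option::as_ref
/// borrows the contents, producing the Option<&Vec<KeyValue>> the new
/// signature expects.
fn owned_kv_to_arrow_schema(
    descr: &SchemaDescriptor,
    kv: Option<Vec<KeyValue>>,
) -> Result<Schema> {
    // Before apache#1556: parquet_to_arrow_schema(descr, &kv)
    parquet_to_arrow_schema(descr, kv.as_ref())
}
```

`Option<&T>` is generally preferred over `&Option<T>` in APIs: the caller is not forced to materialize an owned `Option` just to hand out a borrow.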
167 changes: 156 additions & 11 deletions parquet/src/arrow/arrow_reader.rs
Expand Up @@ -30,7 +30,7 @@ use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::FileReader;

/// Arrow reader api.
@@ -78,19 +78,40 @@ pub trait ArrowReader {
T: IntoIterator<Item = usize>;
}

+#[derive(Debug, Clone, Default)]
+pub struct ArrowReaderOptions {
+skip_arrow_metadata: bool,
+}

+impl ArrowReaderOptions {
+/// Create a new [`ArrowReaderOptions`] with the default settings
+fn new() -> Self {
+Self::default()
+}
+
+/// Parquet files generated by some writers may contain an embedded arrow
+/// schema and metadata. This may not be correct or compatible with your system.
+///
+/// Set `skip_arrow_metadata` to true to skip decoding it.
+pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+Self {
+skip_arrow_metadata,
+}
+}
+}

pub struct ParquetFileArrowReader {
file_reader: Arc<dyn FileReader>,

+options: ArrowReaderOptions,
}

impl ArrowReader for ParquetFileArrowReader {
type RecordReader = ParquetRecordBatchReader;

fn get_schema(&mut self) -> Result<Schema> {
let file_metadata = self.file_reader.metadata().file_metadata();
-parquet_to_arrow_schema(
-file_metadata.schema_descr(),
-file_metadata.key_value_metadata(),
-)
+parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
}

fn get_schema_by_columns<T>(
@@ -106,13 +127,13 @@ impl ArrowReader for ParquetFileArrowReader {
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
-file_metadata.key_value_metadata(),
+self.get_kv_metadata(),
)
} else {
parquet_to_arrow_schema_by_root_columns(
file_metadata.schema_descr(),
column_indices,
-file_metadata.key_value_metadata(),
+self.get_kv_metadata(),
)
}
}
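Both projection branches now consult `self.get_kv_metadata()` rather than the raw file metadata, so the skip option applies to projected schemas as well. A hedged usage sketch, assuming the `get_schema_by_columns(column_indices, leaf_columns)` trait signature of this release line; the helper itself is illustrative, not part of this commit:

```rust
use arrow::datatypes::Schema;
use parquet::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
use parquet::errors::Result;

/// Hypothetical helper: project the inferred arrow schema down to the
/// given leaf (primitive) column indices. Passing `true` takes the
/// parquet_to_arrow_schema_by_columns branch above; `false` would select
/// root columns via parquet_to_arrow_schema_by_root_columns.
fn project_leaves(
    reader: &mut ParquetFileArrowReader,
    leaves: Vec<usize>,
) -> Result<Schema> {
    reader.get_schema_by_columns(leaves, true)
}
```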
@@ -154,14 +175,41 @@
}

impl ParquetFileArrowReader {
+/// Create a new [`ParquetFileArrowReader`]
pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-Self { file_reader }
+Self {
+file_reader,
+options: Default::default(),
+}
}

+/// Create a new [`ParquetFileArrowReader`] with the provided [`ArrowReaderOptions`]
+pub fn new_with_options(
+file_reader: Arc<dyn FileReader>,
+options: ArrowReaderOptions,
+) -> Self {
+Self {
+file_reader,
+options,
+}
+}

-// Expose the reader metadata
+/// Expose the reader metadata
pub fn get_metadata(&mut self) -> ParquetMetaData {
self.file_reader.metadata().clone()
}

+/// Returns the key value metadata, or `None` if [`ArrowReaderOptions::skip_arrow_metadata`] is enabled
+fn get_kv_metadata(&self) -> Option<&Vec<KeyValue>> {
+if self.options.skip_arrow_metadata {
+return None;
+}
+
+self.file_reader
+.metadata()
+.file_metadata()
+.key_value_metadata()
+}
}
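Taken together, the option only changes how the arrow schema is derived; record reading is untouched. A minimal end-to-end sketch, assuming `/tmp/data.parquet` stands in for any parquet file that carries an embedded arrow schema (for example one produced by `ArrowWriter`):

```rust
use std::fs::File;
use std::sync::Arc;

use parquet::arrow::arrow_reader::{
    ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
};
use parquet::file::reader::SerializedFileReader;

fn main() {
    // Hypothetical input path; substitute any parquet file.
    let file = File::open("/tmp/data.parquet").unwrap();
    let reader = Arc::new(SerializedFileReader::new(file).unwrap());

    // Default behaviour: decode the embedded arrow schema if present.
    let mut with_metadata = ParquetFileArrowReader::new(reader.clone());
    println!("embedded: {:?}", with_metadata.get_schema().unwrap());

    // Skip the embedded schema and infer types from the parquet schema
    // alone. `new` on ArrowReaderOptions is private at this commit, so
    // external callers start from `default()`.
    let options = ArrowReaderOptions::default().with_skip_arrow_metadata(true);
    let mut without_metadata =
        ParquetFileArrowReader::new_with_options(reader, options);
    println!("inferred: {:?}", without_metadata.get_schema().unwrap());
}
```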

pub struct ParquetRecordBatchReader {
@@ -245,18 +293,22 @@ mod tests {
use rand::{thread_rng, RngCore};
use serde_json::json;
use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
+use tempfile::tempfile;

use arrow::array::*;
-use arrow::datatypes::{DataType as ArrowDataType, Field};
+use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};

-use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
+use crate::arrow::arrow_reader::{
+ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
+};
use crate::arrow::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
};
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+use crate::arrow::ArrowWriter;
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
@@ -1238,4 +1290,97 @@ mod tests {
let val = list.value(0);
assert_eq!(val.len(), 0);
}

#[test]
fn test_null_schema_inference() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/null_list.parquet", testdata);
let reader =
Arc::new(SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap());

let arrow_field = Field::new(
"emptylist",
ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Null, true))),
true,
);

let options = ArrowReaderOptions::default().with_skip_arrow_metadata(true);
let mut arrow_reader = ParquetFileArrowReader::new_with_options(reader, options);
let schema = arrow_reader.get_schema().unwrap();
assert_eq!(schema.fields().len(), 1);
assert_eq!(schema.field(0), &arrow_field);
}

#[test]
fn test_skip_metadata() {
let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
let field = Field::new("col", col.data_type().clone(), true);

let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

let metadata = [("key".to_string(), "value".to_string())]
.into_iter()
.collect();

let schema_with_metadata =
Arc::new(Schema::new(vec![field.with_metadata(Some(metadata))]));

assert_ne!(schema_with_metadata, schema_without_metadata);

let batch =
RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef])
.unwrap();

let file = |version: WriterVersion| {
let props = WriterProperties::builder()
.set_writer_version(version)
.build();

let file = tempfile().unwrap();
let mut writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
batch.schema(),
Some(props),
)
.unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
file
};

let v1_reader = Arc::new(
SerializedFileReader::new(file(WriterVersion::PARQUET_1_0)).unwrap(),
);
let v2_reader = Arc::new(
SerializedFileReader::new(file(WriterVersion::PARQUET_2_0)).unwrap(),
);

let mut arrow_reader = ParquetFileArrowReader::new(v1_reader.clone());
assert_eq!(
&arrow_reader.get_schema().unwrap(),
schema_with_metadata.as_ref()
);

let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
let mut arrow_reader =
ParquetFileArrowReader::new_with_options(v1_reader, options);
assert_eq!(
&arrow_reader.get_schema().unwrap(),
schema_without_metadata.as_ref()
);

let mut arrow_reader = ParquetFileArrowReader::new(v2_reader.clone());
assert_eq!(
&arrow_reader.get_schema().unwrap(),
schema_with_metadata.as_ref()
);

let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
let mut arrow_reader =
ParquetFileArrowReader::new_with_options(v2_reader, options);
assert_eq!(
&arrow_reader.get_schema().unwrap(),
schema_without_metadata.as_ref()
);
}
}
