Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MapArrayReader (#2484) (#1699) (#1561) #2500

Merged
merged 3 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ fn build_map_reader(
field.arrow_type.clone(),
field.def_level,
field.rep_level,
field.nullable,
)))
}

Expand Down
281 changes: 151 additions & 130 deletions parquet/src/arrow/array_reader/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,67 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError::ArrowError;
use crate::errors::{ParquetError, Result};
use arrow::array::{Array, ArrayDataBuilder, ArrayRef, MapArray};
use arrow::buffer::{Buffer, MutableBuffer};
use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader};
use crate::errors::Result;
use arrow::array::{Array, ArrayRef, MapArray};
use arrow::datatypes::DataType as ArrowType;
use arrow::datatypes::ToByteSlice;
use arrow::util::bit_util;
use std::any::Any;
use std::sync::Arc;

/// Implementation of a map array reader.
pub struct MapArrayReader {
key_reader: Box<dyn ArrayReader>,
value_reader: Box<dyn ArrayReader>,
data_type: ArrowType,
map_def_level: i16,
#[allow(unused)]
map_rep_level: i16,
reader: ListArrayReader<i32>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling this inner might me more idiomatic

image

}

impl MapArrayReader {
/// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and `nullable`
/// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
pub fn new(
key_reader: Box<dyn ArrayReader>,
value_reader: Box<dyn ArrayReader>,
data_type: ArrowType,
def_level: i16,
rep_level: i16,
nullable: bool,
) -> Self {
Self {
key_reader,
value_reader,
data_type,
// These are the wrong way round https://github.com/apache/arrow-rs/issues/1699
map_def_level: rep_level,
map_rep_level: def_level,
}
let struct_def_level = match nullable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could document what nullable means in this context as part of a docstring

true => def_level + 2,
false => def_level + 1,
};
let struct_rep_level = rep_level + 1;

let element = match &data_type {
ArrowType::Map(element, _) => match element.data_type() {
ArrowType::Struct(fields) if fields.len() == 2 => {
// Parquet cannot represent nullability at this level (#1697)
// and so encountering nullability here indicates some manner
// of schema inconsistency / inference bug
assert!(!element.is_nullable(), "map struct cannot be nullable");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As described in #1697 the encoding of DataType::Map permits greater nullability than the type can actually contain

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1697 is a question -- so is the solution "we will assume that the inner struct can not be nullable until we have an existence proof to the contrary?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it is more, if this level is nullable the schema is inconsistent as there is no way to represent that in parquet 😅 The schema inference logic will never generate this - https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/schema/complex.rs#L350

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe worth a comment there

element
}
_ => unreachable!("expected struct with two fields"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eventually it might be a nicer UX to return an error rather than panic'ing here (e.g. MapArrayReader::try_new()) but that would also change the API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a public API so I could change it, but that also makes me less inclined to do so. It is a bug in the builder if you run into this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense 👍

},
_ => unreachable!("expected map type"),
};

let struct_reader = StructArrayReader::new(
element.data_type().clone(),
vec![key_reader, value_reader],
struct_def_level,
struct_rep_level,
false,
);

let reader = ListArrayReader::new(
Box::new(struct_reader),
ArrowType::List(element.clone()),
def_level,
rep_level,
nullable,
);

Self { data_type, reader }
}
}

Expand All @@ -65,131 +89,128 @@ impl ArrayReader for MapArrayReader {
}

fn read_records(&mut self, batch_size: usize) -> Result<usize> {
let key_len = self.key_reader.read_records(batch_size)?;
let value_len = self.value_reader.read_records(batch_size)?;
// Check that key and value have the same lengths
if key_len != value_len {
return Err(general_err!(
"Map key and value should have the same lengths."
));
}
Ok(key_len)
self.reader.read_records(batch_size)
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
let key_array = self.key_reader.consume_batch()?;
let value_array = self.value_reader.consume_batch()?;

// Check that key and value have the same lengths
let key_length = key_array.len();
if key_length != value_array.len() {
return Err(general_err!(
"Map key and value should have the same lengths."
));
}

let def_levels = self
.key_reader
.get_def_levels()
.ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
let rep_levels = self
.key_reader
.get_rep_levels()
.ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;

if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == key_length)) {
return Err(ArrowError(
"Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
));
}

let entry_data_type = if let ArrowType::Map(field, _) = &self.data_type {
field.data_type().clone()
} else {
return Err(ArrowError("Expected a map arrow type".to_string()));
};

let entry_data = ArrayDataBuilder::new(entry_data_type)
.len(key_length)
.add_child_data(key_array.into_data())
.add_child_data(value_array.into_data());
let entry_data = unsafe { entry_data.build_unchecked() };

let entry_len = rep_levels.iter().filter(|level| **level == 0).count();

// first item in each list has rep_level = 0, subsequent items have rep_level = 1
let mut offsets: Vec<i32> = Vec::new();
let mut cur_offset = 0;
def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
if *r == 0 || d == &self.map_def_level {
offsets.push(cur_offset);
}
if d > &self.map_def_level {
cur_offset += 1;
}
});
offsets.push(cur_offset);

let num_bytes = bit_util::ceil(offsets.len(), 8);
// TODO: A useful optimization is to use the null count to fill with
// 0 or null, to reduce individual bits set in a loop.
// To favour dense data, set every slot to true, then unset
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
let null_slice = null_buf.as_slice_mut();
let mut list_index = 0;
for i in 0..rep_levels.len() {
// If the level is lower than empty, then the slot is null.
// When a list is non-nullable, its empty level = null level,
// so this automatically factors that in.
if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
// should be empty list
bit_util::unset_bit(null_slice, list_index);
}
if rep_levels[i] == 0 {
list_index += 1;
}
}
let value_offsets = Buffer::from(&offsets.to_byte_slice());

// Now we can build array data
let array_data = ArrayDataBuilder::new(self.data_type.clone())
.len(entry_len)
.add_buffer(value_offsets)
.null_bit_buffer(Some(null_buf.into()))
.add_child_data(entry_data);

let array_data = unsafe { array_data.build_unchecked() };

Ok(Arc::new(MapArray::from(array_data)))
// A MapArray is just a ListArray with a StructArray child
// we can therefore just alter the ArrayData
let array = self.reader.consume_batch().unwrap();
let data = array.data().clone();
let builder = data.into_builder().data_type(self.data_type.clone());

// SAFETY - we can assume that ListArrayReader produces valid ListArray
// of the expected type, and as such its output can be reinterpreted as
// a MapArray without validation
Ok(Arc::new(MapArray::from(unsafe {
builder.build_unchecked()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unsafe is OK because the inner StructArrayReader already validated the contents of the structs, and MapArrayReader::new() already validated the DataType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty much, will add a doc comment

})))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let key_skipped = self.key_reader.skip_records(num_records)?;
let value_skipped = self.value_reader.skip_records(num_records)?;
if key_skipped != value_skipped {
return Err(general_err!(
"MapArrayReader out of sync, skipped {} keys and {} values",
key_skipped,
value_skipped
));
}
Ok(key_skipped)
self.reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_def_levels()
self.reader.get_def_levels()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
// Children repetition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_rep_levels()
self.reader.get_rep_levels()
}
}

#[cfg(test)]
mod tests {
//TODO: Add unit tests (#1561)
use super::*;
use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::ArrowWriter;
use arrow::array;
use arrow::array::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow::datatypes::{Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;

#[test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Credit to @d-willis for this test

// This test writes a parquet file with the following data:
// +--------------------------------------------------------+
// |map |
// +--------------------------------------------------------+
// |null |
// |null |
// |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
// +--------------------------------------------------------+
//
// It then attempts to read the data back and checks that the third record
// contains the expected values.
fn read_map_array_column() {
// Schema for single map of string to int32
let schema = Schema::new(vec![Field::new(
"map",
ArrowType::Map(
Box::new(Field::new(
"entries",
ArrowType::Struct(vec![
Field::new("keys", ArrowType::Utf8, false),
Field::new("values", ArrowType::Int32, true),
]),
false,
)),
false, // Map field not sorted
),
true,
)]);

// Create builders for map
let string_builder = StringBuilder::new(5);
let ints_builder: PrimitiveBuilder<Int32Type> = PrimitiveBuilder::new(1);
let mut map_builder = MapBuilder::new(None, string_builder, ints_builder);

// Add two null records and one record with five entries
map_builder.append(false).expect("adding null map entry");
map_builder.append(false).expect("adding null map entry");
map_builder.keys().append_value("three");
map_builder.keys().append_value("four");
map_builder.keys().append_value("five");
map_builder.keys().append_value("six");
map_builder.keys().append_value("seven");

map_builder.values().append_value(3);
map_builder.values().append_value(4);
map_builder.values().append_value(5);
map_builder.values().append_value(6);
map_builder.values().append_value(7);
map_builder.append(true).expect("adding map entry");

// Create record batch
let batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
.expect("create record batch");

// Write record batch to file
let mut buffer = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)
.expect("creat file writer");
writer.write(&batch).expect("writing file");
writer.close().expect("close writer");

// Read file
let reader = Bytes::from(buffer);
let record_batch_reader =
ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
for maybe_record_batch in record_batch_reader {
let record_batch = maybe_record_batch.expect("Getting current batch");
let col = record_batch.column(0);
assert!(col.is_null(0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

assert!(col.is_null(1));
let map_entry = array::as_map_array(col).value(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this test also validate that the first two entries are null? Something like

assert!(!array::as_map_array(col).is_valid(0));
assert!(!array::as_map_array(col).is_valid(1));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test was originally written to test my very specific failure condition.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly a worthwhile addition though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry I probably should have phrased that as "I suggest this test also validate the first two entries are null"

let struct_col = array::as_struct_array(&map_entry);
let key_col = array::as_string_array(struct_col.column(0)); // Key column
assert_eq!(key_col.value(0), "three");
assert_eq!(key_col.value(1), "four");
assert_eq!(key_col.value(2), "five");
assert_eq!(key_col.value(3), "six");
assert_eq!(key_col.value(4), "seven");
}
}
}