diff --git a/arrow-data/src/byte_view.rs b/arrow-data/src/byte_view.rs index b8b1731ac60..2cdabad3f71 100644 --- a/arrow-data/src/byte_view.rs +++ b/arrow-data/src/byte_view.rs @@ -39,6 +39,11 @@ impl ByteView { | ((self.buffer_index as u128) << 64) | ((self.offset as u128) << 96) } + + #[inline(always)] + pub fn as_bytes(self) -> [u8; 16] { + self.as_u128().to_le_bytes() + } } impl From for ByteView { diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 958594c9323..945f62526a7 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow_schema::{DataType, Fields, SchemaBuilder}; -use crate::arrow::array_reader::byte_array::make_byte_view_array_reader; +use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index d0aa6f7b1eb..19086878c15 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -74,36 +74,6 @@ pub fn make_byte_array_reader( } } -/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. -pub fn make_byte_view_array_reader( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, -) -> Result> { - // Check if Arrow type is specified, else create it from Parquet type - let data_type = match arrow_type { - Some(t) => t, - None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { - ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, - _ => ArrowType::BinaryView, - }, - }; - - match data_type { - ArrowType::BinaryView | ArrowType::Utf8View => { - let reader = GenericRecordReader::new(column_desc); - Ok(Box::new(ByteArrayReader::::new( - pages, data_type, reader, - ))) - } - - _ => Err(general_err!( - "invalid data type for byte array reader read to view type - {}", - data_type - )), - } -} - /// An [`ArrayReader`] for variable length byte arrays struct ByteArrayReader { data_type: ArrowType, @@ -618,7 +588,7 @@ mod tests { use super::*; use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; use crate::arrow::record_reader::buffer::ValuesBuffer; - use arrow_array::{Array, StringArray, StringViewArray}; + use arrow_array::{Array, StringArray}; use arrow_buffer::Buffer; #[test] @@ -676,64 +646,6 @@ mod tests { } } - #[test] - fn test_byte_array_string_view_decoder() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]); - - let column_desc = utf8_column(); - let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = OffsetBuffer::::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(output.values.as_slice(), "hello".as_bytes()); - assert_eq!(output.offsets.as_slice(), &[0, 5]); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - assert_eq!(output.values.as_slice(), "helloworld".as_bytes()); - assert_eq!(output.offsets.as_slice(), &[0, 5, 10]); - - assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); - assert_eq!( - output.values.as_slice(), - "helloworldlarge payload over 12 bytesb".as_bytes() - ); - assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("world"), - None, - Some("large payload over 12 bytes"), - Some("b"), - None, - None, - ] - ); - } - } - #[test] fn test_byte_array_decoder_skip() { let (pages, encoded_dictionary) = @@ -778,60 +690,6 @@ mod tests { } } - #[test] - fn test_byte_array_string_view_decoder_skip() { - let (pages, encoded_dictionary) = - byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); - - let column_desc = utf8_column(); - let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); - - decoder - .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) - .unwrap(); - - for (encoding, page) in pages { - let mut output = OffsetBuffer::::default(); - decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - - assert_eq!(output.values.as_slice(), "hello".as_bytes()); - assert_eq!(output.offsets.as_slice(), &[0, 5]); - - assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.skip_values(1).unwrap(), 1); - - assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); - assert_eq!( - output.values.as_slice(), - "hellolarge payload over 12 bytes".as_bytes() - ); - assert_eq!(output.offsets.as_slice(), &[0, 5, 32]); - - assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); - - let valid = [false, false, true, true, false, false]; - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - - output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); - let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - - assert_eq!( - strings.iter().collect::>(), - vec![ - None, - None, - Some("hello"), - Some("large payload over 12 bytes"), - None, - None, - ] - ); - } - } - #[test] fn test_byte_array_decoder_nulls() { let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs new file mode 100644 index 00000000000..044f34734b2 --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -0,0 +1,645 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::buffer::view_buffer::ViewBuffer; +use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{ConvertedType, Encoding}; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; +use crate::data_type::Int32Type; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescPtr; +use arrow_array::ArrayRef; +use arrow_schema::DataType as ArrowType; +use bytes::Bytes; +use std::any::Any; + +/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. +pub fn make_byte_view_array_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() { + ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View, + _ => ArrowType::BinaryView, + }, + }; + + match data_type { + ArrowType::Utf8View | ArrowType::BinaryView => { + let reader = GenericRecordReader::new(column_desc); + Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader))) + } + _ => Err(general_err!( + "invalid data type for byte array reader - {}", + data_type + )), + } +} + +/// An [`ArrayReader`] for variable length byte arrays +struct ByteViewArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, + record_reader: GenericRecordReader, +} + +impl ByteViewArrayReader { + fn new( + pages: Box, + data_type: ArrowType, + record_reader: GenericRecordReader, + ) -> Self { + Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + record_reader, + } + } +} + +impl ArrayReader for ByteViewArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self) -> Result { + let buffer = self.record_reader.consume_record_data(); + let null_buffer = self.record_reader.consume_bitmap_buffer(); + self.def_levels_buffer = self.record_reader.consume_def_levels(); + self.rep_levels_buffer = self.record_reader.consume_rep_levels(); + self.record_reader.reset(); + + let array: ArrayRef = buffer.into_array(null_buffer, self.data_type.clone()); + + Ok(array) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + skip_records(&mut self.record_reader, self.pages.as_mut(), num_records) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_deref() + } +} + +/// A [`ColumnValueDecoder`] for variable length byte arrays +struct ByteViewArrayColumnValueDecoder { + dict: Option, + decoder: Option, + validate_utf8: bool, +} + +impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { + type Buffer = ViewBuffer; + + fn new(desc: &ColumnDescPtr) -> Self { + let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; + Self { + dict: None, + decoder: None, + validate_utf8, + } + } + + fn set_dict( + &mut self, + buf: Bytes, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = ViewBuffer::default(); + let mut decoder = ByteViewArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option, + ) -> Result<()> { + self.decoder = Some(ByteViewArrayDecoder::new( + encoding, + data, + num_levels, + num_values, + self.validate_utf8, + )?); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.read(out, num_values, self.dict.as_ref()) + } + + fn skip_values(&mut self, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.skip(num_values, self.dict.as_ref()) + } +} + +/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] +pub enum ByteViewArrayDecoder { + Plain(ByteViewArrayDecoderPlain), + Dictionary(ByteViewArrayDecoderDictionary), + DeltaLength(ByteViewArrayDecoderDeltaLength), + DeltaByteArray(ByteViewArrayDecoderDelta), +} + +impl ByteViewArrayDecoder { + pub fn new( + encoding: Encoding, + data: Bytes, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Result { + let decoder = match encoding { + Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new( + data, + num_levels, + num_values, + validate_utf8, + )), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( + ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, + ), + Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( + ByteViewArrayDecoderDelta::new(data, validate_utf8)?, + ), + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + + Ok(decoder) + } + + /// Read up to `len` values to `out` with the optional dictionary + pub fn read( + &mut self, + out: &mut ViewBuffer, + len: usize, + dict: Option<&ViewBuffer>, + ) -> Result { + match self { + ByteViewArrayDecoder::Plain(d) => d.read(out, len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.read(out, dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), + } + } + + /// Skip `len` values + pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { + match self { + ByteViewArrayDecoder::Plain(d) => d.skip(len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = + dict.ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.skip(dict, len) + } + ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), + ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), + } + } +} + +/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderPlain { + buf: Bytes, + offset: usize, + validate_utf8: bool, + + /// This is a maximum as the null count is not always known, e.g. value data from + /// a v1 data page + max_remaining_values: usize, +} + +impl ByteViewArrayDecoderPlain { + pub fn new( + buf: Bytes, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Self { + Self { + buf, + validate_utf8, + offset: 0, + max_remaining_values: num_values.unwrap_or(num_levels), + } + } + + pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let to_read = len.min(self.max_remaining_values); + + let remaining_bytes = self.buf.len() - self.offset; + if remaining_bytes == 0 { + return Ok(0); + } + + let mut read = 0; + + let buf = self.buf.as_ref(); + while self.offset < self.buf.len() && read != to_read { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes); + + let start_offset = self.offset + 4; + let end_offset = start_offset + len as usize; + if end_offset > buf.len() { + return Err(ParquetError::EOF("eof decoding byte view array".into())); + } + + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + + self.offset = end_offset; + read += 1; + } + self.max_remaining_values -= to_read; + + Ok(to_read) + } + + pub fn skip(&mut self, to_skip: usize) -> Result { + let to_skip = to_skip.min(self.max_remaining_values); + let mut skip = 0; + let buf = self.buf.as_ref(); + + while self.offset < self.buf.len() && skip != to_skip { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + skip += 1; + self.offset = self.offset + 4 + len; + } + self.max_remaining_values -= skip; + Ok(skip) + } +} + +/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDeltaLength { + lengths: Vec, + data: Bytes, + length_offset: usize, + data_offset: usize, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDeltaLength { + fn new(data: Bytes, validate_utf8: bool) -> Result { + let mut len_decoder = DeltaBitPackDecoder::::new(); + len_decoder.set_data(data.clone(), 0)?; + let values = len_decoder.values_left(); + + let mut lengths = vec![0; values]; + len_decoder.get(&mut lengths)?; + + Ok(Self { + lengths, + data, + validate_utf8, + length_offset: 0, + data_offset: len_decoder.get_offset(), + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let to_read = len.min(self.lengths.len() - self.length_offset); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; + + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + if self.data_offset + total_bytes > self.data.len() { + return Err(ParquetError::EOF( + "Insufficient delta length byte array bytes".to_string(), + )); + } + + let mut start_offset = self.data_offset; + for length in src_lengths { + let end_offset = start_offset + *length as usize; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; + start_offset = end_offset; + } + + self.data_offset = start_offset; + self.length_offset += to_read; + + Ok(to_read) + } + + fn skip(&mut self, to_skip: usize) -> Result { + let remain_values = self.lengths.len() - self.length_offset; + let to_skip = remain_values.min(to_skip); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + self.data_offset += total_bytes; + self.length_offset += to_skip; + Ok(to_skip) + } +} + +/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDelta { + decoder: DeltaByteArrayDecoder, + validate_utf8: bool, +} + +impl ByteViewArrayDecoderDelta { + fn new(data: Bytes, validate_utf8: bool) -> Result { + Ok(Self { + decoder: DeltaByteArrayDecoder::new(data)?, + validate_utf8, + }) + } + + fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { + let read = self + .decoder + .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?; + + Ok(read) + } + + fn skip(&mut self, to_skip: usize) -> Result { + self.decoder.skip(to_skip) + } +} + +/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`ViewBuffer`] +pub struct ByteViewArrayDecoderDictionary { + decoder: DictIndexDecoder, +} + +impl ByteViewArrayDecoderDictionary { + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { + Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values), + } + } + + fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.read(len, |keys| { + output.extend_from_dictionary(keys, &dict.views, &dict.buffers) + }) + } + + fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { + // All data must be NULL + if dict.is_empty() { + return Ok(0); + } + + self.decoder.skip(to_skip) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; + use crate::arrow::record_reader::buffer::ValuesBuffer; + use arrow_array::{Array, StringViewArray}; + use arrow_buffer::Buffer; + + #[test] + fn test_byte_array_decoder() { + let (pages, encoded_dictionary) = byte_array_all_encodings(vec![ + "hello", + "world", + "here comes the snow", + "large payload over 12 bytes", + ]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + let first = output.views.first().unwrap(); + let len = *first as u32; + + assert_eq!( + &first.to_le_bytes()[4..4 + len as usize], + "hello".as_bytes() + ); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("world"), + None, + Some("here comes the snow"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_decoder_skip() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + let first = output.views.first().unwrap(); + let len = *first as u32; + + assert_eq!( + &first.to_le_bytes()[4..4 + len as usize], + "hello".as_bytes() + ); + + assert_eq!(decoder.skip_values(1).unwrap(), 1); + assert_eq!(decoder.skip_values(1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); + + let valid = [false, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("large payload over 12 bytes"), + None, + None, + ] + ); + } + } + + #[test] + fn test_byte_array_decoder_nulls() { + let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + // test nulls read + for (encoding, page) in pages.clone() { + let mut output = ViewBuffer::default(); + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); + } + + // test nulls skip + for (encoding, page) in pages { + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.skip_values(1024).unwrap(), 0); + } + } +} diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index f662156543e..142daa41c21 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -32,6 +32,7 @@ use crate::file::reader::{FilePageIterator, FileReader}; mod builder; mod byte_array; mod byte_array_dictionary; +mod byte_view_array; mod empty_array; mod fixed_len_byte_array; mod fixed_size_list_array; @@ -46,8 +47,8 @@ mod test_util; pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; -pub use byte_array::make_byte_view_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; +pub use byte_view_array::make_byte_view_array_reader; #[allow(unused_imports)] // Only used for benchmarks pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; pub use fixed_size_list_array::FixedSizeListArrayReader; diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs index cbc795d94f5..544c8c88626 100644 --- a/parquet/src/arrow/buffer/mod.rs +++ b/parquet/src/arrow/buffer/mod.rs @@ -20,3 +20,4 @@ pub mod bit_util; pub mod dictionary_buffer; pub mod offset_buffer; +pub(crate) mod view_buffer; diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 32bb9d0862b..ce9eb1142a5 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -18,13 +18,10 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev; use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::errors::{ParquetError, Result}; -use arrow_array::builder::GenericByteViewBuilder; -use arrow_array::types::BinaryViewType; use arrow_array::{make_array, ArrayRef, OffsetSizeTrait}; use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::DataType as ArrowType; -use std::sync::Arc; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] @@ -128,47 +125,18 @@ impl OffsetBuffer { /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { - match data_type { - ArrowType::Utf8View => { - let mut builder = self.build_generic_byte_view(); - Arc::new(builder.finish().to_string_view().unwrap()) - } - ArrowType::BinaryView => { - let mut builder = self.build_generic_byte_view(); - Arc::new(builder.finish()) - } - _ => { - let array_data_builder = ArrayDataBuilder::new(data_type) - .len(self.len()) - .add_buffer(Buffer::from_vec(self.offsets)) - .add_buffer(Buffer::from_vec(self.values)) - .null_bit_buffer(null_buffer); - - let data = match cfg!(debug_assertions) { - true => array_data_builder.build().unwrap(), - false => unsafe { array_data_builder.build_unchecked() }, - }; - - make_array(data) - } - } - } - - fn build_generic_byte_view(self) -> GenericByteViewBuilder { - let mut builder = GenericByteViewBuilder::::with_capacity(self.len()); - let mut values = self.values; - for window in self.offsets.windows(2) { - let start = window[0]; - let end = window[1]; - let len = (end - start).to_usize().unwrap(); - let b = values.drain(..len).collect::>(); - if b.is_empty() { - builder.append_null(); - } else { - builder.append_value(b); - } - } - builder + let array_data_builder = ArrayDataBuilder::new(data_type) + .len(self.len()) + .add_buffer(Buffer::from_vec(self.offsets)) + .add_buffer(Buffer::from_vec(self.values)) + .null_bit_buffer(null_buffer); + + let data = match cfg!(debug_assertions) { + true => array_data_builder.build().unwrap(), + false => unsafe { array_data_builder.build_unchecked() }, + }; + + make_array(data) } } @@ -225,7 +193,7 @@ impl ValuesBuffer for OffsetBuffer { #[cfg(test)] mod tests { use super::*; - use arrow_array::{Array, LargeStringArray, StringArray, StringViewArray}; + use arrow_array::{Array, LargeStringArray, StringArray}; #[test] fn test_offset_buffer_empty() { @@ -276,44 +244,6 @@ mod tests { ); } - #[test] - fn test_string_view() { - let mut buffer = OffsetBuffer::::default(); - for v in [ - "hello", - "world", - "large payload over 12 bytes", - "a", - "b", - "c", - ] { - buffer.try_push(v.as_bytes(), false).unwrap() - } - let split = std::mem::take(&mut buffer); - - let array = split.into_array(None, ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - assert_eq!( - strings.iter().map(|x| x.unwrap()).collect::>(), - vec![ - "hello", - "world", - "large payload over 12 bytes", - "a", - "b", - "c" - ] - ); - - buffer.try_push("test".as_bytes(), false).unwrap(); - let array = buffer.into_array(None, ArrowType::Utf8View); - let strings = array.as_any().downcast_ref::().unwrap(); - assert_eq!( - strings.iter().map(|x| x.unwrap()).collect::>(), - vec!["test"] - ); - } - #[test] fn test_offset_buffer_pad_nulls() { let mut buffer = OffsetBuffer::::default(); diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs new file mode 100644 index 00000000000..1344de2ac42 --- /dev/null +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::{ParquetError, Result}; +use arrow_array::{make_array, ArrayRef}; +use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; +use arrow_data::{ArrayDataBuilder, ByteView}; +use arrow_schema::DataType as ArrowType; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +#[derive(Debug, Default)] +pub struct ViewBuffer { + pub views: Vec, + pub buffers: Vec, + pub nulls: Option, +} + +impl ViewBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.views.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + let length: u32 = data.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let offset = self.buffers.len() as u32; + self.buffers.extend_from_slice(data); + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset, + }; + self.views.push(view.into()); + Ok(()) + } + + /// Extends this buffer with a list of keys + pub fn extend_from_dictionary( + &mut self, + keys: &[K], + dict_views: &[u128], + dict_buffers: &[u8], + ) -> Result<()> { + for key in keys { + let index = key.as_usize(); + if index >= dict_views.len() { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_views.len() + )); + } + + let view = dict_views[index]; + let len = view as u32; + + // Dictionary values are verified when decoding dictionary page, so validate_utf8 is false here. + if len <= 12 { + self.try_push(&view.to_le_bytes()[4..4 + len as usize], false)?; + } else { + let offset = (view >> 96) as usize; + self.try_push(&dict_buffers[offset..offset + len as usize], false)? + }; + } + Ok(()) + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { + let len = self.len(); + let array_data_builder = ArrayDataBuilder::new(data_type) + .len(len) + .add_buffer(Buffer::from_vec(self.views)) + .add_buffer(Buffer::from_vec(self.buffers)) + .null_bit_buffer(null_buffer); + + let data = match cfg!(debug_assertions) { + true => array_data_builder.build().unwrap(), + false => unsafe { array_data_builder.build_unchecked() }, + }; + + make_array(data) + } +} + +impl ValuesBuffer for ViewBuffer { + fn pad_nulls( + &mut self, + read_offset: usize, + values_read: usize, + levels_read: usize, + valid_mask: &[u8], + ) { + self.views.resize(read_offset + levels_read, 0); + + let values_range = read_offset..read_offset + values_read; + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + self.views[level_pos] = self.views[value_pos]; + } + } +}