Skip to content

Commit

Permalink
add bench: decimal with byte array and fixed length byte array (#2529)
Browse files Browse the repository at this point in the history
* add bench: decimal with byte array and fixed length byte array

* change comments
  • Loading branch information
liukun4515 authored Aug 29, 2022
1 parent c6e7680 commit 81f1f81
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 12 deletions.
203 changes: 199 additions & 4 deletions parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ use arrow::datatypes::DataType;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
make_byte_array_reader, make_fixed_len_byte_array_reader,
};
use parquet::basic::Type;
use parquet::data_type::FixedLenByteArrayType;
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
arrow::array_reader::ArrayReader,
Expand All @@ -47,6 +52,10 @@ fn build_test_schema() -> SchemaDescPtr {
OPTIONAL INT32 optional_decimal1_leaf (DECIMAL(8,2));
REQUIRED INT64 mandatory_decimal2_leaf (DECIMAL(16,2));
OPTIONAL INT64 optional_decimal2_leaf (DECIMAL(16,2));
REQUIRED BYTE_ARRAY mandatory_decimal3_leaf (DECIMAL(16,2));
OPTIONAL BYTE_ARRAY optional_decimal3_leaf (DECIMAL(16,2));
REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_decimal4_leaf (DECIMAL(16,2));
OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_decimal4_leaf (DECIMAL(16,2));
}
";
parse_message_type(message_type)
Expand All @@ -65,6 +74,71 @@ pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}

/// Builds an in-memory page iterator of decimal values encoded as big-endian bytes.
///
/// Generates `NUM_ROW_GROUPS` column chunks of `PAGES_PER_GROUP` pages each, with
/// `VALUES_PER_PAGE` level slots per page. A slot is NULL when a random draw falls
/// below `null_density`; otherwise a decimal is drawn from `min..max` (`max` excluded).
/// `BYTE_ARRAY` columns receive the variable-length signed big-endian bytes of the
/// value, `FIXED_LEN_BYTE_ARRAY` columns the 16-byte big-endian form of the `i128`.
///
/// # Panics
///
/// Panics when a value is generated for a column whose physical type is neither
/// `BYTE_ARRAY` nor `FIXED_LEN_BYTE_ARRAY`, or for a fixed-length column whose
/// type length is not 16.
fn build_encoded_decimal_bytes_page_iterator<T>(
    schema: SchemaDescPtr,
    column_desc: ColumnDescPtr,
    null_density: f32,
    encoding: Encoding,
    min: i128,
    max: i128,
) -> impl PageIterator + Clone
where
    T: parquet::data_type::DataType,
    T::T: From<Vec<u8>>,
{
    let max_def_level = column_desc.max_def_level();
    let max_rep_level = column_desc.max_rep_level();
    let rep_levels = vec![0; VALUES_PER_PAGE];
    let mut rng = seedable_rng();

    // Parquet stores decimals big-endian; the byte layout depends on the physical type.
    let to_decimal_bytes = |decimal: i128| -> Vec<u8> {
        match column_desc.physical_type() {
            // Variable-length representation.
            Type::BYTE_ARRAY => BigInt::from(decimal).to_signed_bytes_be(),
            // Fixed-length representation: always the 16 bytes of the i128.
            Type::FIXED_LEN_BYTE_ARRAY => {
                assert_eq!(column_desc.type_length(), 16);
                decimal.to_be_bytes().to_vec()
            }
            _ => unimplemented!(),
        }
    };

    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
    for _ in 0..NUM_ROW_GROUPS {
        let mut chunk = Vec::new();
        for _ in 0..PAGES_PER_GROUP {
            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
            for _ in 0..VALUES_PER_PAGE {
                // The null draw always happens first; a value is only drawn for
                // non-null slots, which keeps the random sequence reproducible.
                let is_null = rng.gen::<f32>() < null_density;
                if is_null {
                    def_levels.push(max_def_level - 1);
                } else {
                    let decimal = rng.gen_range(min..max);
                    values.push(T::T::from(to_decimal_bytes(decimal)));
                    def_levels.push(max_def_level);
                }
            }

            let mut builder =
                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
            builder.add_rep_levels(max_rep_level, &rep_levels);
            builder.add_def_levels(max_def_level, &def_levels);
            builder.add_values::<T>(encoding, &values);
            chunk.push(builder.consume());
        }
        pages.push(chunk);
    }
    InMemoryPageIterator::new(schema, column_desc, pages)
}

fn build_encoded_primitive_page_iterator<T>(
schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -326,6 +400,7 @@ fn bench_array_reader_skip(mut array_reader: Box<dyn ArrayReader>) -> usize {
}
total_count
}

fn create_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -354,11 +429,27 @@ fn create_primitive_array_reader(
}
}

/// Creates the array reader matching the physical type of a byte-backed decimal column.
///
/// `BYTE_ARRAY` columns use `make_byte_array_reader` and `FIXED_LEN_BYTE_ARRAY`
/// columns use `make_fixed_len_byte_array_reader`; both are called with `None`
/// as their last argument.
///
/// # Panics
///
/// Panics for any other physical type, or if the reader cannot be constructed.
fn create_decimal_by_bytes_reader(
    page_iterator: impl PageIterator + 'static,
    column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
    let kind = column_desc.physical_type();
    let pages: Box<dyn PageIterator> = Box::new(page_iterator);
    let reader = match kind {
        Type::BYTE_ARRAY => make_byte_array_reader(pages, column_desc, None),
        Type::FIXED_LEN_BYTE_ARRAY => {
            make_fixed_len_byte_array_reader(pages, column_desc, None)
        }
        _ => unimplemented!(),
    };
    reader.unwrap()
}

fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_reader;
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

Expand All @@ -378,6 +469,80 @@ fn create_string_byte_array_dictionary_reader(
.unwrap()
}

/// Registers the decimal read benchmarks for a byte-backed physical type.
///
/// Three PLAIN-encoded cases run in order: the mandatory column without NULLs,
/// the optional column without NULLs, and the optional column with a null
/// density of 0.5. Decimal values are drawn from `min..max`. After each case,
/// the count returned by the last benchmark iteration must equal
/// `EXPECTED_VALUE_COUNT`.
fn bench_byte_decimal<T>(
    group: &mut BenchmarkGroup<WallTime>,
    schema: &SchemaDescPtr,
    mandatory_column_desc: &ColumnDescPtr,
    optional_column_desc: &ColumnDescPtr,
    min: i128,
    max: i128,
) where
    T: parquet::data_type::DataType,
    T::T: From<Vec<u8>>,
{
    // Every case uses plain encoding; only the column and the null density vary.
    let cases = [
        ("plain encoded, mandatory, no NULLs", mandatory_column_desc, 0.0),
        ("plain encoded, optional, no NULLs", optional_column_desc, 0.0),
        ("plain encoded, optional, half NULLs", optional_column_desc, 0.5),
    ];

    let mut count: usize = 0;
    for &(bench_name, column_desc, null_density) in cases.iter() {
        // Pages are generated once per case and cloned for every iteration.
        let pages = build_encoded_decimal_bytes_page_iterator::<T>(
            schema.clone(),
            column_desc.clone(),
            null_density,
            Encoding::PLAIN,
            min,
            max,
        );
        group.bench_function(bench_name, |b| {
            b.iter(|| {
                let array_reader =
                    create_decimal_by_bytes_reader(pages.clone(), column_desc.clone());
                count = bench_array_reader(array_reader);
            });
            assert_eq!(count, EXPECTED_VALUE_COUNT);
        });
    }
}

fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
schema: &SchemaDescPtr,
Expand Down Expand Up @@ -611,9 +776,39 @@ fn decimal_benches(c: &mut Criterion) {
&schema,
&mandatory_decimal2_leaf_desc,
&optional_decimal2_leaf_desc,
// precision is 18: the max is 999999999999999999
999999999999000,
999999999999999,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();

// parquet BYTE_ARRAY, logical type decimal(16,2)
let mut group = c.benchmark_group("arrow_array_reader/BYTE_ARRAY/Decimal128Array");
let mandatory_decimal3_leaf_desc = schema.column(10);
let optional_decimal3_leaf_desc = schema.column(11);
bench_byte_decimal::<ByteArrayType>(
&mut group,
&schema,
&mandatory_decimal3_leaf_desc,
&optional_decimal3_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();

let mut group =
c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array");
let mandatory_decimal4_leaf_desc = schema.column(12);
let optional_decimal4_leaf_desc = schema.column(13);
bench_byte_decimal::<FixedLenByteArrayType>(
&mut group,
&schema,
&mandatory_decimal4_leaf_desc,
&optional_decimal4_leaf_desc,
// precision is 16: the max is 9999999999999999
9999999999999000,
9999999999999999,
);
group.finish();
}
Expand Down
22 changes: 14 additions & 8 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ impl From<ByteArray> for FixedLenByteArray {
}
}

impl From<Vec<u8>> for FixedLenByteArray {
fn from(buf: Vec<u8>) -> FixedLenByteArray {
FixedLenByteArray(ByteArray::from(buf))
}
}

impl From<FixedLenByteArray> for ByteArray {
fn from(other: FixedLenByteArray) -> Self {
other.0
Expand Down Expand Up @@ -1141,9 +1147,9 @@ macro_rules! make_type {
}

fn get_column_reader(
column_writer: ColumnReader,
column_reader: ColumnReader,
) -> Option<ColumnReaderImpl<Self>> {
match column_writer {
match column_reader {
ColumnReader::$reader_ident(w) => Some(w),
_ => None,
}
Expand Down Expand Up @@ -1248,29 +1254,29 @@ impl FromBytes for Int96 {
// FIXME Needed to satisfy the constraint of many decoding functions, but ByteArray does not
// appear to actually be converted directly from bytes
impl FromBytes for ByteArray {
    /// An owned, variable-length buffer, so the byte array is not limited to a
    /// fixed 8-byte array.
    type Buffer = Vec<u8>;

    /// Wraps the buffer as-is; the bytes are moved, not copied.
    fn from_le_bytes(bs: Self::Buffer) -> Self {
        ByteArray::from(bs)
    }

    /// Marked unreachable: this conversion is not expected to be called.
    fn from_be_bytes(_bs: Self::Buffer) -> Self {
        unreachable!()
    }

    /// Wraps the buffer as-is; the bytes are moved, not copied.
    fn from_ne_bytes(bs: Self::Buffer) -> Self {
        ByteArray::from(bs)
    }
}

impl FromBytes for FixedLenByteArray {
    /// An owned, variable-length buffer, so the value is not limited to a
    /// fixed 8-byte array.
    type Buffer = Vec<u8>;

    /// Wraps the buffer as-is; the bytes are moved, not copied.
    fn from_le_bytes(bs: Self::Buffer) -> Self {
        Self(ByteArray::from(bs))
    }

    /// Marked unreachable: this conversion is not expected to be called.
    fn from_be_bytes(_bs: Self::Buffer) -> Self {
        unreachable!()
    }

    /// Wraps the buffer as-is; the bytes are moved, not copied.
    fn from_ne_bytes(bs: Self::Buffer) -> Self {
        Self(ByteArray::from(bs))
    }
}

Expand Down

0 comments on commit 81f1f81

Please sign in to comment.