Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reading of dictionary encoded pages with null values (#1111) #1130

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,153 @@ mod tests {
);
}

#[test]
fn test_complex_array_reader_dict_enc_string() {
use crate::encoding::{DictEncoder, Encoder};
use crate::memory::MemTracker;
// Construct column schema
let message_type = "
message test_schema {
REPEATED Group test_mid {
OPTIONAL BYTE_ARRAY leaf (UTF8);
}
}
";
let num_pages = 2;
let values_per_page = 100;
let str_base = "Hello World";

let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();

assert_eq!(max_def_level, 2);
assert_eq!(max_rep_level, 1);

let mut rng = thread_rng();
let mut pages: Vec<Vec<Page>> = Vec::new();

let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
let mut all_values = Vec::with_capacity(num_pages * values_per_page);

for i in 0..num_pages {
let mem_tracker = Arc::new(MemTracker::new());
let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
// add data page
let mut values = Vec::with_capacity(values_per_page);

for _ in 0..values_per_page {
let def_level = rng.gen_range(0..max_def_level + 1);
let rep_level = rng.gen_range(0..max_rep_level + 1);
if def_level == max_def_level {
let len = rng.gen_range(1..str_base.len());
let slice = &str_base[..len];
values.push(ByteArray::from(slice));
all_values.push(Some(slice.to_string()));
} else {
all_values.push(None)
}
rep_levels.push(rep_level);
def_levels.push(def_level)
}

let range = i * values_per_page..(i + 1) * values_per_page;
let mut pb =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
let _ = dict_encoder.put(&values);
let indices = dict_encoder
.write_indices()
.expect("write_indices() should be OK");
pb.add_indices(indices);
let data_page = pb.consume();
// for each page log num_values vs actual values in page
// println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
// add dictionary page
let dict = dict_encoder
.write_dict()
.expect("write_dict() should be OK");
let dict_page = Page::DictionaryPage {
buf: dict,
num_values: dict_encoder.num_entries() as u32,
encoding: Encoding::RLE_DICTIONARY,
is_sorted: false,
};
pages.push(vec![dict_page, data_page]);
}

let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
let converter = Utf8Converter::new(Utf8ArrayConverter {});
let mut array_reader =
ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
Box::new(page_iterator),
column_desc,
converter,
None,
)
.unwrap();

let mut accu_len: usize = 0;

// println!("---------- reading a batch of {} values ----------", values_per_page / 2);
let array = array_reader.next_batch(values_per_page / 2).unwrap();
assert_eq!(array.len(), values_per_page / 2);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
accu_len += array.len();

// Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
// and the last values_per_page/2 ones are from the second column chunk
// println!("---------- reading a batch of {} values ----------", values_per_page);
let array = array_reader.next_batch(values_per_page).unwrap();
assert_eq!(array.len(), values_per_page);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
for i in 0..array.len() {
if array.is_valid(i) {
assert_eq!(
all_values[i + accu_len].as_ref().unwrap().as_str(),
strings.value(i)
)
} else {
assert_eq!(all_values[i + accu_len], None)
}
}
accu_len += array.len();

// Try to read values_per_page values, however there are only values_per_page/2 values
// println!("---------- reading a batch of {} values ----------", values_per_page);
let array = array_reader.next_batch(values_per_page).unwrap();
assert_eq!(array.len(), values_per_page / 2);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
}

/// Array reader for test.
struct InMemoryArrayReader {
data_type: ArrowType,
Expand Down
Loading