Skip to content

Commit

Permalink
Replace unwrap() calls with Result propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Sep 25, 2019
1 parent 6407dee commit 433abab
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 53 deletions.
28 changes: 17 additions & 11 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,11 @@ impl<T: DataType> PrimitiveArrayReader<T> {
.clone();

let mut record_reader = RecordReader::<T>::new(column_desc.clone());
record_reader.set_page_reader(pages.next().ok_or_else(|| {
general_err!(
"Can't \
build array without pages!"
)
})??)?;
record_reader.set_page_reader(
pages
.next()
.ok_or_else(|| general_err!("Can't build array without pages!"))??,
)?;

Ok(Self {
data_type,
Expand Down Expand Up @@ -217,8 +216,8 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}?;

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.record_reader.reset();
Ok(array)
}
Expand Down Expand Up @@ -305,7 +304,10 @@ impl ArrayReader for StructArrayReader {
)?;

// check that array child data has same size
let children_array_len = children_array.first().unwrap().len();
let children_array_len =
children_array.first().map(|arr| arr.len()).ok_or_else(|| {
general_err!("Struct array reader should have at least one child!")
})?;

let all_children_len_eq = children_array
.iter()
Expand Down Expand Up @@ -371,7 +373,9 @@ impl ArrayReader for StructArrayReader {
let rep_level_data = self
.children
.first()
.unwrap()
.ok_or_else(|| {
general_err!("Struct array reader should have at least one child!")
})?
.get_rep_levels()
.map(|data| -> Result<Buffer> {
let mut buffer = Int16BufferBuilder::new(children_array_len);
Expand Down Expand Up @@ -584,7 +588,9 @@ impl<'a> ArrayReaderBuilder {
let context = ArrayReaderBuilderContext::default();

self.visit_struct(self.root_schema.clone(), &context)
.map(|reader| reader.unwrap())
.and_then(|reader_opt| {
reader_opt.ok_or_else(|| general_err!("Failed to build array reader!"))
})
}

// Utility functions
Expand Down
4 changes: 2 additions & 2 deletions rust/parquet/src/arrow/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ where

let mut array_data = ArrayDataBuilder::new(ArrowSourceType::get_data_type())
.len(record_reader.num_values())
.add_buffer(record_data);
.add_buffer(record_data?);

if let Some(b) = record_reader.consume_bitmap_buffer() {
if let Some(b) = record_reader.consume_bitmap_buffer()? {
array_data = array_data.null_bit_buffer(b);
}

Expand Down
86 changes: 46 additions & 40 deletions rust/parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,13 @@ impl<T: DataType> RecordReader<T> {
/// The implementation has side effects. It will create a new buffer to hold those
/// definition level values that have already been read into memory but not counted
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Option<Buffer> {
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels {
let num_left_values = self.values_written - self.num_values;
let mut new_buffer = MutableBuffer::new(
size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
);
new_buffer
.resize(num_left_values * size_of::<i16>())
.unwrap();
new_buffer.resize(num_left_values * size_of::<i16>())?;

let new_def_levels = FatPtr::<i16>::with_offset(&new_buffer, 0);
let new_def_levels = new_def_levels.to_slice_mut();
Expand All @@ -209,29 +207,25 @@ impl<T: DataType> RecordReader<T> {
new_def_levels[0..num_left_values]
.copy_from_slice(&left_def_levels[0..num_left_values]);

def_levels_buf
.resize(self.num_values * size_of::<i16>())
.unwrap();
def_levels_buf.resize(self.num_values * size_of::<i16>())?;
Some(new_buffer)
} else {
None
};

replace(&mut self.def_levels, new_buffer).map(|x| x.freeze())
Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.freeze()))
}

/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
// TODO: Optimize to reduce the copy
let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels {
let num_left_values = self.values_written - self.num_values;
let mut new_buffer = MutableBuffer::new(
size_of::<i16>() * max(MIN_BATCH_SIZE, num_left_values),
);
new_buffer
.resize(num_left_values * size_of::<i16>())
.unwrap();
new_buffer.resize(num_left_values * size_of::<i16>())?;

let new_rep_levels = FatPtr::<i16>::with_offset(&new_buffer, 0);
let new_rep_levels = new_rep_levels.to_slice_mut();
Expand All @@ -242,26 +236,23 @@ impl<T: DataType> RecordReader<T> {
new_rep_levels[0..num_left_values]
.copy_from_slice(&left_rep_levels[0..num_left_values]);

rep_levels_buf
.resize(self.num_values * size_of::<i16>())
.unwrap();
rep_levels_buf.resize(self.num_values * size_of::<i16>())?;

Some(new_buffer)
} else {
None
};

replace(&mut self.rep_levels, new_buffer).map(|x| x.freeze())
Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.freeze()))
}

/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> Buffer {
pub fn consume_record_data(&mut self) -> Result<Buffer> {
// TODO: Optimize to reduce the copy
let num_left_values = self.values_written - self.num_values;
let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values));
new_buffer
.resize(num_left_values * T::get_type_size())
.unwrap();
new_buffer.resize(num_left_values * T::get_type_size())?;

let new_records =
FatPtr::<T::T>::with_offset_and_size(&new_buffer, 0, T::get_type_size());
Expand All @@ -277,22 +268,23 @@ impl<T: DataType> RecordReader<T> {
swap(&mut new_records[idx], &mut left_records[idx]);
}

self.records
.resize(self.num_values * T::get_type_size())
.unwrap();
replace(&mut self.records, new_buffer).freeze()
self.records.resize(self.num_values * T::get_type_size())?;

Ok(replace(&mut self.records, new_buffer).freeze())
}

/// Returns currently stored null bitmap data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
// TODO: Optimize to reduce the copy
if self.column_desc.max_def_level() > 0 {
assert!(self.null_bitmap.is_some());
let num_left_values = self.values_written - self.num_values;
let new_bitmap_builder = Some(BooleanBufferBuilder::new(max(
MIN_BATCH_SIZE,
num_left_values,
)));

let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder)
.map(|mut builder| builder.finish())
.unwrap();
Expand All @@ -303,13 +295,12 @@ impl<T: DataType> RecordReader<T> {
self.null_bitmap
.as_mut()
.unwrap()
.append(old_bitmap.is_set(i))
.unwrap();
.append(old_bitmap.is_set(i))?;
}

Some(old_bitmap.to_buffer())
Ok(Some(old_bitmap.to_buffer()))
} else {
None
Ok(None)
}
}

Expand All @@ -325,9 +316,9 @@ impl<T: DataType> RecordReader<T> {
}

/// Returns bitmap data.
pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
self.consume_bitmap_buffer()
.map(|buffer| Bitmap::from(buffer))
.map(|buffer| buffer.map(|b| Bitmap::from(b)))
}

/// Try to read one batch of data.
Expand Down Expand Up @@ -589,9 +580,12 @@ mod tests {
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]).unwrap();
let expected_buffer = bb.finish();
assert_eq!(expected_buffer, record_reader.consume_record_data());
assert_eq!(None, record_reader.consume_def_levels());
assert_eq!(None, record_reader.consume_bitmap());
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);
assert_eq!(None, record_reader.consume_def_levels().unwrap());
assert_eq!(None, record_reader.consume_bitmap().unwrap());
}

#[test]
Expand Down Expand Up @@ -674,7 +668,10 @@ mod tests {
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[0, 7, 0, 6, 3, 0, 8]).unwrap();
let expected_buffer = bb.finish();
assert_eq!(expected_buffer, record_reader.consume_record_data());
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);

// Verify result def levels
let mut bb = Int16BufferBuilder::new(7);
Expand All @@ -683,15 +680,18 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels()
record_reader.consume_def_levels().unwrap()
);

// Verify bitmap
let mut bb = BooleanBufferBuilder::new(7);
bb.append_slice(&[false, true, false, true, true, false, true])
.unwrap();
let expected_bitmap = Bitmap::from(bb.finish());
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
}

#[test]
Expand Down Expand Up @@ -778,7 +778,10 @@ mod tests {
let mut bb = Int32BufferBuilder::new(9);
bb.append_slice(&[4, 0, 0, 7, 6, 3, 2, 8, 9]).unwrap();
let expected_buffer = bb.finish();
assert_eq!(expected_buffer, record_reader.consume_record_data());
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);

// Verify result def levels
let mut bb = Int16BufferBuilder::new(9);
Expand All @@ -787,15 +790,18 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels()
record_reader.consume_def_levels().unwrap()
);

// Verify bitmap
let mut bb = BooleanBufferBuilder::new(9);
bb.append_slice(&[true, false, false, true, true, true, true, true, true])
.unwrap();
let expected_bitmap = Bitmap::from(bb.finish());
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
}

#[test]
Expand Down

0 comments on commit 433abab

Please sign in to comment.