Skip to content

Commit

Permalink
fix: IPC writer should truncate string array with all empty string (#…
Browse files Browse the repository at this point in the history
…2314)

* fix: IPC truncation failure

* fix(ipc): get_value_offset_byte_width bug

* pass cargo fmt

* remove redundant comments

* fix(ipc): write_array_data bug

Co-authored-by: jasonnnli <[email protected]>
  • Loading branch information
JasonLi-cn and jasonnnli authored Aug 4, 2022
1 parent e835853 commit d56d88e
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,16 @@ fn get_buffer_element_width(spec: &BufferSpec) -> usize {
}
}

/// Returns byte width for binary value_offset buffer spec.
#[inline]
fn get_value_offset_byte_width(data_type: &DataType) -> usize {
match data_type {
DataType::Binary | DataType::Utf8 => 4,
DataType::LargeBinary | DataType::LargeUtf8 => 8,
_ => unreachable!(),
}
}

/// Returns the number of total bytes in base binary arrays.
fn get_binary_buffer_len(array_data: &ArrayData) -> usize {
if array_data.is_empty() {
Expand Down Expand Up @@ -1005,13 +1015,16 @@ fn write_array_data(
data_type,
DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8
) {
let total_bytes = get_binary_buffer_len(array_data);
let value_buffer = &array_data.buffers()[1];
let offset_buffer = &array_data.buffers()[0];
let value_offset_byte_width = get_value_offset_byte_width(data_type);
let min_length = (array_data.len() + 1) * value_offset_byte_width;
if buffer_need_truncate(
array_data.offset(),
value_buffer,
&BufferSpec::VariableWidth,
total_bytes,
offset_buffer,
&BufferSpec::FixedWidth {
byte_width: value_offset_byte_width,
},
min_length,
) {
// Rebase offsets and truncate values
let (new_offsets, byte_offset) =
Expand All @@ -1029,6 +1042,8 @@ fn write_array_data(

offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);

let total_bytes = get_binary_buffer_len(array_data);
let value_buffer = &array_data.buffers()[1];
let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
let buffer_slice =
&value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
Expand Down Expand Up @@ -1832,4 +1847,21 @@ mod tests {
assert!(structs.column(1).is_null(1));
assert_eq!(record_batch_slice, deserialized_batch);
}

#[test]
fn truncate_ipc_string_array_with_all_empty_string() {
fn create_batch() -> RecordBatch {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let a =
StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
}

let record_batch = create_batch();
let record_batch_slice = record_batch.slice(0, 1);
let deserialized_batch = deserialize(serialize(&record_batch_slice));

assert!(serialize(&record_batch).len() > serialize(&record_batch_slice).len());
assert_eq!(record_batch_slice, deserialized_batch);
}
}

0 comments on commit d56d88e

Please sign in to comment.