Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into alamb/full_array_da…
Browse files Browse the repository at this point in the history
…ta_construction_validation
  • Loading branch information
alamb committed Dec 3, 2021
2 parents 76baca1 + 9fb2a5f commit 4436472
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 70 deletions.
12 changes: 6 additions & 6 deletions arrow/src/array/equal/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ pub(super) fn child_logical_null_buffer(
let array_offset = parent_data.offset();
let bitmap_len = bit_util::ceil(parent_len * len, 8);
let mut buffer = MutableBuffer::from_len_zeroed(bitmap_len);
let mut null_slice = buffer.as_slice_mut();
let null_slice = buffer.as_slice_mut();
(array_offset..parent_len + array_offset).for_each(|index| {
let start = index * len;
let end = start + len;
let mask = parent_bitmap.is_set(index);
(start..end).for_each(|child_index| {
if mask && self_null_bitmap.is_set(child_index) {
bit_util::set_bit(&mut null_slice, child_index);
bit_util::set_bit(null_slice, child_index);
}
});
});
Expand All @@ -151,12 +151,12 @@ pub(super) fn child_logical_null_buffer(
// slow path
let array_offset = parent_data.offset();
let mut buffer = MutableBuffer::new_null(parent_len);
let mut null_slice = buffer.as_slice_mut();
let null_slice = buffer.as_slice_mut();
(0..parent_len).for_each(|index| {
if parent_bitmap.is_set(index + array_offset)
&& self_null_bitmap.is_set(index + array_offset)
{
bit_util::set_bit(&mut null_slice, index);
bit_util::set_bit(null_slice, index);
}
});
Some(buffer.into())
Expand All @@ -182,7 +182,7 @@ fn logical_list_bitmap<OffsetSize: OffsetSizeTrait>(
let offset_start = offsets.first().unwrap().to_usize().unwrap();
let offset_len = offsets.get(parent_data.len()).unwrap().to_usize().unwrap();
let mut buffer = MutableBuffer::new_null(offset_len - offset_start);
let mut null_slice = buffer.as_slice_mut();
let null_slice = buffer.as_slice_mut();

offsets
.windows(2)
Expand All @@ -194,7 +194,7 @@ fn logical_list_bitmap<OffsetSize: OffsetSizeTrait>(
let mask = parent_bitmap.is_set(index);
(start..end).for_each(|child_index| {
if mask && child_bitmap.is_set(child_index) {
bit_util::set_bit(&mut null_slice, child_index - offset_start);
bit_util::set_bit(null_slice, child_index - offset_start);
}
});
});
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/array/transform/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend {
let buffer = &mut mutable.buffer1;
resize_for_bits(buffer, mutable.len + len);
set_bits(
&mut buffer.as_slice_mut(),
buffer.as_slice_mut(),
values,
mutable.len,
array.offset() + start,
Expand Down
10 changes: 1 addition & 9 deletions arrow/src/compute/kernels/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,22 +296,14 @@ where
}

/// Options that define how `take` should behave
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct TakeOptions {
/// Perform bounds check before taking indices from values.
/// If enabled, an `ArrowError` is returned if the indices are out of bounds.
/// If not enabled, and indices exceed bounds, the kernel will panic.
pub check_bounds: bool,
}

impl Default for TakeOptions {
fn default() -> Self {
Self {
check_bounds: false,
}
}
}

#[inline(always)]
fn maybe_usize<I: ArrowNativeType>(index: I) -> Result<usize> {
index
Expand Down
4 changes: 2 additions & 2 deletions arrow/src/compute/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ pub(super) mod tests {
values.append(&mut array);
} else {
list_null_count += 1;
bit_util::unset_bit(&mut list_bitmap.as_slice_mut(), idx);
bit_util::unset_bit(list_bitmap.as_slice_mut(), idx);
}
offset.push(values.len() as i64);
}
Expand Down Expand Up @@ -386,7 +386,7 @@ pub(super) mod tests {
values.extend(items.into_iter());
} else {
list_null_count += 1;
bit_util::unset_bit(&mut list_bitmap.as_slice_mut(), idx);
bit_util::unset_bit(list_bitmap.as_slice_mut(), idx);
values.extend(vec![None; length as usize].into_iter());
}
}
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl Field {
DataType::Struct(mut fields) => match map.get("children") {
Some(Value::Array(values)) => {
let struct_fields: Result<Vec<Field>> =
values.iter().map(|v| Field::from(v)).collect();
values.iter().map(Field::from).collect();
fields.append(&mut struct_fields?);
DataType::Struct(fields)
}
Expand Down
5 changes: 1 addition & 4 deletions arrow/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,7 @@ impl Schema {
match *json {
Value::Object(ref schema) => {
let fields = if let Some(Value::Array(fields)) = schema.get("fields") {
fields
.iter()
.map(|f| Field::from(f))
.collect::<Result<_>>()?
fields.iter().map(Field::from).collect::<Result<_>>()?
} else {
return Err(ArrowError::ParseError(
"Schema fields should be an array".to_string(),
Expand Down
16 changes: 8 additions & 8 deletions arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,9 @@ fn write_continuation<W: Write>(
/// Write array data to a vector of bytes
fn write_array_data(
array_data: &ArrayData,
mut buffers: &mut Vec<ipc::Buffer>,
mut arrow_data: &mut Vec<u8>,
mut nodes: &mut Vec<ipc::FieldNode>,
buffers: &mut Vec<ipc::Buffer>,
arrow_data: &mut Vec<u8>,
nodes: &mut Vec<ipc::FieldNode>,
offset: i64,
num_rows: usize,
null_count: usize,
Expand All @@ -775,11 +775,11 @@ fn write_array_data(
Some(buffer) => buffer.clone(),
};

offset = write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset);
offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
}

array_data.buffers().iter().for_each(|buffer| {
offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset);
offset = write_buffer(buffer, buffers, arrow_data, offset);
});

if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
Expand All @@ -788,9 +788,9 @@ fn write_array_data(
// write the nested data (e.g list data)
offset = write_array_data(
data_ref,
&mut buffers,
&mut arrow_data,
&mut nodes,
buffers,
arrow_data,
nodes,
offset,
data_ref.len(),
data_ref.null_count(),
Expand Down
79 changes: 66 additions & 13 deletions arrow/src/util/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! available unless `feature = "prettyprint"` is enabled.
use crate::{array::ArrayRef, record_batch::RecordBatch};
use std::fmt::Display;

use comfy_table::{Cell, Table};

Expand All @@ -27,13 +28,16 @@ use crate::error::Result;
use super::display::array_value_to_string;

/// Create a visual representation of record batches
pub fn pretty_format_batches(results: &[RecordBatch]) -> Result<String> {
Ok(create_table(results)?.to_string())
pub fn pretty_format_batches(results: &[RecordBatch]) -> Result<impl Display> {
create_table(results)
}

/// Create a visual representation of columns
pub fn pretty_format_columns(col_name: &str, results: &[ArrayRef]) -> Result<String> {
Ok(create_column(col_name, results)?.to_string())
pub fn pretty_format_columns(
col_name: &str,
results: &[ArrayRef],
) -> Result<impl Display> {
create_column(col_name, results)
}

/// Prints a visual representation of record batches to stdout
Expand Down Expand Up @@ -115,6 +119,7 @@ mod tests {

use super::*;
use crate::array::{DecimalBuilder, FixedSizeListBuilder, Int32Array};
use std::fmt::Write;
use std::sync::Arc;

#[test]
Expand Down Expand Up @@ -144,7 +149,7 @@ mod tests {
],
)?;

let table = pretty_format_batches(&[batch])?;
let table = pretty_format_batches(&[batch])?.to_string();

let expected = vec![
"+---+-----+",
Expand Down Expand Up @@ -176,7 +181,7 @@ mod tests {
Arc::new(array::StringArray::from(vec![Some("e"), None, Some("g")])),
];

let table = pretty_format_columns("a", &columns)?;
let table = pretty_format_columns("a", &columns)?.to_string();

let expected = vec![
"+---+", "| a |", "+---+", "| a |", "| b |", "| |", "| d |", "| e |",
Expand Down Expand Up @@ -208,7 +213,7 @@ mod tests {
// define data (null)
let batch = RecordBatch::try_new(schema, arrays).unwrap();

let table = pretty_format_batches(&[batch]).unwrap();
let table = pretty_format_batches(&[batch]).unwrap().to_string();

let expected = vec![
"+---+---+---+",
Expand Down Expand Up @@ -244,7 +249,7 @@ mod tests {

let batch = RecordBatch::try_new(schema, vec![array])?;

let table = pretty_format_batches(&[batch])?;
let table = pretty_format_batches(&[batch])?.to_string();

let expected = vec![
"+-------+",
Expand Down Expand Up @@ -285,7 +290,7 @@ mod tests {
let array = Arc::new(builder.finish());

let batch = RecordBatch::try_new(schema, vec![array])?;
let table = pretty_format_batches(&[batch])?;
let table = pretty_format_batches(&[batch])?.to_string();
let expected = vec![
"+-----------+",
"| d1 |",
Expand Down Expand Up @@ -320,7 +325,9 @@ mod tests {
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();

let table = pretty_format_batches(&[batch]).expect("formatting batches");
let table = pretty_format_batches(&[batch])
.expect("formatting batches")
.to_string();

let expected = $EXPECTED_RESULT;
let actual: Vec<&str> = table.lines().collect();
Expand Down Expand Up @@ -494,7 +501,7 @@ mod tests {

let batch = RecordBatch::try_new(schema, vec![dm])?;

let table = pretty_format_batches(&[batch])?;
let table = pretty_format_batches(&[batch])?.to_string();

let expected = vec![
"+-------+",
Expand Down Expand Up @@ -535,7 +542,7 @@ mod tests {

let batch = RecordBatch::try_new(schema, vec![dm])?;

let table = pretty_format_batches(&[batch])?;
let table = pretty_format_batches(&[batch])?.to_string();
let expected = vec![
"+------+", "| f |", "+------+", "| 101 |", "| |", "| 200 |",
"| 3040 |", "+------+",
Expand Down Expand Up @@ -589,7 +596,7 @@ mod tests {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)])
.unwrap();

let table = pretty_format_batches(&[batch])?;
let table = pretty_format_batches(&[batch])?.to_string();
let expected = vec![
r#"+-------------------------------------+----+"#,
r#"| c1 | c2 |"#,
Expand All @@ -605,4 +612,50 @@ mod tests {

Ok(())
}

/// Verifies that the value returned by `pretty_format_batches` can be written
/// straight into a `std::fmt::Write` sink through its `Display` impl, without
/// first materialising an intermediate `String`.
#[test]
fn test_writing_formatted_batches() -> Result<()> {
    // define a schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Int32, true),
    ]));

    // define data.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(array::StringArray::from(vec![
                Some("a"),
                Some("b"),
                None,
                Some("d"),
            ])),
            Arc::new(array::Int32Array::from(vec![
                Some(1),
                None,
                Some(10),
                Some(100),
            ])),
        ],
    )?;

    let mut buf = String::new();
    // Format the `impl Display` value directly: calling `.to_string()` here
    // would allocate a throwaway `String` and bypass the code path this test
    // is meant to cover.
    write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap();

    let s = vec![
        "+---+-----+",
        "| a | b   |",
        "+---+-----+",
        "| a | 1   |",
        "| b |     |",
        "|   | 10  |",
        "| d | 100 |",
        "+---+-----+",
    ];
    // `join` already yields a `String`; no further conversion is needed.
    let expected = s.join("\n");
    assert_eq!(expected, buf);

    Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub async fn scenario_setup(port: &str) -> Result {
/// State held by the basic-auth protocol scenario (purpose inferred from the
/// type name; the implementation is not visible here — TODO confirm).
pub struct AuthBasicProtoScenarioImpl {
// NOTE(review): presumably the credential pair the scenario works with — verify against the impl.
username: Arc<str>,
password: Arc<str>,
// No read of this field is visible in this excerpt; the lint is silenced
// instead of removing the field. NOTE(review): confirm the field is still
// needed, otherwise drop it together with this `allow`.
#[allow(dead_code)]
peer_identity: Arc<Mutex<Option<String>>>,
}

Expand Down
16 changes: 8 additions & 8 deletions parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ fn get_col_writer(

#[allow(clippy::borrowed_box)]
fn write_leaves(
mut row_group_writer: &mut Box<dyn RowGroupWriter>,
row_group_writer: &mut Box<dyn RowGroupWriter>,
array: &arrow_array::ArrayRef,
mut levels: &mut Vec<LevelInfo>,
levels: &mut Vec<LevelInfo>,
) -> Result<()> {
match array.data_type() {
ArrowDataType::Null
Expand Down Expand Up @@ -173,7 +173,7 @@ fn write_leaves(
| ArrowDataType::LargeUtf8
| ArrowDataType::Decimal(_, _)
| ArrowDataType::FixedSizeBinary(_) => {
let mut col_writer = get_col_writer(&mut row_group_writer)?;
let mut col_writer = get_col_writer(row_group_writer)?;
write_leaf(
&mut col_writer,
array,
Expand All @@ -186,7 +186,7 @@ fn write_leaves(
// write the child list
let data = array.data();
let child_array = arrow_array::make_array(data.child_data()[0].clone());
write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
write_leaves(row_group_writer, &child_array, levels)?;
Ok(())
}
ArrowDataType::Struct(_) => {
Expand All @@ -195,7 +195,7 @@ fn write_leaves(
.downcast_ref::<arrow_array::StructArray>()
.expect("Unable to get struct array");
for field in struct_array.columns() {
write_leaves(&mut row_group_writer, field, &mut levels)?;
write_leaves(row_group_writer, field, levels)?;
}
Ok(())
}
Expand All @@ -204,15 +204,15 @@ fn write_leaves(
.as_any()
.downcast_ref::<arrow_array::MapArray>()
.expect("Unable to get map array");
write_leaves(&mut row_group_writer, &map_array.keys(), &mut levels)?;
write_leaves(&mut row_group_writer, &map_array.values(), &mut levels)?;
write_leaves(row_group_writer, &map_array.keys(), levels)?;
write_leaves(row_group_writer, &map_array.values(), levels)?;
Ok(())
}
ArrowDataType::Dictionary(_, value_type) => {
// cast dictionary to a primitive
let array = arrow::compute::cast(array, value_type)?;

let mut col_writer = get_col_writer(&mut row_group_writer)?;
let mut col_writer = get_col_writer(row_group_writer)?;
write_leaf(
&mut col_writer,
&array,
Expand Down
Loading

0 comments on commit 4436472

Please sign in to comment.