Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate ArrayData Buffer Alignment and Automatically Align IPC buffers (#4255) #4681

Merged
merged 6 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,13 +1037,17 @@ mod tests {
#[should_panic(
expected = "Memory pointer is not aligned with the specified scalar type"
)]
// Different error messages, so skip for now
// https://github.com/apache/arrow-rs/issues/1545
#[cfg(not(feature = "force_validate"))]
fn test_primitive_array_alignment() {
let buf = Buffer::from_slice_ref([0_u64]);
let buf2 = buf.slice(1);
let array_data = ArrayData::builder(DataType::Int32)
.add_buffer(buf2)
.build()
.unwrap();
let array_data = unsafe {
ArrayData::builder(DataType::Int32)
.add_buffer(buf2)
.build_unchecked()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to use build_unchecked here, as ArrayData will now complain about the alignment 🎉

};
drop(Int32Array::from(array_data));
}

Expand Down
4 changes: 2 additions & 2 deletions arrow-array/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,6 @@ pub type LargeBinaryType = GenericBinaryType<i64>;
mod tests {
use super::*;
use arrow_data::{layout, BufferSpec};
use std::mem::size_of;

#[test]
fn month_day_nano_should_roundtrip() {
Expand Down Expand Up @@ -1541,7 +1540,8 @@ mod tests {
assert_eq!(
spec,
&BufferSpec::FixedWidth {
byte_width: size_of::<T::Native>()
byte_width: std::mem::size_of::<T::Native>(),
alignment: std::mem::align_of::<T::Native>(),
}
);
}
Expand Down
230 changes: 159 additions & 71 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::bit_iterator::BitSliceIterator;
use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer};
use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
use arrow_schema::{ArrowError, DataType, UnionMode};
use std::convert::TryInto;
use std::mem;
Expand Down Expand Up @@ -451,7 +451,7 @@ impl ArrayData {

for spec in layout.buffers.iter() {
match spec {
BufferSpec::FixedWidth { byte_width } => {
BufferSpec::FixedWidth { byte_width, .. } => {
let buffer_size =
self.len.checked_mul(*byte_width).ok_or_else(|| {
ArrowError::ComputeError(
Expand Down Expand Up @@ -699,6 +699,23 @@ impl ArrayData {
Self::new_null(data_type, 0)
}

/// Verifies that the buffers meet the minimum alignment requirements for the data type
///
/// Buffers that are not adequately aligned will be copied to a new aligned allocation
///
/// This can be useful for when interacting with data sent over IPC or FFI, that may
/// not meet the minimum alignment requirements
fn align_buffers(&mut self) {
let layout = layout(&self.data_type);
for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) {
if let BufferSpec::FixedWidth { alignment, .. } = spec {
if buffer.as_ptr().align_offset(*alignment) != 0 {
*buffer = Buffer::from_slice_ref(buffer.as_ref())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we maybe log a debug message when this happens? Some users might find it confusing that the buffers are silently copied / re-aligned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't currently have a logging setup #1495

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do we consider to use a feature for automatic alignment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I read this PR, if a user is creating ArrayData directly, they have to explicitly call align_buffer in order to ensure the data is aligned and the documentation says it may copy the data.

If they are using the IPC reader then the reader may automatically align (by copying) the data, which seems better than erroring.

As a user, if I didn't want buffers realigned automatically, it would make probably want a setting on the IPC reader that said "error rather than silently fix" on unaligned data. That seems like something we could add as a follow on PR if someone wants that behavior

}
}
}
}

/// "cheap" validation of an `ArrayData`. Ensures buffers are
/// sufficiently sized to store `len` + `offset` total elements of
/// `data_type` and performs other inexpensive consistency checks.
Expand Down Expand Up @@ -736,17 +753,26 @@ impl ArrayData {
self.buffers.iter().zip(layout.buffers.iter()).enumerate()
{
match spec {
BufferSpec::FixedWidth { byte_width } => {
let min_buffer_size = len_plus_offset
.checked_mul(*byte_width)
.expect("integer overflow computing min buffer size");
BufferSpec::FixedWidth {
byte_width,
alignment,
} => {
let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);

if buffer.len() < min_buffer_size {
return Err(ArrowError::InvalidArgumentError(format!(
"Need at least {} bytes in buffers[{}] in array of type {:?}, but got {}",
min_buffer_size, i, self.data_type, buffer.len()
)));
}

let align_offset = buffer.as_ptr().align_offset(*alignment);
if align_offset != 0 {
return Err(ArrowError::InvalidArgumentError(format!(
"Misaligned buffers[{i}] in array of type {:?}, offset from expected alignment of {alignment} by {}",
self.data_type, align_offset.min(alignment - align_offset)
)));
}
}
BufferSpec::VariableWidth => {
// not cheap to validate (need to look at the
Expand Down Expand Up @@ -1493,7 +1519,8 @@ impl ArrayData {
pub fn layout(data_type: &DataType) -> DataTypeLayout {
// based on C/C++ implementation in
// https://github.com/apache/arrow/blob/661c7d749150905a63dd3b52e0a04dac39030d95/cpp/src/arrow/type.h (and .cc)
use std::mem::size_of;
use arrow_schema::IntervalUnit::*;

match data_type {
DataType::Null => DataTypeLayout {
buffers: vec![],
Expand All @@ -1503,44 +1530,52 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
buffers: vec![BufferSpec::BitMap],
can_contain_null_mask: true,
},
DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Timestamp(_, _)
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Interval(_) => {
DataTypeLayout::new_fixed_width(data_type.primitive_width().unwrap())
}
DataType::Duration(_) => DataTypeLayout::new_fixed_width(size_of::<i64>()),
DataType::Binary => DataTypeLayout::new_binary(size_of::<i32>()),
DataType::FixedSizeBinary(bytes_per_value) => {
let bytes_per_value: usize = (*bytes_per_value)
.try_into()
.expect("negative size for fixed size binary");
DataTypeLayout::new_fixed_width(bytes_per_value)
DataType::Int8 => DataTypeLayout::new_fixed_width::<i8>(),
DataType::Int16 => DataTypeLayout::new_fixed_width::<i16>(),
DataType::Int32 => DataTypeLayout::new_fixed_width::<i32>(),
DataType::Int64 => DataTypeLayout::new_fixed_width::<i64>(),
DataType::UInt8 => DataTypeLayout::new_fixed_width::<u8>(),
DataType::UInt16 => DataTypeLayout::new_fixed_width::<u16>(),
DataType::UInt32 => DataTypeLayout::new_fixed_width::<u32>(),
DataType::UInt64 => DataTypeLayout::new_fixed_width::<u64>(),
DataType::Float16 => DataTypeLayout::new_fixed_width::<half::f16>(),
DataType::Float32 => DataTypeLayout::new_fixed_width::<f32>(),
DataType::Float64 => DataTypeLayout::new_fixed_width::<f64>(),
DataType::Timestamp(_, _) => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Date32 => DataTypeLayout::new_fixed_width::<i32>(),
DataType::Date64 => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Time32(_) => DataTypeLayout::new_fixed_width::<i32>(),
DataType::Time64(_) => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Interval(YearMonth) => DataTypeLayout::new_fixed_width::<i32>(),
DataType::Interval(DayTime) => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Interval(MonthDayNano) => DataTypeLayout::new_fixed_width::<i128>(),
DataType::Duration(_) => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Decimal128(_, _) => DataTypeLayout::new_fixed_width::<i128>(),
DataType::Decimal256(_, _) => DataTypeLayout::new_fixed_width::<i256>(),
DataType::FixedSizeBinary(size) => {
let spec = BufferSpec::FixedWidth {
byte_width: (*size).try_into().unwrap(),
alignment: mem::align_of::<u8>(),
};
DataTypeLayout {
buffers: vec![spec],
can_contain_null_mask: true,
}
}
DataType::LargeBinary => DataTypeLayout::new_binary(size_of::<i64>()),
DataType::Utf8 => DataTypeLayout::new_binary(size_of::<i32>()),
DataType::LargeUtf8 => DataTypeLayout::new_binary(size_of::<i64>()),
DataType::List(_) => DataTypeLayout::new_fixed_width(size_of::<i32>()),
DataType::Binary => DataTypeLayout::new_binary::<i32>(),
DataType::LargeBinary => DataTypeLayout::new_binary::<i64>(),
DataType::Utf8 => DataTypeLayout::new_binary::<i32>(),
DataType::LargeUtf8 => DataTypeLayout::new_binary::<i64>(),
DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
DataType::LargeList(_) => DataTypeLayout::new_fixed_width(size_of::<i64>()),
DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
DataType::LargeList(_) => DataTypeLayout::new_fixed_width::<i64>(),
DataType::Map(_, _) => DataTypeLayout::new_fixed_width::<i32>(),
DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data,
DataType::Union(_, mode) => {
let type_ids = BufferSpec::FixedWidth {
byte_width: size_of::<i8>(),
byte_width: mem::size_of::<i8>(),
alignment: mem::align_of::<i8>(),
};

DataTypeLayout {
Expand All @@ -1552,7 +1587,8 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
vec![
type_ids,
BufferSpec::FixedWidth {
byte_width: size_of::<i32>(),
byte_width: mem::size_of::<i32>(),
alignment: mem::align_of::<i32>(),
},
]
}
Expand All @@ -1561,19 +1597,6 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
}
}
DataType::Dictionary(key_type, _value_type) => layout(key_type),
DataType::Decimal128(_, _) => {
// Decimals are always some fixed width; The rust implementation
// always uses 16 bytes / size of i128
DataTypeLayout::new_fixed_width(size_of::<i128>())
}
DataType::Decimal256(_, _) => {
// Decimals are always some fixed width.
DataTypeLayout::new_fixed_width(32)
}
DataType::Map(_, _) => {
// same as ListType
DataTypeLayout::new_fixed_width(size_of::<i32>())
}
}
}

Expand All @@ -1589,10 +1612,13 @@ pub struct DataTypeLayout {
}

impl DataTypeLayout {
/// Describes a basic numeric array where each element has a fixed width
pub fn new_fixed_width(byte_width: usize) -> Self {
/// Describes a basic numeric array where each element has type `T`
pub fn new_fixed_width<T>() -> Self {
Self {
buffers: vec![BufferSpec::FixedWidth { byte_width }],
buffers: vec![BufferSpec::FixedWidth {
byte_width: mem::size_of::<T>(),
alignment: mem::align_of::<T>(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One consequence of this formulation is the alignment is architecture specific, I suspect this is fine, but it potentially worth highlighting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well let's hope nobody mmaps arrow data between different architectures w/o carefully thinking about alignment.

On that note: I wonder what the cost would be to heavily overalign buffers in general to to make them valid on all supported architectures. Like why not just align everything to 256bits?

Copy link
Contributor Author

@tustvold tustvold Aug 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like why not just align everything to 256bits

We do, Rust doesn't 😄 And at least until the allocator API is stabilised there is no way to over-align a Vec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we are aligning all buffers to native sizes -- I thought the point was that the arrow spec said we should align buffers to 64 bits -- if that is the case, shouldn't this code reflect the spec, not the native size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because native alignment is what the arrow-rs implementation requires, for many types this is less than 64-bits. We don't, for example, care if the values buffer of a StringArray is aligned at all. On the flip side, some architectures, such as aarch64 require more than 64-bit alignment for i128, and so we require that also

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the documentation added in ff6af65 make the behavior in this PR much clearer.

Thank you

}],
can_contain_null_mask: true,
}
}
Expand All @@ -1608,14 +1634,15 @@ impl DataTypeLayout {
}

/// Describes a basic numeric array where each element has a fixed
/// with offset buffer of `offset_byte_width` bytes, followed by a
/// with offset buffer of type `T`, followed by a
/// variable width data buffer
pub fn new_binary(offset_byte_width: usize) -> Self {
pub fn new_binary<T>() -> Self {
Self {
buffers: vec![
// offsets
BufferSpec::FixedWidth {
byte_width: offset_byte_width,
byte_width: mem::size_of::<T>(),
alignment: mem::align_of::<T>(),
},
// values
BufferSpec::VariableWidth,
Expand All @@ -1628,8 +1655,18 @@ impl DataTypeLayout {
/// Layout specification for a single data type buffer
#[derive(Debug, PartialEq, Eq)]
pub enum BufferSpec {
/// each element has a fixed width
FixedWidth { byte_width: usize },
/// Each element is a fixed width primitive, with the given `byte_width` and `alignment`
///
/// `alignment` is the alignment required by Rust for an array of the corresponding primitive,
/// see [`Layout::array`](std::alloc::Layout::array) and [`std::mem::align_of`].
///
/// Arrow-rs requires that all buffers are have at least this alignment, to allow for
/// [slice](std::slice) based APIs. We do not require alignment in excess of this to allow
/// for array slicing, and interoperability with `Vec` which in the absence of support
/// for custom allocators, cannot be over-aligned.
///
/// Note that these alignment requirements will vary between architectures
FixedWidth { byte_width: usize, alignment: usize },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should document here what alignment represents (e.g. required pointer alignment for buffers of this type)

/// Variable width, such as string data for utf8 data
VariableWidth,
/// Buffer holds a bitmap.
Expand Down Expand Up @@ -1741,6 +1778,15 @@ impl ArrayDataBuilder {
/// apply.
#[allow(clippy::let_and_return)]
pub unsafe fn build_unchecked(self) -> ArrayData {
let data = self.build_impl();
// Provide a force_validate mode
#[cfg(feature = "force_validate")]
data.validate_data().unwrap();
data
}

/// Same as [`Self::build_unchecked`] but ignoring `force_validate` feature flag
unsafe fn build_impl(self) -> ArrayData {
let nulls = self.nulls.or_else(|| {
let buffer = self.null_bit_buffer?;
let buffer = BooleanBuffer::new(buffer, self.offset, self.len);
Expand All @@ -1750,26 +1796,41 @@ impl ArrayDataBuilder {
})
});

let data = ArrayData {
ArrayData {
data_type: self.data_type,
len: self.len,
offset: self.offset,
buffers: self.buffers,
child_data: self.child_data,
nulls: nulls.filter(|b| b.null_count() != 0),
};

// Provide a force_validate mode
#[cfg(feature = "force_validate")]
data.validate_data().unwrap();
data
}
}

/// Creates an array data, validating all inputs
#[allow(clippy::let_and_return)]
pub fn build(self) -> Result<ArrayData, ArrowError> {
let data = unsafe { self.build_unchecked() };
#[cfg(not(feature = "force_validate"))]
let data = unsafe { self.build_impl() };
data.validate_data()?;
Ok(data)
}

/// Creates an array data, validating all inputs, and aligning any buffers
///
/// Rust requires that arrays are aligned to their corresponding primitive,
/// see [`Layout::array`](std::alloc::Layout::array) and [`std::mem::align_of`].
///
/// [`ArrayData`] therefore requires that all buffers are have at least this alignment,
/// to allow for [slice](std::slice) based APIs. See [`BufferSpec::FixedWidth`].
///
/// As this alignment is architecture specific, and not guaranteed by all arrow implementations,
/// this method is provided to automatically copy buffers to a new correctly aligned allocation
/// when necessary, making it useful when interacting with buffers produced by other systems,
/// e.g. IPC or FFI.
///
/// This is unlike `[Self::build`] which will instead return an error on encountering
/// insufficiently aligned buffers.
pub fn build_aligned(self) -> Result<ArrayData, ArrowError> {
let mut data = unsafe { self.build_impl() };
data.align_buffers();
data.validate_data()?;
Ok(data)
}
Expand Down Expand Up @@ -2057,4 +2118,31 @@ mod tests {
assert_eq!(buffers.len(), layout.buffers.len());
}
}

#[test]
fn test_alignment() {
let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
let sliced = buffer.slice(1);

let mut data = ArrayData {
data_type: DataType::Int32,
len: 0,
offset: 0,
buffers: vec![buffer],
child_data: vec![],
nulls: None,
};
data.validate_full().unwrap();

data.buffers[0] = sliced;
let err = data.validate().unwrap_err();

assert_eq!(
err.to_string(),
"Invalid argument error: Misaligned buffers[0] in array of type Int32, offset from expected alignment of 4 by 1"
);

data.align_buffers();
data.validate_full().unwrap();
}
}
Loading