Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Replaced own allocator by std::Vec. (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Sep 26, 2021
1 parent f761d41 commit 235b7f5
Show file tree
Hide file tree
Showing 22 changed files with 582 additions and 398 deletions.
23 changes: 22 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,25 @@ jobs:
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full -- --skip io::parquet --skip io::ipc

miri-checks-custom-allocator:
name: MIRI with custom allocator
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-07-09
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
run: |
rustup component add miri
cargo miri setup
- name: Run
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full,cache_aligned -- --skip io::parquet --skip io::ipc

coverage:
name: Coverage
runs-on: ubuntu-latest
Expand All @@ -91,7 +110,9 @@ jobs:
- name: Install tarpaulin
run: cargo install cargo-tarpaulin
- name: Run coverage
run: cargo tarpaulin --features full --out Xml
run: |
cargo tarpaulin --features cache_aligned --out Xml
cargo tarpaulin --features full --out Xml
- name: Report coverage
continue-on-error: true
run: bash <(curl -s https://codecov.io/bash)
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ full = [
"merge_sort",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",
"chrono-tz"
]
merge_sort = ["itertools"]
io_csv = ["io_csv_read", "io_csv_write"]
Expand All @@ -125,6 +125,9 @@ compute = ["strength_reduce", "multiversion", "lexical-core", "ahash"]
io_parquet = ["parquet2", "io_ipc", "base64", "futures"]
benchmarks = ["rand"]
simd = ["packed_simd"]
# uses a custom allocator whose pointers are aligned along cache lines.
# Using this features makes `Buffer` and `MutableBuffer` incompatible with `Vec`.
cache_aligned = []

[package.metadata.cargo-all-features]
skip_feature_sets = [
Expand Down
6 changes: 0 additions & 6 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,8 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
to_py_field(&field, py)
}

#[pyfunction]
fn total_allocated_bytes() -> PyResult<isize> {
Ok(arrow2::total_allocated_bytes())
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(total_allocated_bytes, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
Ok(())
Expand Down
9 changes: 0 additions & 9 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@ def __reduce__(self):

class TestCase(unittest.TestCase):
def setUp(self):
self.old_allocated_rust = (
arrow_pyarrow_integration_testing.total_allocated_bytes()
)
self.old_allocated_cpp = pyarrow.total_allocated_bytes()

def tearDown(self):
# No leak of Rust
self.assertEqual(
self.old_allocated_rust,
arrow_pyarrow_integration_testing.total_allocated_bytes(),
)

# No leak of C++ memory
self.assertEqual(self.old_allocated_cpp, pyarrow.total_allocated_bytes())

Expand Down
54 changes: 26 additions & 28 deletions benches/arithmetic_kernels.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,15 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
use criterion::Criterion;

use arrow2::array::*;
use arrow2::util::bench_util::*;
use arrow2::{
compute::arithmetics::basic::div::div_scalar, datatypes::DataType, types::NativeType,
compute::arithmetics::basic::add::add, compute::arithmetics::basic::div::div_scalar,
datatypes::DataType, types::NativeType,
};
use num_traits::NumCast;
use std::ops::Div;
use std::ops::{Add, Div};

fn bench_div_scalar<T>(lhs: &PrimitiveArray<T>, rhs: &T)
where
Expand All @@ -34,17 +18,31 @@ where
criterion::black_box(div_scalar(lhs, rhs));
}

fn bench_add<T>(lhs: &PrimitiveArray<T>, rhs: &PrimitiveArray<T>)
where
T: NativeType + Add<Output = T> + NumCast,
{
criterion::black_box(add(lhs, rhs)).unwrap();
}

fn add_benchmark(c: &mut Criterion) {
let size = 65536;
let arr = create_primitive_array_with_seed::<u64>(size, DataType::UInt64, 0.0, 43);
(10..=20).step_by(2).for_each(|log2_size| {
let size = 2usize.pow(log2_size);
let arr_a = create_primitive_array_with_seed::<u64>(size, DataType::UInt64, 0.0, 43);
let arr_b = create_primitive_array_with_seed::<u64>(size, DataType::UInt64, 0.0, 42);

c.bench_function("divide_scalar 4", |b| {
// 4 is a very fast optimizable divisor
b.iter(|| bench_div_scalar(&arr, &4))
});
c.bench_function("divide_scalar prime", |b| {
// large prime number that is probably harder to simplify
b.iter(|| bench_div_scalar(&arr, &524287))
c.bench_function(&format!("divide_scalar 2^{}", log2_size), |b| {
// 4 is a very fast optimizable divisor
b.iter(|| bench_div_scalar(&arr_a, &4))
});
c.bench_function(&format!("divide_scalar prime 2^{}", log2_size), |b| {
// large prime number that is probably harder to simplify
b.iter(|| bench_div_scalar(&arr_a, &524287))
});

c.bench_function(&format!("add 2^{}", log2_size), |b| {
b.iter(|| bench_add(&arr_a, &arr_b))
});
});
}

Expand Down
5 changes: 0 additions & 5 deletions src/alloc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ pub use alignment::ALIGNMENT;
// If this number is not zero after all objects have been `drop`, there is a memory leak
static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0);

/// Returns the total number of bytes allocated to buffers by the allocator.
pub fn total_allocated_bytes() -> isize {
unsafe { ALLOCATIONS.load(std::sync::atomic::Ordering::SeqCst) }
}

/// # Safety
/// This pointer may only be used to check if memory is allocated.
#[inline]
Expand Down
4 changes: 2 additions & 2 deletions src/bitmap/bitmap_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ where

let iterator = iter.map(|left| op(left)).chain(std::iter::once(rem));

let buffer = MutableBuffer::from_trusted_len_iter(iterator);
let buffer = MutableBuffer::from_chunk_iter(iterator);

Bitmap::from_u8_buffer(buffer.into(), length)
Bitmap::from_u8_buffer(buffer, length)
}

/// Apply a bitwise operation `op` to one input and return the result as a [`Bitmap`].
Expand Down
19 changes: 9 additions & 10 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::slice;
use std::{fmt::Debug, fmt::Formatter};
use std::{ptr::NonNull, sync::Arc};

use crate::alloc;
use crate::ffi;
use crate::types::NativeType;
#[cfg(feature = "cache_aligned")]
use crate::vec::AlignedVec as Vec;

/// Mode of deallocating memory regions
pub enum Deallocation {
Expand Down Expand Up @@ -79,11 +80,6 @@ impl<T: NativeType> Bytes<T> {
self.len
}

#[inline]
pub fn is_empty(&self) -> bool {
self.len == 0
}

#[inline]
pub fn ptr(&self) -> NonNull<T> {
self.ptr
Expand All @@ -94,9 +90,12 @@ impl<T: NativeType> Drop for Bytes<T> {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
Deallocation::Native(capacity) => {
unsafe { alloc::free_aligned(self.ptr, *capacity) };
}
Deallocation::Native(capacity) => unsafe {
#[cfg(feature = "cache_aligned")]
let _ = Vec::from_raw_parts(self.ptr, self.len, *capacity);
#[cfg(not(feature = "cache_aligned"))]
let _ = Vec::from_raw_parts(self.ptr.as_ptr(), self.len, *capacity);
},
// foreign interface knows how to deallocate itself.
Deallocation::Foreign(_) => (),
}
Expand Down Expand Up @@ -127,6 +126,6 @@ impl<T: NativeType> Debug for Bytes<T> {
}
}

// This is sound because `Bytes` is an imutable container
// This is sound because `Bytes` is an immutable container
unsafe impl<T: NativeType> Send for Bytes<T> {}
unsafe impl<T: NativeType> Sync for Bytes<T> {}
13 changes: 11 additions & 2 deletions src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,18 @@ impl<T: NativeType> Buffer<T> {
MutableBuffer::from_len_zeroed(length).into()
}

/// Auxiliary method to create a new Buffer
/// Takes ownership of [`Vec`].
/// # Implementation
/// This function is `O(1)`
#[cfg(not(feature = "cache_aligned"))]
#[cfg_attr(docsrs, doc(cfg(not(feature = "cache_aligned"))))]
#[inline]
pub fn from_bytes(bytes: Bytes<T>) -> Self {
pub fn from_vec(data: Vec<T>) -> Self {
MutableBuffer::from_vec(data).into()
}

/// Auxiliary method to create a new Buffer
pub(crate) fn from_bytes(bytes: Bytes<T>) -> Self {
let length = bytes.len();
Buffer {
data: Arc::new(bytes),
Expand Down
1 change: 0 additions & 1 deletion src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod immutable;
mod mutable;

pub(crate) mod bytes;
pub(crate) mod util;

pub use immutable::Buffer;
pub use mutable::MutableBuffer;
Loading

0 comments on commit 235b7f5

Please sign in to comment.