Skip to content

Commit

Permalink
Added support to the Arrow C stream interface (read and write) (jorge…
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and Dexter Duckworth committed Mar 2, 2022
1 parent 29e563a commit 7b822a0
Show file tree
Hide file tree
Showing 17 changed files with 926 additions and 564 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ documentation of each of its APIs.
* Most feature-complete implementation of Apache Arrow after the reference implementation (C++)
* Float 16 unsupported (not a Rust native type)
* Decimal 256 unsupported (not a Rust native type)
* FFI supported for all Arrow types
* C data interface supported for all Arrow types (read and write)
* C stream interface supported for all Arrow types (read and write)
* Full interoperability with Rust's `Vec`
* MutableArray API to work with bitmaps and arrays in-place
* Full support for timestamps with timezones, including arithmetics that take
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Note that this crate uses two languages and an external ABI:

Pyarrow exposes a C ABI to convert arrow arrays from and to its C implementation, see [here](https://arrow.apache.org/docs/format/CDataInterface.html).

This package uses the equivalent struct in Rust (`arrow::array::ArrowArray`), and verifies that
This package uses the equivalent struct in Rust (`arrow::array::InternalArrowArray`), and verifies that
we can use pyarrow's interface to move pointers from and to Rust.

## Relevant literature
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.

[build-system]
requires = ["maturin"]
requires = ["maturin>=0.12,<0.13"]
build-backend = "maturin"
63 changes: 63 additions & 0 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;

use arrow2::array::{Int32Array, StructArray};
use arrow2::datatypes::DataType;
use arrow2::ffi;

use super::*;

pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
let stream = Box::new(ffi::ArrowArrayStream::empty());

let stream_ptr = &*stream as *const ffi::ArrowArrayStream;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(py, "_export_to_c", (stream_ptr as Py_uintptr_t,))?;

let mut iter =
unsafe { ffi::ArrowArrayStreamReader::try_new(stream).map_err(PyO3ArrowError::from) }?;

let mut arrays = vec![];
while let Some(array) = unsafe { iter.next() } {
let py_array = to_py_array(array.unwrap().into(), py)?;
arrays.push(py_array)
}
Ok(arrays)
}

pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
// initialize an array
let array = Int32Array::from(&[Some(2), None, Some(1), None]);
let array = StructArray::from_data(
DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
vec![Arc::new(array)],
None,
);
// and a field with its datatype
let field = Field::new("a", array.data_type().clone(), true);

// Arc it, since it will be shared with an external program
let array: Arc<dyn Array> = Arc::new(array.clone());

// create an iterator of arrays
let arrays = vec![array.clone(), array.clone(), array];
let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _;

// create an [`ArrowArrayStream`] based on this iterator and field
let mut stream = Box::new(ffi::ArrowArrayStream::empty());
unsafe { ffi::export_iterator(iter, field, &mut *stream) };

// call pyarrow's interface to read this stream
let pa = py.import("pyarrow.ipc")?;
let py_stream = pa.getattr("RecordBatchReader")?.call_method1(
"_import_from_c",
((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,),
)?;
Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow

Ok(py_stream.to_object(py))
}
31 changes: 22 additions & 9 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
mod c_stream;

use std::error;
use std::fmt;
Expand Down Expand Up @@ -51,11 +52,11 @@ impl From<PyO3ArrowError> for PyErr {

fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
// prepare a pointer to receive the Array struct
let array = Box::new(ffi::Ffi_ArrowArray::empty());
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
let array = Box::new(ffi::ArrowArray::empty());
let schema = Box::new(ffi::ArrowSchema::empty());

let array_ptr = &*array as *const ffi::Ffi_ArrowArray;
let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;
let array_ptr = &*array as *const ffi::ArrowArray;
let schema_ptr = &*schema as *const ffi::ArrowSchema;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
Expand All @@ -73,8 +74,8 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
}

fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());

let array_ptr = Box::into_raw(array_ptr);
let schema_ptr = Box::into_raw(schema_ptr);
Expand All @@ -101,9 +102,9 @@ fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {

fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
// prepare a pointer to receive the Array struct
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
let schema = Box::new(ffi::ArrowSchema::empty());

let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;
let schema_ptr = &*schema as *const ffi::ArrowSchema;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
Expand All @@ -115,7 +116,7 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
}

fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());
let schema_ptr = Box::into_raw(schema_ptr);

unsafe {
Expand Down Expand Up @@ -153,9 +154,21 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
to_py_field(&field, py)
}

#[pyfunction]
pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
c_stream::to_rust_iterator(ob, py)
}

#[pyfunction]
pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
c_stream::from_rust_iterator(py)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?;
m.add_function(wrap_pyfunction!(from_rust_iterator, m)?)?;
Ok(())
}
29 changes: 29 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_c_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import unittest

import pyarrow.ipc

import arrow_pyarrow_integration_testing


class TestCase(unittest.TestCase):
def test_rust_reads(self):
schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())])
a = pyarrow.array([1, None, 2], type=pyarrow.int32())

batch = pyarrow.record_batch([a], schema)
reader = pyarrow.ipc.RecordBatchStreamReader.from_batches(schema, [batch])

arrays = arrow_pyarrow_integration_testing.to_rust_iterator(reader)

array = arrays[0].field(0)
assert array == a

def test_pyarrow_reads(self):
stream = arrow_pyarrow_integration_testing.from_rust_iterator()

arrays = [a for a in stream]

expected = pyarrow.RecordBatch.from_arrays([pyarrow.array([2, None, 1, None], pyarrow.int32())], names=["a"])
expected = [expected, expected, expected]

self.assertEqual(arrays, expected)
13 changes: 5 additions & 8 deletions examples/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@ use std::sync::Arc;

unsafe fn export(
array: Arc<dyn Array>,
array_ptr: *mut ffi::Ffi_ArrowArray,
schema_ptr: *mut ffi::Ffi_ArrowSchema,
array_ptr: *mut ffi::ArrowArray,
schema_ptr: *mut ffi::ArrowSchema,
) {
let field = Field::new("a", array.data_type().clone(), true);
ffi::export_array_to_c(array, array_ptr);
ffi::export_field_to_c(&field, schema_ptr);
}

unsafe fn import(
array: Box<ffi::Ffi_ArrowArray>,
schema: &ffi::Ffi_ArrowSchema,
) -> Result<Box<dyn Array>> {
unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Result<Box<dyn Array>> {
let field = ffi::import_field_from_c(schema)?;
ffi::import_array_from_c(array, field.data_type)
}
Expand All @@ -28,8 +25,8 @@ fn main() -> Result<()> {

// the goal is to export this array and import it back via FFI.
// to import, we initialize the structs that will receive the data
let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());

// since FFIs work in raw pointers, let's temporarily relinquish ownership so that producers
// can write into it in a thread-safe manner
Expand Down
2 changes: 1 addition & 1 deletion src/buffer/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory aligment
Native,
// Foreign interface, via a callback
Foreign(Arc<ffi::ArrowArray>),
Foreign(Arc<ffi::InternalArrowArray>),
}

impl Debug for Deallocation {
Expand Down
Loading

0 comments on commit 7b822a0

Please sign in to comment.