Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PyO3 bridge for pyarrow interoperability / fix arrow integration test #691

Merged
merged 8 commits into from
Sep 1, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
PyO3 bridge for pyarrow interoperability
kszucs committed Aug 31, 2021
commit d5af0527ec6d4937c53ace41c3fe94b256ee0ffc
4 changes: 2 additions & 2 deletions arrow-pyarrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
@@ -31,8 +31,8 @@ name = "arrow_pyarrow_integration_testing"
crate-type = ["cdylib"]

[dependencies]
arrow = { path = "../arrow", version = "6.0.0-SNAPSHOT" }
pyo3 = { version = "0.12.1", features = ["extension-module"] }
arrow = { path = "../arrow", version = "6.0.0-SNAPSHOT", features = ["pyarrow"] }
pyo3 = { version = "0.14", features = ["extension-module"] }

[package.metadata.maturin]
requires-dist = ["pyarrow>=1"]
206 changes: 44 additions & 162 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -18,21 +18,21 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.

use std::convert::TryFrom;
use std::error;
use std::fmt;
use std::sync::Arc;

use pyo3::exceptions::PyOSError;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::{libc::uintptr_t, prelude::*};
//use libc::uintptr_t;

use arrow::array::{make_array_from_raw, ArrayRef, Int64Array};
use arrow::array::{ArrayData, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi;
use arrow::ffi::FFI_ArrowSchema;
use arrow::pyarrow::PyArrowConvert;
use arrow::record_batch::RecordBatch;

/// an error that bridges ArrowError with a Python error
#[derive(Debug)]
@@ -71,216 +71,98 @@ impl From<PyO3ArrowError> for PyErr {
}
}

#[pyclass]
struct PyDataType {
inner: DataType,
}

#[pyclass]
struct PyField {
inner: Field,
}

#[pyclass]
struct PySchema {
inner: Schema,
}

#[pymethods]
impl PyDataType {
#[staticmethod]
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
let dtype = DataType::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
Ok(Self { inner: dtype })
}

fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema =
FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("DataType")?;
let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
Ok(dtype.into())
}
}

#[pymethods]
impl PyField {
#[staticmethod]
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
let field = Field::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
Ok(Self { inner: field })
}

fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema =
FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Field")?;
let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
Ok(dtype.into())
}
}

#[pymethods]
impl PySchema {
#[staticmethod]
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
let schema = Schema::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
Ok(Self { inner: schema })
}

fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema =
FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Schema")?;
let schema =
class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
Ok(schema.into())
}
}

impl<'source> FromPyObject<'source> for PyDataType {
fn extract(value: &'source PyAny) -> PyResult<Self> {
PyDataType::from_pyarrow(value)
}
}

impl<'source> FromPyObject<'source> for PyField {
fn extract(value: &'source PyAny) -> PyResult<Self> {
PyField::from_pyarrow(value)
}
}

impl<'source> FromPyObject<'source> for PySchema {
fn extract(value: &'source PyAny) -> PyResult<Self> {
PySchema::from_pyarrow(value)
}
}

fn array_to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
let (array_pointer, schema_pointer) =
ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(
py,
"_export_to_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;

let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
.map_err(PyO3ArrowError::from)?;
Ok(array)
}

fn array_to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) = array.to_raw().map_err(PyO3ArrowError::from)?;

let pa = py.import("pyarrow")?;

let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;
Ok(array.to_object(py))
}

/// Returns `array + array` of an int64 array.
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
fn double(array: &PyAny, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
let array = ArrayRef::from_pyarrow(array)?;

// perform some operation
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::ParseError("Expects an int64".to_string()))
})?;
let array = kernels::arithmetic::add(&array, &array).map_err(PyO3ArrowError::from)?;
let array = Arc::new(array);

// export
array_to_py(array, py)
array.to_pyarrow(py)
}

/// calls a lambda function that receives and returns an array
/// whose result must be the array multiplied by two
#[pyfunction]
fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
fn double_py(lambda: &PyAny, py: Python) -> PyResult<bool> {
// create
let array = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
let expected = Arc::new(Int64Array::from(vec![Some(2), None, Some(6)])) as ArrayRef;

// to py
let pyarray = array_to_py(array, py)?;
let pyarray = lambda.call1(py, (pyarray,))?;
let array = array_to_rust(pyarray, py)?;
let pyarray = array.to_pyarrow(py)?;
let pyarray = lambda.call1((pyarray,))?;
let array = ArrayRef::from_pyarrow(pyarray)?;

Ok(array == expected)
}

/// Returns the substring
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
fn substring(array: ArrayData, start: i64) -> PyResult<ArrayData> {
// import
let array = array_to_rust(array, py)?;
let array = ArrayRef::from(array);

// substring
let array = kernels::substring::substring(array.as_ref(), start, &None)
.map_err(PyO3ArrowError::from)?;

// export
array_to_py(array, py)
Ok(array.data().to_owned())
}

/// Returns the concatenation of the array with itself
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
fn concatenate(array: ArrayData, py: Python) -> PyResult<PyObject> {
let array = ArrayRef::from(array);

// concat
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])
.map_err(PyO3ArrowError::from)?;

// export
array_to_py(array, py)
array.to_pyarrow(py)
}

/// Converts to rust and back to python
#[pyfunction]
fn round_trip(pyarray: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(pyarray, py)?;
fn round_trip_type(obj: DataType) -> PyResult<DataType> {
Ok(obj)
}

// export
array_to_py(array, py)
#[pyfunction]
fn round_trip_field(obj: Field) -> PyResult<Field> {
Ok(obj)
}

#[pyfunction]
fn round_trip_schema(obj: Schema) -> PyResult<Schema> {
Ok(obj)
}

#[pyfunction]
fn round_trip_array(obj: ArrayData) -> PyResult<ArrayData> {
Ok(obj)
}

#[pyfunction]
fn round_trip_record_batch(obj: RecordBatch) -> PyResult<RecordBatch> {
Ok(obj)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyDataType>()?;
m.add_class::<PyField>()?;
m.add_class::<PySchema>()?;
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
m.add_wrapped(wrap_pyfunction!(concatenate))?;
m.add_wrapped(wrap_pyfunction!(round_trip))?;
m.add_wrapped(wrap_pyfunction!(round_trip_type))?;
m.add_wrapped(wrap_pyfunction!(round_trip_field))?;
m.add_wrapped(wrap_pyfunction!(round_trip_schema))?;
m.add_wrapped(wrap_pyfunction!(round_trip_array))?;
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?;
Ok(())
}
26 changes: 12 additions & 14 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
import pyarrow as pa
import pytz

from arrow_pyarrow_integration_testing import PyDataType, PyField, PySchema
import arrow_pyarrow_integration_testing as rust


@@ -113,43 +112,42 @@ def assert_pyarrow_leak():

@pytest.mark.parametrize("pyarrow_type", _supported_pyarrow_types, ids=str)
def test_type_roundtrip(pyarrow_type):
ty = PyDataType.from_pyarrow(pyarrow_type)
restored = ty.to_pyarrow()
restored = rust.round_trip_type(pyarrow_type)
assert restored == pyarrow_type
assert restored is not pyarrow_type


@pytest.mark.parametrize("pyarrow_type", _unsupported_pyarrow_types, ids=str)
def test_type_roundtrip_raises(pyarrow_type):
with pytest.raises(Exception):
PyDataType.from_pyarrow(pyarrow_type)
rust.round_trip_type(pyarrow_type)


def test_dictionary_type_roundtrip():
# the dictionary type conversion is incomplete
pyarrow_type = pa.dictionary(pa.int32(), pa.string())
ty = PyDataType.from_pyarrow(pyarrow_type)
assert ty.to_pyarrow() == pa.int32()
ty = rust.round_trip_type(pyarrow_type)
assert ty == pa.int32()


@pytest.mark.parametrize('pyarrow_type', _supported_pyarrow_types, ids=str)
def test_field_roundtrip(pyarrow_type):
pyarrow_field = pa.field("test", pyarrow_type, nullable=True)
field = PyField.from_pyarrow(pyarrow_field)
assert field.to_pyarrow() == pyarrow_field
field = rust.round_trip_field(pyarrow_field)
assert field == pyarrow_field

if pyarrow_type != pa.null():
# A null type field may not be non-nullable
pyarrow_field = pa.field("test", pyarrow_type, nullable=False)
field = PyField.from_pyarrow(pyarrow_field)
assert field.to_pyarrow() == pyarrow_field
field = rust.round_trip_field(pyarrow_field)
assert field == pyarrow_field


def test_schema_roundtrip():
pyarrow_fields = zip(string.ascii_lowercase, _supported_pyarrow_types)
pyarrow_schema = pa.schema(pyarrow_fields)
schema = PySchema.from_pyarrow(pyarrow_schema)
assert schema.to_pyarrow() == pyarrow_schema
schema = rust.round_trip_schema(pyarrow_schema)
assert schema == pyarrow_schema


def test_primitive_python():
@@ -205,7 +203,7 @@ def test_list_array():
Python -> Rust -> Python
"""
a = pa.array([[], None, [1, 2], [4, 5, 6]], pa.list_(pa.int64()))
b = rust.round_trip(a)
b = rust.round_trip_array(a)
b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type
@@ -261,7 +259,7 @@ def test_decimal_python():
None
]
a = pa.array(data, pa.decimal128(6, 2))
b = rust.round_trip(a)
b = rust.round_trip_array(a)
assert a == b
del a
del b
Loading