Skip to content

Commit

Permalink
Python FFI bridge for Schema, Field and DataType (#439)
Browse files Browse the repository at this point in the history
* FFI bridge for Schema, Field and DataType

* Factor out conversion to datatypes/ffi.rs

* Add flags

* Rust tests

* Test datatypes from the python test suite

* Install a pinned nightly pyarrow wheel

* Python tests for Field and Schema

* Cleanup

* Remove comment

* cleanup

* Fix python tests after rebase

* fix clippy

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
kszucs and alamb authored Jul 2, 2021
1 parent ef88876 commit 31bc052
Show file tree
Hide file tree
Showing 8 changed files with 870 additions and 474 deletions.
53 changes: 52 additions & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ on:

jobs:

docker:
integration:
name: Integration Test
runs-on: ubuntu-latest
steps:
Expand All @@ -46,3 +46,54 @@ jobs:
run: pip install -e dev/archery[docker]
- name: Execute Docker Build
run: archery docker run -e ARCHERY_INTEGRATION_WITH_RUST=1 conda-integration

# test FFI against the C-Data interface exposed by pyarrow
pyarrow-integration-test:
name: Test Pyarrow C Data Interface
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Rust toolchain
run: |
rustup toolchain install ${{ matrix.rust }}
rustup default ${{ matrix.rust }}
rustup component add rustfmt clippy
- name: Cache Cargo
uses: actions/cache@v2
with:
path: /home/runner/.cargo
key: cargo-maturin-cache-
- name: Cache Rust dependencies
uses: actions/cache@v2
with:
path: /home/runner/target
# this key is not equal because maturin uses different compilation flags.
key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}-
- uses: actions/setup-python@v2
with:
python-version: '3.7'
- name: Upgrade pip and setuptools
run: pip install --upgrade pip setuptools wheel
- name: Install python dependencies
run: pip install maturin==0.8.2 toml==0.10.1 pytest pytz
- name: Install nightly pyarrow wheel
# this points to a nightly pyarrow build containing the necessary
# API for integration testing (https://github.com/apache/arrow/pull/10529)
# the hardcoded version is wrong and should be removed either
# after https://issues.apache.org/jira/browse/ARROW-13083
# gets fixed or pyarrow 5.0 gets released
# (the hardcoded version is wrong, but contains the needed API)
run: pip install --index-url https://pypi.fury.io/arrow-nightlies/ pyarrow==3.1.0.dev1030
- name: Run tests
env:
CARGO_HOME: "/home/runner/.cargo"
CARGO_TARGET_DIR: "/home/runner/target"
working-directory: arrow-pyarrow-integration-testing
run: |
maturin develop
pytest -v .
46 changes: 0 additions & 46 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -283,52 +283,6 @@ jobs:
continue-on-error: true
run: bash <(curl -s https://codecov.io/bash)

# test FFI against the C-Data interface exposed by pyarrow
pyarrow-integration-test:
name: Test Pyarrow C Data Interface
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Rust toolchain
run: |
rustup toolchain install ${{ matrix.rust }}
rustup default ${{ matrix.rust }}
rustup component add rustfmt clippy
- name: Cache Cargo
uses: actions/cache@v2
with:
path: /home/runner/.cargo
key: cargo-maturin-cache-
- name: Cache Rust dependencies
uses: actions/cache@v2
with:
path: /home/runner/target
# this key is not equal because maturin uses different compilation flags.
key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}-
- uses: actions/setup-python@v2
with:
python-version: '3.7'
- name: Install Python dependencies
run: python -m pip install --upgrade pip setuptools wheel
- name: Run tests
run: |
export CARGO_HOME="/home/runner/.cargo"
export CARGO_TARGET_DIR="/home/runner/target"
cd arrow-pyarrow-integration-testing
python -m venv venv
source venv/bin/activate
pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0 pytz
maturin develop
python -m unittest discover tests
# test the arrow crate builds against wasm32 in stable rust
wasm32-build:
name: Build wasm32 on AMD64 Rust ${{ matrix.rust }}
Expand Down
158 changes: 128 additions & 30 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
use std::convert::TryFrom;
use std::error;
use std::fmt;
use std::sync::Arc;
Expand All @@ -28,8 +29,10 @@ use pyo3::{libc::uintptr_t, prelude::*};

use arrow::array::{make_array_from_raw, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi;
use arrow::ffi::FFI_ArrowSchema;

/// an error that bridges ArrowError with a Python error
#[derive(Debug)]
Expand Down Expand Up @@ -68,7 +71,107 @@ impl From<PyO3ArrowError> for PyErr {
}
}

fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
/// Python-exposed (`#[pyclass]`) wrapper that owns an Arrow [`DataType`].
#[pyclass]
struct PyDataType {
// the wrapped Rust-side data type
inner: DataType,
}

/// Python-exposed (`#[pyclass]`) wrapper that owns an Arrow [`Field`].
#[pyclass]
struct PyField {
// the wrapped Rust-side field
inner: Field,
}

/// Python-exposed (`#[pyclass]`) wrapper that owns an Arrow [`Schema`].
#[pyclass]
struct PySchema {
// the wrapped Rust-side schema
inner: Schema,
}

#[pymethods]
impl PyDataType {
    /// Builds a `PyDataType` from a pyarrow object.
    ///
    /// Calls the object's `_export_to_c` method with the address of an empty
    /// [`FFI_ArrowSchema`] and converts the resulting struct with
    /// `DataType::try_from`.
    ///
    /// # Errors
    /// Propagates the Python error raised by `_export_to_c`, or the Arrow
    /// conversion error wrapped in `PyO3ArrowError`.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // The callee is handed this address so it can fill the struct in.
        // The binding must therefore be mutable and the pointer derived from
        // `&mut`: writing through a pointer obtained from a shared reference
        // is undefined behavior.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let dtype = DataType::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: dtype })
    }

    /// Converts the wrapped data type into a `pyarrow.DataType` object.
    ///
    /// Exports `self.inner` as an [`FFI_ArrowSchema`] and passes its address
    /// to `pyarrow.DataType._import_from_c`.
    ///
    /// # Errors
    /// Propagates the Arrow export error (wrapped in `PyO3ArrowError`) or any
    /// Python error raised while importing `pyarrow` or calling into it.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // NOTE(review): under the Arrow C Data Interface the importer may move
        // the struct out and mark it released, i.e. write to it, so hand out
        // a `*mut` derived from a mutable binding.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("DataType")?;
        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(dtype.into())
    }
}

#[pymethods]
impl PyField {
    /// Builds a `PyField` from a pyarrow object.
    ///
    /// Calls the object's `_export_to_c` method with the address of an empty
    /// [`FFI_ArrowSchema`] and converts the resulting struct with
    /// `Field::try_from`.
    ///
    /// # Errors
    /// Propagates the Python error raised by `_export_to_c`, or the Arrow
    /// conversion error wrapped in `PyO3ArrowError`.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // The callee is handed this address so it can fill the struct in.
        // The binding must therefore be mutable and the pointer derived from
        // `&mut`: writing through a pointer obtained from a shared reference
        // is undefined behavior.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let field = Field::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: field })
    }

    /// Converts the wrapped field into a `pyarrow.Field` object.
    ///
    /// Exports `self.inner` as an [`FFI_ArrowSchema`] and passes its address
    /// to `pyarrow.Field._import_from_c`.
    ///
    /// # Errors
    /// Propagates the Arrow export error (wrapped in `PyO3ArrowError`) or any
    /// Python error raised while importing `pyarrow` or calling into it.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // NOTE(review): under the Arrow C Data Interface the importer may move
        // the struct out and mark it released, i.e. write to it, so hand out
        // a `*mut` derived from a mutable binding.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("Field")?;
        // named `field` (not `dtype`): the imported object is a pyarrow Field
        let field = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(field.into())
    }
}

#[pymethods]
impl PySchema {
    /// Builds a `PySchema` from a pyarrow object.
    ///
    /// Calls the object's `_export_to_c` method with the address of an empty
    /// [`FFI_ArrowSchema`] and converts the resulting struct with
    /// `Schema::try_from`.
    ///
    /// # Errors
    /// Propagates the Python error raised by `_export_to_c`, or the Arrow
    /// conversion error wrapped in `PyO3ArrowError`.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // The callee is handed this address so it can fill the struct in.
        // The binding must therefore be mutable and the pointer derived from
        // `&mut`: writing through a pointer obtained from a shared reference
        // is undefined behavior.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let schema = Schema::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: schema })
    }

    /// Converts the wrapped schema into a `pyarrow.Schema` object.
    ///
    /// Exports `self.inner` as an [`FFI_ArrowSchema`] and passes its address
    /// to `pyarrow.Schema._import_from_c`.
    ///
    /// # Errors
    /// Propagates the Arrow export error (wrapped in `PyO3ArrowError`) or any
    /// Python error raised while importing `pyarrow` or calling into it.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // NOTE(review): under the Arrow C Data Interface the importer may move
        // the struct out and mark it released, i.e. write to it, so hand out
        // a `*mut` derived from a mutable binding.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("Schema")?;
        let schema =
            class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(schema.into())
    }
}

/// `FromPyObject` for `PyDataType`: extraction simply delegates to
/// `PyDataType::from_pyarrow`.
impl<'py> FromPyObject<'py> for PyDataType {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// `FromPyObject` for `PyField`: extraction simply delegates to
/// `PyField::from_pyarrow`.
impl<'py> FromPyObject<'py> for PyField {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// `FromPyObject` for `PySchema`: extraction simply delegates to
/// `PySchema::from_pyarrow`.
impl<'py> FromPyObject<'py> for PySchema {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

fn array_to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
let (array_pointer, schema_pointer) =
ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });
Expand All @@ -82,13 +185,12 @@ fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
)?;

let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
.map_err(|e| PyO3ArrowError::from(e))?;
.map_err(PyO3ArrowError::from)?;
Ok(array)
}

fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) =
array.to_raw().map_err(|e| PyO3ArrowError::from(e))?;
fn array_to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) = array.to_raw().map_err(PyO3ArrowError::from)?;

let pa = py.import("pyarrow")?;

Expand All @@ -103,22 +205,17 @@ fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(array, py)?;

// perform some operation
let array =
array
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(PyO3ArrowError::ArrowError(ArrowError::ParseError(
"Expects an int64".to_string(),
)))?;
let array =
kernels::arithmetic::add(&array, &array).map_err(|e| PyO3ArrowError::from(e))?;
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::ParseError("Expects an int64".to_string()))
})?;
let array = kernels::arithmetic::add(&array, &array).map_err(PyO3ArrowError::from)?;
let array = Arc::new(array);

// export
to_py(array, py)
array_to_py(array, py)
}

/// calls a lambda function that receives and returns an array
Expand All @@ -130,11 +227,9 @@ fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
let expected = Arc::new(Int64Array::from(vec![Some(2), None, Some(6)])) as ArrayRef;

// to py
let array = to_py(array, py)?;

let array = lambda.call1(py, (array,))?;

let array = to_rust(array, py)?;
let pyarray = array_to_py(array, py)?;
let pyarray = lambda.call1(py, (pyarray,))?;
let array = array_to_rust(pyarray, py)?;

Ok(array == expected)
}
Expand All @@ -143,42 +238,45 @@ fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(array, py)?;

// substring
let array = kernels::substring::substring(array.as_ref(), start, &None)
.map_err(|e| PyO3ArrowError::from(e))?;
.map_err(PyO3ArrowError::from)?;

// export
to_py(array, py)
array_to_py(array, py)
}

/// Returns the array concatenated with itself
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(array, py)?;

// concat
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])
.map_err(|e| PyO3ArrowError::from(e))?;
.map_err(PyO3ArrowError::from)?;

// export
to_py(array, py)
array_to_py(array, py)
}

/// Converts to rust and back to python
#[pyfunction]
fn round_trip(array: PyObject, py: Python) -> PyResult<PyObject> {
fn round_trip(pyarray: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(pyarray, py)?;

// export
to_py(array, py)
array_to_py(array, py)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyDataType>()?;
m.add_class::<PyField>()?;
m.add_class::<PySchema>()?;
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
Expand Down
Loading

0 comments on commit 31bc052

Please sign in to comment.