Skip to content

Commit

Permalink
e
Browse files Browse the repository at this point in the history
  • Loading branch information
kszucs committed Aug 12, 2021
1 parent fa5acd9 commit 90318df
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 160 deletions.
4 changes: 2 additions & 2 deletions arrow-pyarrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ name = "arrow_pyarrow_integration_testing"
crate-type = ["cdylib"]

[dependencies]
arrow = { path = "../arrow", version = "6.0.0-SNAPSHOT" }
pyo3 = { version = "0.12.1", features = ["extension-module"] }
arrow = { path = "../arrow", version = "6.0.0-SNAPSHOT", features = ["pyarrow"] }
pyo3 = { version = "0.14", features = ["extension-module"] }

[package.metadata.maturin]
requires-dist = ["pyarrow>=1"]
216 changes: 61 additions & 155 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ use std::fmt;
use std::sync::Arc;

use pyo3::exceptions::PyOSError;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::{libc::uintptr_t, prelude::*};
//use libc::uintptr_t;

use arrow::array::{make_array_from_raw, ArrayRef, Int64Array};
use arrow::array::{Array, ArrayData, ArrayRef, Int32Array, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi;
//use arrow::ffi;
use arrow::ffi::FFI_ArrowSchema;
use arrow::pyarrow::PyArrowConvert;

use pyo3::FromPyObject;

/// an error that bridges ArrowError with a Python error
#[derive(Debug)]
Expand Down Expand Up @@ -86,136 +90,38 @@ struct PySchema {
inner: Schema,
}

#[pymethods]
impl PyDataType {
    /// Builds a `PyDataType` from a pyarrow object.
    ///
    /// Calls `value._export_to_c(address)` with the address of an empty
    /// `FFI_ArrowSchema`, then converts the filled struct with
    /// `DataType::try_from`. Python errors are propagated unchanged; conversion
    /// errors are mapped through `PyO3ArrowError::from`.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // The callee writes into this struct through the raw address, so the
        // binding must be mutable and the pointer derived from a mutable
        // borrow; writing through a pointer obtained from `&c_schema` is
        // undefined behavior under Rust's aliasing rules.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let dtype = DataType::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: dtype })
    }

    /// Exports the wrapped data type as a pyarrow object.
    ///
    /// Converts `self.inner` into an `FFI_ArrowSchema` and passes its address
    /// to `pyarrow.DataType._import_from_c`, returning whatever that call
    /// produces.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // The importer receives the raw address and may modify the struct
        // (the Arrow C data interface moves a structure by marking the source
        // as released), so hand it a pointer derived from a mutable borrow.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("DataType")?;
        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(dtype.into())
    }
}

#[pymethods]
impl PyField {
    /// Builds a `PyField` from a pyarrow object.
    ///
    /// Calls `value._export_to_c(address)` with the address of an empty
    /// `FFI_ArrowSchema`, then converts the filled struct with
    /// `Field::try_from`. Python errors are propagated unchanged; conversion
    /// errors are mapped through `PyO3ArrowError::from`.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // The callee writes into this struct through the raw address, so the
        // binding must be mutable and the pointer derived from a mutable
        // borrow; writing through a pointer obtained from `&c_schema` is
        // undefined behavior under Rust's aliasing rules.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let field = Field::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: field })
    }

    /// Exports the wrapped field as a pyarrow object.
    ///
    /// Converts `self.inner` into an `FFI_ArrowSchema` and passes its address
    /// to `pyarrow.Field._import_from_c`, returning whatever that call
    /// produces.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // The importer receives the raw address and may modify the struct
        // (the Arrow C data interface moves a structure by marking the source
        // as released), so hand it a pointer derived from a mutable borrow.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("Field")?;
        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(dtype.into())
    }
}

#[pymethods]
impl PySchema {
    /// Builds a `PySchema` from a pyarrow object.
    ///
    /// Calls `value._export_to_c(address)` with the address of an empty
    /// `FFI_ArrowSchema`, then converts the filled struct with
    /// `Schema::try_from`. Python errors are propagated unchanged; conversion
    /// errors are mapped through `PyO3ArrowError::from`.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // The callee writes into this struct through the raw address, so the
        // binding must be mutable and the pointer derived from a mutable
        // borrow; writing through a pointer obtained from `&c_schema` is
        // undefined behavior under Rust's aliasing rules.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let schema = Schema::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: schema })
    }

    /// Exports the wrapped schema as a pyarrow object.
    ///
    /// Converts `self.inner` into an `FFI_ArrowSchema` and passes its address
    /// to `pyarrow.Schema._import_from_c`, returning whatever that call
    /// produces.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // The importer receives the raw address and may modify the struct
        // (the Arrow C data interface moves a structure by marking the source
        // as released), so hand it a pointer derived from a mutable borrow.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("Schema")?;
        let schema =
            class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(schema.into())
    }
}

/// Allows a `PyDataType` to be extracted from a Python object by delegating
/// to `PyDataType::from_pyarrow`.
impl<'py> FromPyObject<'py> for PyDataType {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// Allows a `PyField` to be extracted from a Python object by delegating to
/// `PyField::from_pyarrow`.
impl<'py> FromPyObject<'py> for PyField {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// Allows a `PySchema` to be extracted from a Python object by delegating to
/// `PySchema::from_pyarrow`.
impl<'py> FromPyObject<'py> for PySchema {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// Converts a Python object into an `ArrayRef`.
///
/// Obtains a pair of raw FFI pointers from an empty `ffi::ArrowArray`, passes
/// their addresses to the object's `_export_to_c` method, and then builds the
/// array from those same pointers with `make_array_from_raw`.
///
/// Errors raised by the Python call are propagated unchanged; errors from
/// `make_array_from_raw` are mapped through `PyO3ArrowError::from`.
fn array_to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
// NOTE(review): `ArrowArray::empty()` is an unsafe call with no stated
// invariant here - confirm what the caller must uphold.
let (array_pointer, schema_pointer) =
ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });

// make the conversion through PyArrow's private API
// The callee receives plain integer addresses and writes to that memory
// itself, so Rust cannot check the access; the original author notes that
// `_export_to_c` can write out of bounds.
// NOTE(review): if this call returns an error, the pointers produced by
// `into_raw` are not handed back to anything in this function - possible leak
// on the error path; confirm.
ob.call_method1(
py,
"_export_to_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;
// impl<'source> FromPyObject<'source> for PyDataType {
// fn extract(value: &'source PyAny) -> PyResult<Self> {
// PyDataType::from_pyarrow(value)
// }
// }

// NOTE(review): presumably sound only if `_export_to_c` filled both structs
// correctly - confirm against `make_array_from_raw`'s safety contract.
let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
.map_err(PyO3ArrowError::from)?;
Ok(array)
}
// impl<'source> FromPyObject<'source> for PyField {
// fn extract(value: &'source PyAny) -> PyResult<Self> {
// PyField::from_pyarrow(value)
// }
// }

/// Converts an `ArrayRef` into a Python object.
///
/// Obtains raw array/schema pointers via `array.to_raw()`, imports the
/// `pyarrow` module, and calls `pyarrow.Array._import_from_c` with the two
/// pointer addresses. The result of that call is returned as a `PyObject`.
///
/// Errors from `to_raw` are mapped through `PyO3ArrowError::from`; Python
/// errors (import, attribute lookup, call) are propagated unchanged.
fn array_to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) = array.to_raw().map_err(PyO3ArrowError::from)?;

let pa = py.import("pyarrow")?;

// The pointers cross the boundary as plain integers (`uintptr_t`).
// NOTE(review): presumably pyarrow takes ownership of the pointed-to
// structs on success - confirm what happens to them if the call fails.
let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;
Ok(array.to_object(py))
}
// impl<'source> FromPyObject<'source> for PySchema {
// fn extract(value: &'source PyAny) -> PyResult<Self> {
// PySchema::from_pyarrow(value)
// }
// }

/// Returns `array + array` of an int64 array.
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
fn double(array: &PyAny, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
let array = ArrayRef::from_pyarrow(array)?;

// perform some operation
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::ParseError("Expects an int64".to_string()))
})?;
let array = kernels::arithmetic::add(&array, &array).map_err(PyO3ArrowError::from)?;
let array = Arc::new(array);

// export
array_to_py(array, py)
array.to_pyarrow(py)
}

/// calls a lambda function that receives and returns an array
Expand All @@ -227,60 +133,60 @@ fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
let expected = Arc::new(Int64Array::from(vec![Some(2), None, Some(6)])) as ArrayRef;

// to py
let pyarray = array_to_py(array, py)?;
let pyarray = array.into_py()?;
let pyarray = lambda.call1(py, (pyarray,))?;
let array = array_to_rust(pyarray, py)?;
let array = ArrayRef::from_pyarrow(pyarray)?;

Ok(array == expected)
}

// Python-exposed function: converts the incoming object to a Rust array with
// `array_to_rust`, applies `kernels::substring::substring` starting at
// `start` (passing `&None` as the kernel's third argument), and converts the
// result back with `array_to_py`. Kernel errors are mapped through
// `PyO3ArrowError::from`.
/// Returns the substring
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
// /// Returns the substring
// #[pyfunction]
// fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
// // import
// let array = array_to_rust(array, py)?;

// substring
let array = kernels::substring::substring(array.as_ref(), start, &None)
.map_err(PyO3ArrowError::from)?;
// // substring
// let array = kernels::substring::substring(array.as_ref(), start, &None)
// .map_err(PyO3ArrowError::from)?;

// export
array_to_py(array, py)
}
// // export
// array_to_py(array, py)
// }

// Python-exposed function: converts the incoming object to a Rust array with
// `array_to_rust`, concatenates the array with itself via
// `kernels::concat::concat`, and converts the result back with `array_to_py`.
// Kernel errors are mapped through `PyO3ArrowError::from`.
/// Returns the concatenate
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
// /// Returns the concatenate
// #[pyfunction]
// fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// // import
// let array = array_to_rust(array, py)?;

// concat
// The same array is passed twice, so the output is the input followed by
// itself.
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])
.map_err(PyO3ArrowError::from)?;
// // concat
// let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])
// .map_err(PyO3ArrowError::from)?;

// export
array_to_py(array, py)
}
// // export
// array_to_py(array, py)
// }

// Python-exposed function: converts the incoming object to a Rust array with
// `array_to_rust` and immediately converts it back with `array_to_py`,
// without applying any operation in between.
/// Converts to rust and back to python
#[pyfunction]
fn round_trip(pyarray: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(pyarray, py)?;
// /// Converts to rust and back to python
// #[pyfunction]
// fn round_trip(pyarray: PyObject, py: Python) -> PyResult<PyObject> {
// // import
// let array = array_to_rust(pyarray, py)?;

// export
array_to_py(array, py)
}
// // export
// array_to_py(array, py)
// }

// Module initializer for the Python extension: registers the three wrapper
// classes and the test functions on the module `m`. Any registration error is
// propagated to the caller through `?`.
#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
// classes wrapping arrow's data type, field and schema
m.add_class::<PyDataType>()?;
m.add_class::<PyField>()?;
m.add_class::<PySchema>()?;
// free functions exposed to Python
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
m.add_wrapped(wrap_pyfunction!(concatenate))?;
m.add_wrapped(wrap_pyfunction!(round_trip))?;
// NOTE(review): the commented-out registrations below duplicate the active
// ones above - confirm which set is intended.
// m.add_wrapped(wrap_pyfunction!(double_py))?;
// m.add_wrapped(wrap_pyfunction!(substring))?;
// m.add_wrapped(wrap_pyfunction!(concatenate))?;
// m.add_wrapped(wrap_pyfunction!(round_trip))?;
Ok(())
}
7 changes: 5 additions & 2 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ chrono = "0.4"
flatbuffers = { version = "=2.0.0", optional = true }
hex = "0.4"
prettytable-rs = { version = "0.8.0", optional = true }
pyo3 = { version = "0.14", optional = true } #features = ["extension-module"] }
libc = { version = "0.2", optional = true }
lexical-core = "^0.7"
multiversion = "0.6.1"
bitflags = "1.2.1"
Expand All @@ -62,14 +64,15 @@ ipc = ["flatbuffers"]
simd = ["packed_simd"]
prettyprint = ["prettytable-rs"]
# The test utils feature enables code used in benchmarks and tests but
# not the core arrow code itself. Be aware that `rand` must be kept as
# an optional dependency for supporting compile to wasm32-unknown-unknown
# not the core arrow code itself. Be aware that `rand` must be kept as
# an optional dependency for supporting compile to wasm32-unknown-unknown
# target without assuming an environment containing JavaScript.
test_utils = ["rand"]
# this is only intended to be used in single-threaded programs: it verifies that
# all allocated memory is being released (no memory leaks).
# See README for details
memory-check = []
pyarrow = ["pyo3", "libc"]

[dev-dependencies]
rand = "0.8"
Expand Down
9 changes: 8 additions & 1 deletion arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::convert::{From, TryFrom};
use std::fmt;
use std::sync::Arc;
use std::{any::Any, convert::TryFrom};

use super::*;
use crate::array::equal_json::JsonEqual;
Expand Down Expand Up @@ -334,6 +335,12 @@ pub fn make_array(data: ArrayData) -> ArrayRef {
}
}

/// Converts `ArrayData` into an `ArrayRef` by delegating to [`make_array`],
/// so callers can write `ArrayRef::from(data)` or `data.into()`.
impl From<ArrayData> for ArrayRef {
    fn from(array_data: ArrayData) -> ArrayRef {
        make_array(array_data)
    }
}

/// Creates a new empty array
///
/// ```
Expand Down
2 changes: 2 additions & 0 deletions arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub mod ffi;
#[cfg(feature = "ipc")]
pub mod ipc;
pub mod json;
#[cfg(feature = "pyarrow")]
pub mod pyarrow;
pub mod record_batch;
pub mod temporal_conversions;
pub mod tensor;
Expand Down
Loading

0 comments on commit 90318df

Please sign in to comment.