Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to write to Arrow Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 21, 2022
1 parent 22f0c3e commit cf2b10b
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 10 deletions.
20 changes: 19 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,25 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
full = [

"io_csv",
"io_csv_async",
"io_json",
"io_ipc",
"io_flight",
"io_ipc_write_async",
"io_ipc_read_async",
"io_ipc_compression",
"io_json_integration",
"io_print",
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_avro_compression",
"io_avro_async",
"regex",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",
]
io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.

[build-system]
requires = ["maturin"]
requires = ["maturin>=0.12,<0.13"]
build-backend = "maturin"
30 changes: 30 additions & 0 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;

use arrow2::array::Int32Array;
use arrow2::ffi;

use super::*;
Expand All @@ -26,3 +27,32 @@ pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
}
Ok(arrays)
}

/// Exports a Rust iterator of arrays through the Arrow C stream interface and asks
/// pyarrow to import it via `pyarrow.ipc.RecordBatchReader._import_from_c`.
///
/// The exported stream currently yields no arrays: only its field (an `Int32` column
/// named `"a"`) is meaningful to the reader.
///
/// # Errors
/// Returns the Python error if `pyarrow.ipc` cannot be imported or if the import call fails.
pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
    // initialize an array
    let array = Int32Array::from(&[Some(2), None, Some(1), None]);
    // and a field with its datatype
    let field = Field::new("a", array.data_type().clone(), true);

    // Arc it, since it would be shared with an external program.
    // It is not exported yet, hence the leading underscore.
    let _array: Arc<dyn Array> = Arc::new(array);

    // create an iterator of arrays; it is currently empty
    // (the populated variant would be `vec![array.clone(), array.clone(), array]`)
    let arrays: Vec<Arc<dyn Array>> = vec![];
    let iter = Box::new(arrays.into_iter().map(Ok)) as _;

    // create an [`ArrowArrayStream`] based on this iterator and field
    let mut stream = Box::new(ffi::ArrowArrayStream::empty());
    // SAFETY: `stream` is a live, initialized `ArrowArrayStream` owned by this function.
    unsafe { ffi::export_iterator(iter, field, &mut *stream) };

    // call pyarrow's interface to read this stream
    let pa = py.import("pyarrow.ipc")?;
    let py_stream = pa.getattr("RecordBatchReader")?.call_method1(
        "_import_from_c",
        ((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,),
    )?;
    // The address was handed to pyarrow, so the allocation must outlive this function.
    Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow

    Ok(py_stream.to_object(py))
}
6 changes: 6 additions & 0 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,16 @@ pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
c_stream::to_rust_iterator(ob, py)
}

// Python entry point: forwards to `c_stream::from_rust_iterator` and returns whatever it
// returns. Regular `//` comments are used on purpose so the Python-visible docstring of the
// function is left untouched.
#[pyfunction]
pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
    c_stream::from_rust_iterator(py)
}

// Module initializer: registers every `#[pyfunction]` of this crate on the Python module `m`.
// Any registration failure is propagated to the caller through `?`.
#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
    // round trips of single arrays / fields through the C data interface
    m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
    m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
    // C stream interface: Python -> Rust and Rust -> Python
    m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?;
    m.add_function(wrap_pyfunction!(from_rust_iterator, m)?)?;
    Ok(())
}
11 changes: 8 additions & 3 deletions arrow-pyarrow-integration-testing/tests/test_c_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@

class TestCase(unittest.TestCase):
def test_rust_reads(self):
"""
Python -> Rust -> Python
"""
schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())])
a = pyarrow.array([1, None, 2], type=pyarrow.int32())

Expand All @@ -20,3 +17,11 @@ def test_rust_reads(self):

array = arrays[0].field(0)
assert array == a

# see https://issues.apache.org/jira/browse/ARROW-15747
def _test_pyarrow_reads(self):
    """
    Rust -> Python: consume in Python a stream exported by Rust.

    The leading underscore keeps unittest from collecting this method (only names
    starting with ``test`` are discovered), so it is effectively disabled;
    presumably because of the issue linked above -- TODO confirm.
    """
    stream = arrow_pyarrow_integration_testing.from_rust_iterator()

    # drain the stream; the collected batches are not inspected yet
    arrays = [a for a in stream]

    # placeholder: fails unconditionally until real expectations are written
    assert False
2 changes: 1 addition & 1 deletion src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::error::Result;
use self::schema::to_field;

pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema};
pub use stream::ArrowArrayStreamReader;
pub use stream::{export_iterator, ArrowArrayStreamReader};

/// Exports an [`Arc<dyn Array>`] to the C data interface.
/// # Safety
Expand Down
101 changes: 97 additions & 4 deletions src/ffi/stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::ffi::CStr;
use std::ffi::{CStr, CString};
use std::sync::Arc;

use crate::{array::Array, datatypes::Field, error::ArrowError};

use super::{import_array_from_c, import_field_from_c};
use super::{export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c};
use super::{ArrowArray, ArrowArrayStream, ArrowSchema};

impl Drop for ArrowArrayStream {
Expand Down Expand Up @@ -48,8 +49,7 @@ unsafe fn handle_error(iter: &mut ArrowArrayStream) -> ArrowError {
)
}

/// Interface for the Arrow C stream interface. Implements an iterator of [`Array`].
///
/// Implements an iterator of [`Array`] consumed from the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
pub struct ArrowArrayStreamReader {
iter: Box<ArrowArrayStream>,
field: Field,
Expand Down Expand Up @@ -127,3 +127,96 @@ impl ArrowArrayStreamReader {
.transpose()
}
}

/// Producer-side state of an exported [`ArrowArrayStream`].
///
/// A boxed instance is stored behind the stream's `private_data` pointer by
/// `export_iterator` and freed again by the `release` callback.
struct PrivateData {
    /// Source of the arrays; `get_next` pulls one item from it per call.
    iter: Box<dyn Iterator<Item = Result<Arc<dyn Array>, ArrowError>>>,
    /// Field exported by `get_schema`; `get_next` also checks that every produced
    /// array has this field's data type.
    field: Field,
    /// Message of the last failure, exposed through `get_last_error`.
    /// Reset to `None` by every successful `get_next` call.
    error: Option<CString>,
}

/// Converts `message` into a C string for `get_last_error`.
///
/// Interior NUL bytes are removed instead of panicking, because a panic must not
/// unwind out of an `extern "C"` callback.
fn error_to_cstring(message: String) -> CString {
    let mut bytes = message.into_bytes();
    bytes.retain(|byte| *byte != 0);
    // cannot fail once the NUL bytes are gone; fall back to an empty string regardless
    CString::new(bytes).unwrap_or_default()
}

/// `get_next` callback of a stream exported by `export_iterator`.
///
/// Pulls the next item from the wrapped iterator and writes it to `array`.
/// When the iterator is exhausted, `ArrowArray::empty()` is written instead
/// (presumably a released array, which is how the C stream interface signals
/// end of stream).
///
/// Returns `0` on success and `2001` on failure. The failure message, when there
/// is one, is stored so that `get_last_error` can report it.
///
/// # Safety
/// `iter`, when non-null, must point to a stream created by `export_iterator` that has
/// not been released. `array`, when non-null, must be valid for writes of an `ArrowArray`.
unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
    if iter.is_null() || array.is_null() {
        return 2001;
    }
    let private = &mut *((*iter).private_data as *mut PrivateData);

    match private.iter.next() {
        Some(Ok(item)) => {
            // check that the array has the same data_type as field
            let item_dt = item.data_type();
            let expected_dt = private.field.data_type();
            if item_dt != expected_dt {
                private.error = Some(error_to_cstring(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}")));
                return 2001; // custom application specific error (since this is never a result of this interface)
            }

            export_array_to_c(item, array);
            private.error = None;
            0
        }
        Some(Err(err)) => {
            private.error = Some(error_to_cstring(err.to_string()));
            2001 // custom application specific error (since this is never a result of this interface)
        }
        None => {
            // `ptr::write` neither reads nor drops the previous contents of `*array`,
            // which the consumer may have left uninitialized; a plain assignment
            // would drop them first.
            std::ptr::write(array, ArrowArray::empty());
            private.error = None;
            0
        }
    }
}

/// `get_schema` callback of a stream exported by `export_iterator`.
///
/// Exports the stream's field into `schema` and returns `0`;
/// returns `2001` without touching `schema` when `iter` is null.
///
/// # Safety
/// `iter`, when non-null, must point to a stream created by `export_iterator` that has
/// not been released; `schema` must satisfy the requirements of `export_field_to_c`.
unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
    if !iter.is_null() {
        let state = (*iter).private_data as *mut PrivateData;
        export_field_to_c(&(*state).field, schema);
        0
    } else {
        2001
    }
}

/// `get_last_error` callback of a stream exported by `export_iterator`.
///
/// Returns a pointer to the stored NUL-terminated error message, or a null pointer when
/// `iter` is null or no error is stored. The pointer refers to memory owned by the
/// stream's private data.
///
/// # Safety
/// `iter`, when non-null, must point to a stream created by `export_iterator` that has
/// not been released.
unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
    if iter.is_null() {
        return std::ptr::null();
    }
    let state = &*((*iter).private_data as *const PrivateData);

    match &state.error {
        Some(message) => message.as_ptr(),
        None => std::ptr::null(),
    }
}

/// `release` callback of a stream exported by `export_iterator`.
///
/// Frees the boxed `PrivateData` and marks the stream as released by clearing both its
/// `release` callback and its `private_data` pointer. A null `iter`, or a stream whose
/// `private_data` is already null, frees nothing.
///
/// # Safety
/// `iter`, when non-null, must point to a stream whose `private_data` is either null or
/// the pointer installed by `export_iterator`.
unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
    if iter.is_null() {
        return;
    }
    let private_data = (*iter).private_data as *mut PrivateData;

    // Clear the stream's fields first so that no dangling pointer survives the free below.
    (*iter).private_data = std::ptr::null_mut();
    (*iter).release = None;

    if !private_data.is_null() {
        // Re-box the state so that the iterator, the field and the error message are dropped.
        drop(Box::from_raw(private_data));
    }
}

/// Exports an iterator of arrays to the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
///
/// `iter` and `field` are moved to the heap and attached to `consumer` as private data,
/// together with the callbacks that serve the schema, the next array and the last error.
/// The heap state is freed by the stream's `release` callback.
/// # Safety
/// `consumer` must point to a valid, initialized [`ArrowArrayStream`]: the value it
/// currently holds is dropped when the new stream is assigned over it.
pub unsafe fn export_iterator(
    iter: Box<dyn Iterator<Item = Result<Arc<dyn Array>, ArrowError>>>,
    field: Field,
    consumer: *mut ArrowArrayStream,
) {
    // heap-allocate the producer state; ownership moves to the stream until `release` runs
    let state = Box::into_raw(Box::new(PrivateData {
        iter,
        field,
        error: None,
    }));

    let stream = ArrowArrayStream {
        get_schema: Some(get_schema),
        get_next: Some(get_next),
        get_last_error: Some(get_last_error),
        release: Some(release),
        private_data: state as *mut ::std::os::raw::c_void,
    };
    *consumer = stream;
}
File renamed without changes.
2 changes: 2 additions & 0 deletions tests/it/ffi/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod data;
mod stream;
35 changes: 35 additions & 0 deletions tests/it/ffi/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow2::array::*;
use arrow2::bitmap::Bitmap;
use arrow2::datatypes::{DataType, Field, TimeUnit};
use arrow2::{error::Result, ffi};

/// Exports `arrays` as a C stream, reads the stream back and asserts that both the
/// arrays and the field survive the round trip unchanged.
///
/// The field's data type is taken from `arrays[0]`, so `arrays` must not be empty.
fn _test_round_trip(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
    let field = Field::new("a", arrays[0].data_type().clone(), true);

    // producer side: wrap a copy of the arrays into an exported stream
    let source = Box::new(arrays.clone().into_iter().map(Ok)) as _;
    let mut exported = Box::new(ffi::ArrowArrayStream::empty());
    unsafe { ffi::export_iterator(source, field.clone(), &mut *exported) }

    // consumer side: read everything back
    let mut reader = unsafe { ffi::ArrowArrayStreamReader::try_new(exported)? };

    let mut produced_arrays: Vec<Arc<dyn Array>> = vec![];
    loop {
        match unsafe { reader.next() } {
            Some(array) => produced_arrays.push(array?.into()),
            None => break,
        }
    }

    assert_eq!(produced_arrays, arrays);
    assert_eq!(reader.field(), &field);
    Ok(())
}

/// Round-trips three references to the same nullable `Int32` array through the C stream interface.
#[test]
fn round_trip() -> Result<()> {
    let values = Int32Array::from(&[Some(2), None, Some(1), None]);
    let shared: Arc<dyn Array> = Arc::new(values);

    let arrays = vec![Arc::clone(&shared), Arc::clone(&shared), shared];
    _test_round_trip(arrays)
}

0 comments on commit cf2b10b

Please sign in to comment.