Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40061: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - add option to cast NULL to NaN #40803

Merged
47 changes: 34 additions & 13 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/record_batch.h"

#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -261,12 +262,19 @@ struct ConvertColumnsToTensorVisitor {
using In = typename T::c_type;
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);

if constexpr (std::is_same_v<In, Out>) {
memcpy(out_values, in_values.data(), in_values.size_bytes());
out_values += in_values.size();
if (in_data.null_count == 0) {
if constexpr (std::is_same_v<In, Out>) {
memcpy(out_values, in_values.data(), in_values.size_bytes());
out_values += in_values.size();
} else {
for (In in_value : in_values) {
*out_values++ = static_cast<Out>(in_value);
}
}
} else {
for (In in_value : in_values) {
*out_values++ = static_cast<Out>(in_value);
for (int64_t i = 0; i < in_data.length; ++i) {
*out_values++ =
in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
}
}
return Status::OK();
Expand All @@ -286,16 +294,20 @@ inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
}
}

Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
MemoryPool* pool) const {
if (num_columns() == 0) {
return Status::TypeError(
"Conversion to Tensor for RecordBatches without columns/schema is not "
"supported.");
}
// Check for no validity bitmap of each field
// if null_to_nan conversion is set to false
for (int i = 0; i < num_columns(); ++i) {
if (column(i)->null_count() > 0) {
return Status::TypeError("Can only convert a RecordBatch with no nulls.");
if (column(i)->null_count() > 0 && !null_to_nan) {
return Status::TypeError(
"Can only convert a RecordBatch with no nulls. Set null_to_nan to true to "
"convert nulls to NaN");
}
}

Expand All @@ -308,12 +320,12 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
std::shared_ptr<Field> result_field = schema_->field(0);
std::shared_ptr<DataType> result_type = result_field->type();

if (num_columns() > 1) {
Field::MergeOptions options;
options.promote_integer_to_float = true;
options.promote_integer_sign = true;
options.promote_numeric_width = true;
Field::MergeOptions options;
options.promote_integer_to_float = true;
options.promote_integer_sign = true;
options.promote_numeric_width = true;

if (num_columns() > 1) {
for (int i = 1; i < num_columns(); ++i) {
if (!is_numeric(column(i)->type()->id())) {
return Status::TypeError("DataType is not supported: ",
Expand All @@ -334,6 +346,15 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
result_type = result_field->type();
}

// Check if result_type is signed or unsigned integer and null_to_nan is set to true
// Then all columns should be promoted to float type
if (is_integer(result_type->id()) && null_to_nan) {
ARROW_ASSIGN_OR_RAISE(
result_field,
result_field->MergeWith(field(result_field->name(), float32()), options));
result_type = result_field->type();
}

// Allocate memory
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> result,
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ class ARROW_EXPORT RecordBatch {
/// Create a Tensor object with shape (number of rows, number of columns) and
/// strides (type size in bytes, type size in bytes * number of rows).
/// Generated Tensor will have column-major layout.
///
/// \param[in] null_to_nan if true, convert nulls to NaN
/// \param[in] pool the memory pool to allocate the tensor buffer
/// \return the resulting Tensor
Result<std::shared_ptr<Tensor>> ToTensor(
MemoryPool* pool = default_memory_pool()) const;
bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) const;
AlenkaF marked this conversation as resolved.
Show resolved Hide resolved

/// \brief Construct record batch from struct array
///
Expand Down
76 changes: 75 additions & 1 deletion cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
auto batch = RecordBatch::Make(schema, length, {a0, a1});

ASSERT_RAISES_WITH_MESSAGE(TypeError,
"Type error: Can only convert a RecordBatch with no nulls.",
"Type error: Can only convert a RecordBatch with no nulls. "
"Set null_to_nan to true to convert nulls to NaN",
batch->ToTensor());
}

Expand Down Expand Up @@ -740,6 +741,79 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
CheckTensor<FloatType>(tensor, 18, shape, f_strides);
}

TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
const int length = 9;

// int32 + float32 = float64
auto f0 = field("f0", int32());
auto f1 = field("f1", float32());

std::vector<std::shared_ptr<Field>> fields = {f0, f1};
auto schema = ::arrow::schema(fields);

auto a0 = ArrayFromJSON(int32(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");

auto batch = RecordBatch::Make(schema, length, {a0, a1});

ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true));
ASSERT_OK(tensor->Validate());

std::vector<int64_t> shape = {9, 2};
const int64_t f64_size = sizeof(double);
std::vector<int64_t> f_strides = {f64_size, f64_size * shape[0]};
std::shared_ptr<Tensor> tensor_expected = TensorFromJSON(
float64(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]",
shape, f_strides);

EXPECT_FALSE(tensor_expected->Equals(*tensor));
EXPECT_TRUE(tensor_expected->Equals(*tensor, EqualOptions().nans_equal(true)));

CheckTensor<DoubleType>(tensor, 18, shape, f_strides);

// int32 -> float64
auto f2 = field("f2", int32());

std::vector<std::shared_ptr<Field>> fields1 = {f0, f2};
auto schema1 = ::arrow::schema(fields1);

auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
auto batch1 = RecordBatch::Make(schema1, length, {a0, a2});

ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true));
ASSERT_OK(tensor1->Validate());

EXPECT_FALSE(tensor_expected->Equals(*tensor1));
EXPECT_TRUE(tensor_expected->Equals(*tensor1, EqualOptions().nans_equal(true)));

CheckTensor<DoubleType>(tensor1, 18, shape, f_strides);

// int8 -> float32
auto f3 = field("f3", int8());
auto f4 = field("f4", int8());

std::vector<std::shared_ptr<Field>> fields2 = {f3, f4};
auto schema2 = ::arrow::schema(fields2);

auto a3 = ArrayFromJSON(int8(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
auto batch2 = RecordBatch::Make(schema2, length, {a3, a4});

ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true));
ASSERT_OK(tensor2->Validate());

const int64_t f32_size = sizeof(float);
std::vector<int64_t> f_strides_2 = {f32_size, f32_size * shape[0]};
std::shared_ptr<Tensor> tensor_expected_2 = TensorFromJSON(
float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]",
shape, f_strides_2);

EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true)));

CheckTensor<FloatType>(tensor2, 18, shape, f_strides_2);
}

TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
const int length = 9;

Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)

CResult[shared_ptr[CTensor]] ToTensor() const
CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool* pool) const

cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
shared_ptr[CRecordBatch] batch
Expand Down
49 changes: 46 additions & 3 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3389,21 +3389,64 @@ cdef class RecordBatch(_Tabular):
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
return pyarrow_wrap_array(c_array)

def to_tensor(self):
def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None):
"""
Convert to a :class:`~pyarrow.Tensor`.

RecordBatches that can be converted have fields of type signed or unsigned
integer or float, including all bit-widths, with no validity bitmask.
integer or float, including all bit-widths. RecordBatches with validity bitmask
for any of the arrays can be converted with ``null_to_nan``turned to ``True``.
In this case null values are converted to NaN and signed or unsigned integer
type arrays are promoted to appropriate float type.

Parameters
----------
null_to_nan : bool, default False
Whether to write null values in the result as ``NaN``.
memory_pool : MemoryPool, default None
For memory allocations, if required, otherwise use default pool

Examples
--------
>>> import pyarrow as pa
>>> batch = pa.record_batch(
... [
... pa.array([1, 2, 3, 4, None], type=pa.int32()),
... pa.array([10, 20, 30, 40, None], type=pa.float32()),
... ], names = ["a", "b"]
... )

>>> batch
pyarrow.RecordBatch
a: int32
b: float
----
a: [1,2,3,4,null]
b: [10,20,30,40,null]

>>> batch.to_tensor(null_to_nan=True)
<pyarrow.Tensor>
type: double
shape: (5, 2)
strides: (8, 40)

>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
[ 2., 20.],
[ 3., 30.],
[ 4., 40.],
[nan, nan]])
"""
cdef:
shared_ptr[CRecordBatch] c_record_batch
shared_ptr[CTensor] c_tensor
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)

c_record_batch = pyarrow_unwrap_batch(self)
with nogil:
c_tensor = GetResultValue(
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor())
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor(null_to_nan,
pool))
return pyarrow_wrap_tensor(c_tensor)

def _export_to_c(self, out_ptr, out_schema_ptr=0):
Expand Down
48 changes: 47 additions & 1 deletion python/pyarrow/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ def test_recordbatch_to_tensor_null():
arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90]
batch = pa.RecordBatch.from_arrays(
[
pa.array(arr1, type=pa.float32()),
pa.array(arr1, type=pa.int32()),
pa.array(arr2, type=pa.float32()),
], ["a", "b"]
)
Expand All @@ -1071,6 +1071,52 @@ def test_recordbatch_to_tensor_null():
):
batch.to_tensor()

result = batch.to_tensor(null_to_nan=True)

x = np.array([arr1, arr2], np.float64).transpose()
expected = pa.Tensor.from_numpy(x)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
assert result.type == pa.float64()
assert result.shape == expected.shape
assert result.strides == expected.strides

# int32 -> float64
batch = pa.RecordBatch.from_arrays(
[
pa.array(arr1, type=pa.int32()),
pa.array(arr2, type=pa.int32()),
], ["a", "b"]
)

result = batch.to_tensor(null_to_nan=True)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
assert result.type == pa.float64()
assert result.shape == expected.shape
assert result.strides == expected.strides

# int8 -> float32
batch = pa.RecordBatch.from_arrays(
[
pa.array(arr1, type=pa.int8()),
pa.array(arr2, type=pa.int8()),
], ["a", "b"]
)

result = batch.to_tensor(null_to_nan=True)

x = np.array([arr1, arr2], np.float32).transpose()
expected = pa.Tensor.from_numpy(x)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
assert result.type == pa.float32()
assert result.shape == expected.shape
assert result.strides == expected.strides


def test_recordbatch_to_tensor_empty():
batch = pa.RecordBatch.from_arrays(
Expand Down
Loading