diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 0d8bda9b66e24..6f3b8e75a20d0 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -18,6 +18,7 @@ #include "arrow/record_batch.h" #include +#include #include #include #include @@ -261,12 +262,19 @@ struct ConvertColumnsToTensorVisitor { using In = typename T::c_type; auto in_values = ArraySpan(in_data).GetSpan(1, in_data.length); - if constexpr (std::is_same_v) { - memcpy(out_values, in_values.data(), in_values.size_bytes()); - out_values += in_values.size(); + if (in_data.null_count == 0) { + if constexpr (std::is_same_v) { + memcpy(out_values, in_values.data(), in_values.size_bytes()); + out_values += in_values.size(); + } else { + for (In in_value : in_values) { + *out_values++ = static_cast(in_value); + } + } } else { - for (In in_value : in_values) { - *out_values++ = static_cast(in_value); + for (int64_t i = 0; i < in_data.length; ++i) { + *out_values++ = + in_data.IsNull(i) ? static_cast(NAN) : static_cast(in_values[i]); } } return Status::OK(); @@ -286,16 +294,20 @@ inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) { } } -Result> RecordBatch::ToTensor(MemoryPool* pool) const { +Result> RecordBatch::ToTensor(bool null_to_nan, + MemoryPool* pool) const { if (num_columns() == 0) { return Status::TypeError( "Conversion to Tensor for RecordBatches without columns/schema is not " "supported."); } // Check for no validity bitmap of each field + // if null_to_nan conversion is set to false for (int i = 0; i < num_columns(); ++i) { - if (column(i)->null_count() > 0) { - return Status::TypeError("Can only convert a RecordBatch with no nulls."); + if (column(i)->null_count() > 0 && !null_to_nan) { + return Status::TypeError( + "Can only convert a RecordBatch with no nulls. Set null_to_nan to true to " + "convert nulls to NaN"); } } @@ -308,12 +320,12 @@ Result> RecordBatch::ToTensor(MemoryPool* pool) const { std::shared_ptr result_field = schema_->field(0); std::shared_ptr result_type = result_field->type(); - if (num_columns() > 1) { - Field::MergeOptions options; - options.promote_integer_to_float = true; - options.promote_integer_sign = true; - options.promote_numeric_width = true; + Field::MergeOptions options; + options.promote_integer_to_float = true; + options.promote_integer_sign = true; + options.promote_numeric_width = true; + if (num_columns() > 1) { for (int i = 1; i < num_columns(); ++i) { if (!is_numeric(column(i)->type()->id())) { return Status::TypeError("DataType is not supported: ", @@ -334,6 +346,15 @@ Result> RecordBatch::ToTensor(MemoryPool* pool) const { result_type = result_field->type(); } + // Check if result_type is signed or unsigned integer and null_to_nan is set to true + // Then all columns should be promoted to float type + if (is_integer(result_type->id()) && null_to_nan) { + ARROW_ASSIGN_OR_RAISE( + result_field, + result_field->MergeWith(field(result_field->name(), float32()), options)); + result_type = result_field->type(); + } + // Allocate memory ARROW_ASSIGN_OR_RAISE( std::shared_ptr result, diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 16d721caad443..5202ff4abfa0b 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -85,8 +85,12 @@ class ARROW_EXPORT RecordBatch { /// Create a Tensor object with shape (number of rows, number of columns) and /// strides (type size in bytes, type size in bytes * number of rows). /// Generated Tensor will have column-major layout. + /// + /// \param[in] null_to_nan if true, convert nulls to NaN + /// \param[in] pool the memory pool to allocate the tensor buffer + /// \return the resulting Tensor Result> ToTensor( - MemoryPool* pool = default_memory_pool()) const; + bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) const; /// \brief Construct record batch from struct array /// diff --git a/cpp/src/arrow/record_batch_test.cc b/cpp/src/arrow/record_batch_test.cc index 81154452d7229..7e0eb1d460555 100644 --- a/cpp/src/arrow/record_batch_test.cc +++ b/cpp/src/arrow/record_batch_test.cc @@ -667,7 +667,8 @@ TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) { auto batch = RecordBatch::Make(schema, length, {a0, a1}); ASSERT_RAISES_WITH_MESSAGE(TypeError, - "Type error: Can only convert a RecordBatch with no nulls.", + "Type error: Can only convert a RecordBatch with no nulls. " + "Set null_to_nan to true to convert nulls to NaN", batch->ToTensor()); } @@ -740,6 +741,79 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) { CheckTensor(tensor, 18, shape, f_strides); } +TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) { + const int length = 9; + + // int32 + float32 = float64 + auto f0 = field("f0", int32()); + auto f1 = field("f1", float32()); + + std::vector> fields = {f0, f1}; + auto schema = ::arrow::schema(fields); + + auto a0 = ArrayFromJSON(int32(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]"); + auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]"); + + auto batch = RecordBatch::Make(schema, length, {a0, a1}); + + ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true)); + ASSERT_OK(tensor->Validate()); + + std::vector shape = {9, 2}; + const int64_t f64_size = sizeof(double); + std::vector f_strides = {f64_size, f64_size * shape[0]}; + std::shared_ptr tensor_expected = TensorFromJSON( + float64(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]", + shape, f_strides); + + EXPECT_FALSE(tensor_expected->Equals(*tensor)); + EXPECT_TRUE(tensor_expected->Equals(*tensor, EqualOptions().nans_equal(true))); + + CheckTensor(tensor, 18, shape, f_strides); + + // int32 -> float64 + auto f2 = field("f2", int32()); + + std::vector> fields1 = {f0, f2}; + auto schema1 = ::arrow::schema(fields1); + + auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]"); + auto batch1 = RecordBatch::Make(schema1, length, {a0, a2}); + + ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true)); + ASSERT_OK(tensor1->Validate()); + + EXPECT_FALSE(tensor_expected->Equals(*tensor1)); + EXPECT_TRUE(tensor_expected->Equals(*tensor1, EqualOptions().nans_equal(true))); + + CheckTensor(tensor1, 18, shape, f_strides); + + // int8 -> float32 + auto f3 = field("f3", int8()); + auto f4 = field("f4", int8()); + + std::vector> fields2 = {f3, f4}; + auto schema2 = ::arrow::schema(fields2); + + auto a3 = ArrayFromJSON(int8(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]"); + auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]"); + auto batch2 = RecordBatch::Make(schema2, length, {a3, a4}); + + ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true)); + ASSERT_OK(tensor2->Validate()); + + const int64_t f32_size = sizeof(float); + std::vector f_strides_2 = {f32_size, f32_size * shape[0]}; + std::shared_ptr tensor_expected_2 = TensorFromJSON( + float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]", + shape, f_strides_2); + + EXPECT_FALSE(tensor_expected_2->Equals(*tensor2)); + EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true))); + + CheckTensor(tensor2, 18, shape, f_strides_2); +} + TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) { const int length = 9; diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9e5e3d3fa683b..aa50dd189a82d 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -984,7 +984,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CRecordBatch] Slice(int64_t offset) shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length) - CResult[shared_ptr[CTensor]] ToTensor() const + CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool* pool) const cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata": shared_ptr[CRecordBatch] batch diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 1ab3fd04ed9f0..54fda1da7dcaf 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3389,21 +3389,64 @@ cdef class RecordBatch(_Tabular): deref(c_record_batch).ToStructArray()) return pyarrow_wrap_array(c_array) - def to_tensor(self): + def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None): """ Convert to a :class:`~pyarrow.Tensor`. RecordBatches that can be converted have fields of type signed or unsigned - integer or float, including all bit-widths, with no validity bitmask. + integer or float, including all bit-widths. RecordBatches with validity bitmask + for any of the arrays can be converted with ``null_to_nan``turned to ``True``. + In this case null values are converted to NaN and signed or unsigned integer + type arrays are promoted to appropriate float type. + + Parameters + ---------- + null_to_nan : bool, default False + Whether to write null values in the result as ``NaN``. + memory_pool : MemoryPool, default None + For memory allocations, if required, otherwise use default pool + + Examples + -------- + >>> import pyarrow as pa + >>> batch = pa.record_batch( + ... [ + ... pa.array([1, 2, 3, 4, None], type=pa.int32()), + ... pa.array([10, 20, 30, 40, None], type=pa.float32()), + ... ], names = ["a", "b"] + ... ) + + >>> batch + pyarrow.RecordBatch + a: int32 + b: float + ---- + a: [1,2,3,4,null] + b: [10,20,30,40,null] + + >>> batch.to_tensor(null_to_nan=True) + + type: double + shape: (5, 2) + strides: (8, 40) + + >>> batch.to_tensor(null_to_nan=True).to_numpy() + array([[ 1., 10.], + [ 2., 20.], + [ 3., 30.], + [ 4., 40.], + [nan, nan]]) """ cdef: shared_ptr[CRecordBatch] c_record_batch shared_ptr[CTensor] c_tensor + CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) c_record_batch = pyarrow_unwrap_batch(self) with nogil: c_tensor = GetResultValue( - deref(c_record_batch).ToTensor()) + deref(c_record_batch).ToTensor(null_to_nan, + pool)) return pyarrow_wrap_tensor(c_tensor) def _export_to_c(self, out_ptr, out_schema_ptr=0): diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index a7d917c2baf2d..8e30574188763 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1061,7 +1061,7 @@ def test_recordbatch_to_tensor_null(): arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90] batch = pa.RecordBatch.from_arrays( [ - pa.array(arr1, type=pa.float32()), + pa.array(arr1, type=pa.int32()), pa.array(arr2, type=pa.float32()), ], ["a", "b"] ) @@ -1071,6 +1071,52 @@ def test_recordbatch_to_tensor_null(): ): batch.to_tensor() + result = batch.to_tensor(null_to_nan=True) + + x = np.array([arr1, arr2], np.float64).transpose() + expected = pa.Tensor.from_numpy(x) + + np.testing.assert_equal(result.to_numpy(), x) + assert result.size == 18 + assert result.type == pa.float64() + assert result.shape == expected.shape + assert result.strides == expected.strides + + # int32 -> float64 + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr1, type=pa.int32()), + pa.array(arr2, type=pa.int32()), + ], ["a", "b"] + ) + + result = batch.to_tensor(null_to_nan=True) + + np.testing.assert_equal(result.to_numpy(), x) + assert result.size == 18 + assert result.type == pa.float64() + assert result.shape == expected.shape + assert result.strides == expected.strides + + # int8 -> float32 + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr1, type=pa.int8()), + pa.array(arr2, type=pa.int8()), + ], ["a", "b"] + ) + + result = batch.to_tensor(null_to_nan=True) + + x = np.array([arr1, arr2], np.float32).transpose() + expected = pa.Tensor.from_numpy(x) + + np.testing.assert_equal(result.to_numpy(), x) + assert result.size == 18 + assert result.type == pa.float32() + assert result.shape == expected.shape + assert result.strides == expected.strides + def test_recordbatch_to_tensor_empty(): batch = pa.RecordBatch.from_arrays(