From 3ba6d286caad328b8572a3b9228045da8c8d2043 Mon Sep 17 00:00:00 2001
From: Alenka Frim
Date: Tue, 5 Mar 2024 09:15:42 +0100
Subject: [PATCH] GH-40059: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor (#40064)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Rationale for this change

There is no method currently in Arrow C++ to convert `Table` or `RecordBatch` to a `Tensor`. In https://github.com/apache/arrow/issues/40058 we are proposing to add the conversion and this PR starts with the basic implementation for `RecordBatch`.

### What changes are included in this PR?

Basic conversion `RecordBatch` → `Tensor` is added together with Python bindings. The implementation details are:
- Support for a single data type only (for example, all columns being `int32`).
- No support for missing values (nulls); `NaN` values are allowed.
- Column-major layout of the resulting `Tensor`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: #40059

Lead-authored-by: AlenkaF
Co-authored-by: Alenka Frim
Co-authored-by: Benjamin Kietzman
Signed-off-by: Joris Van den Bossche
---
 cpp/src/arrow/record_batch.cc        |  92 +++++++++++
 cpp/src/arrow/record_batch.h         |   8 +
 cpp/src/arrow/record_batch_test.cc   | 229 +++++++++++++++++++++++++++
 python/pyarrow/includes/libarrow.pxd |   2 +
 python/pyarrow/table.pxi             |  14 ++
 python/pyarrow/tests/test_table.py   | 142 +++++++++++++++++
 6 files changed, 487 insertions(+)

diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index ca6b45af3d6b4..d23b2b584bc20 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -30,6 +30,7 @@
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
@@ -247,6 +248,97 @@ Result<std::shared_ptr<StructArray>> RecordBatch::ToStructArray() const {
                                 /*offset=*/0);
 }
 
+template <typename DataType>
+inline void 
ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
+  using CType = typename arrow::TypeTraits<DataType>::CType;
+  auto* out_values = reinterpret_cast<CType*>(out);
+
+  // Loop through all of the columns
+  for (int i = 0; i < batch.num_columns(); ++i) {
+    const auto* in_values = batch.column(i)->data()->GetValues<CType>(1);
+
+    // Copy data of each column
+    memcpy(out_values, in_values, sizeof(CType) * batch.num_rows());
+    out_values += batch.num_rows();
+  }  // End loop through columns
+}
+
+Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
+  if (num_columns() == 0) {
+    return Status::TypeError(
+        "Conversion to Tensor for RecordBatches without columns/schema is not "
+        "supported.");
+  }
+  const auto& type = column(0)->type();
+  // Check for supported data types
+  if (!is_integer(type->id()) && !is_floating(type->id())) {
+    return Status::TypeError("DataType is not supported: ", type->ToString());
+  }
+  // Check for uniform data type (structural equality, not pointer identity)
+  // Check for no validity bitmap of each field
+  for (int i = 0; i < num_columns(); ++i) {
+    if (column(i)->null_count() > 0) {
+      return Status::TypeError("Can only convert a RecordBatch with no nulls.");
+    }
+    if (!column(i)->type()->Equals(*type)) {
+      return Status::TypeError("Can only convert a RecordBatch with uniform data type.");
+    }
+  }
+
+  // Allocate memory; size is in bytes, so use byte_width(), not bit_width()
+  ARROW_ASSIGN_OR_RAISE(
+      std::shared_ptr<Buffer> result,
+      AllocateBuffer(num_rows() * num_columns() * type->byte_width(), pool));
+  // Copy data
+  switch (type->id()) {
+    case Type::UINT8:
+      ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data());
+      break;
+    case Type::UINT16:
+    case Type::HALF_FLOAT:
+      ConvertColumnsToTensor<UInt16Type>(*this, result->mutable_data());
+      break;
+    case Type::UINT32:
+      ConvertColumnsToTensor<UInt32Type>(*this, result->mutable_data());
+      break;
+    case Type::UINT64:
+      ConvertColumnsToTensor<UInt64Type>(*this, result->mutable_data());
+      break;
+    case Type::INT8:
+      ConvertColumnsToTensor<Int8Type>(*this, result->mutable_data());
+      break;
+    case Type::INT16:
+      ConvertColumnsToTensor<Int16Type>(*this, result->mutable_data());
+      break;
+    case Type::INT32:
+      ConvertColumnsToTensor<Int32Type>(*this, result->mutable_data());
+      break;
+    case Type::INT64:
+      ConvertColumnsToTensor<Int64Type>(*this, result->mutable_data());
+      break;
+    case Type::FLOAT:
+      ConvertColumnsToTensor<FloatType>(*this, result->mutable_data());
+      break;
+    case Type::DOUBLE:
+      ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data());
+      break;
+    default:
+      return Status::TypeError("DataType is not supported: ", type->ToString());
+  }
+
+  // Construct Tensor object
+  const auto& fixed_width_type =
+      internal::checked_cast<const FixedWidthType&>(*column(0)->type());
+  std::vector<int64_t> shape = {num_rows(), num_columns()};
+  std::vector<int64_t> strides;
+  ARROW_RETURN_NOT_OK(
+      internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
+  ARROW_ASSIGN_OR_RAISE(auto tensor,
+                        Tensor::Make(type, std::move(result), shape, strides));
+
+  return tensor;
+}
+
 const std::string& RecordBatch::column_name(int i) const {
   return schema_->field(i)->name();
 }
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 79f93a7b5997f..8a2c1ba6d7497 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -80,6 +80,14 @@ class ARROW_EXPORT RecordBatch {
   /// in the resulting struct array.
   Result<std::shared_ptr<StructArray>> ToStructArray() const;
 
+  /// \brief Convert record batch with one data type to Tensor
+  ///
+  /// Create a Tensor object with shape (number of rows, number of columns) and
+  /// strides (type size in bytes, type size in bytes * number of rows).
+  /// Generated Tensor will have column-major layout.
+  Result<std::shared_ptr<Tensor>> ToTensor(
+      MemoryPool* pool = default_memory_pool()) const;
+
   /// \brief Construct record batch from struct array
   ///
   /// This constructs a record batch using the child arrays of the given
diff --git a/cpp/src/arrow/record_batch_test.cc b/cpp/src/arrow/record_batch_test.cc
index db3a2d3def73f..05a20aa487abc 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -31,6 +31,7 @@
 #include "arrow/chunked_array.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/tensor.h"
 #include "arrow/testing/builder.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
@@ -592,4 +593,232 @@ TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
   ASSERT_BATCHES_EQUAL(*batch, *null_batch);
 }
 
+TEST_F(TestRecordBatch, ToTensorUnsupported) {
+  const int length = 9;
+
+  // Mixed data type
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", int64());
+
+  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
+  auto schema = ::arrow::schema(fields);
+
+  auto a0 = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
+  auto a1 = ArrayFromJSON(int64(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
+
+  auto batch = RecordBatch::Make(schema, length, {a0, a1});
+
+  ASSERT_RAISES_WITH_MESSAGE(
+      TypeError, "Type error: Can only convert a RecordBatch with uniform data type.",
+      batch->ToTensor());
+
+  // Unsupported data type
+  auto f2 = field("f2", utf8());
+
+  std::vector<std::shared_ptr<Field>> fields_1 = {f2};
+  auto schema_2 = ::arrow::schema(fields_1);
+
+  auto a2 = ArrayFromJSON(utf8(), R"(["a", "b", "c", "a", "b", "c", "a", "b", "c"])");
+  auto batch_2 = RecordBatch::Make(schema_2, length, {a2});
+
+  ASSERT_RAISES_WITH_MESSAGE(
+      TypeError, "Type error: DataType is not supported: " + a2->type()->ToString(),
+      batch_2->ToTensor());
+}
+
+TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
+  const int length = 9;
+
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", int32());
+
+  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
+  auto schema = ::arrow::schema(fields);
+
+  auto a0 = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
+  auto a1 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
+
+  auto batch = RecordBatch::Make(schema, length, {a0, a1});
+
+  ASSERT_RAISES_WITH_MESSAGE(TypeError,
+                             "Type error: Can only convert a RecordBatch with no nulls.",
+                             batch->ToTensor());
+}
+
+TEST_F(TestRecordBatch, ToTensorEmptyBatch) {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", int32());
+
+  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
+  auto schema = ::arrow::schema(fields);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> empty,
+                       RecordBatch::MakeEmpty(schema));
+
+  ASSERT_OK_AND_ASSIGN(auto tensor, empty->ToTensor());
+  ASSERT_OK(tensor->Validate());
+
+  const std::vector<int64_t> strides = {4, 4};
+  const std::vector<int64_t> shape = {0, 2};
+
+  EXPECT_EQ(strides, tensor->strides());
+  EXPECT_EQ(shape, tensor->shape());
+
+  auto batch_no_columns =
+      RecordBatch::Make(::arrow::schema({}), 10, std::vector<std::shared_ptr<Array>>{});
+
+  ASSERT_RAISES_WITH_MESSAGE(TypeError,
+                             "Type error: Conversion to Tensor for RecordBatches without "
+                             "columns/schema is not supported.",
+                             batch_no_columns->ToTensor());
+}
+
+template <typename DataType>
+void CheckTensor(const std::shared_ptr<Tensor>& tensor, const int size,
+                 const std::vector<int64_t> shape, const std::vector<int64_t> f_strides) {
+  EXPECT_EQ(size, tensor->size());
+  EXPECT_EQ(TypeTraits<DataType>::type_singleton(), tensor->type());
+  EXPECT_EQ(shape, tensor->shape());
+  EXPECT_EQ(f_strides, tensor->strides());
+  EXPECT_FALSE(tensor->is_row_major());
+  EXPECT_TRUE(tensor->is_column_major());
+  EXPECT_TRUE(tensor->is_contiguous());
+}
+
+TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
+  const int length = 9;
+
+  auto f0 = field("f0", float32());
+  auto f1 = field("f1", float32());
+
+  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
+  auto schema = ::arrow::schema(fields);
+
+  auto a0 = ArrayFromJSON(float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9]");
+  auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, NaN, 60, 70, 80, 90]");
+
+  auto batch = RecordBatch::Make(schema, length, {a0, a1});
+
+  ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor());
+  ASSERT_OK(tensor->Validate());
+
+  std::vector<int64_t> shape = {9, 2};
+  const int64_t f32_size = sizeof(float);
+  std::vector<int64_t> f_strides = {f32_size, f32_size * shape[0]};
+  std::vector<float> f_values = {
+      static_cast<float>(NAN), 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40,
+      static_cast<float>(NAN), 60, 70, 80, 90};
+  auto data = Buffer::Wrap(f_values);
+
+  std::shared_ptr<Tensor> tensor_expected;
+  ASSERT_OK_AND_ASSIGN(tensor_expected, Tensor::Make(float32(), data, shape, f_strides));
+
+  EXPECT_FALSE(tensor_expected->Equals(*tensor));
+  EXPECT_TRUE(tensor_expected->Equals(*tensor, EqualOptions().nans_equal(true)));
+
+  CheckTensor<FloatType>(tensor, 18, shape, f_strides);
+}
+
+template <typename DataType>
+class TestBatchToTensor : public ::testing::Test {};
+
+TYPED_TEST_SUITE_P(TestBatchToTensor);
+
+TYPED_TEST_P(TestBatchToTensor, SupportedTypes) {
+  using DataType = TypeParam;
+  using c_data_type = typename DataType::c_type;
+  const int unit_size = sizeof(c_data_type);
+
+  const int length = 9;
+
+  auto f0 = field("f0", TypeTraits<DataType>::type_singleton());
+  auto f1 = field("f1", TypeTraits<DataType>::type_singleton());
+  auto f2 = field("f2", TypeTraits<DataType>::type_singleton());
+
+  std::vector<std::shared_ptr<Field>> fields = {f0, f1, f2};
+  auto schema = ::arrow::schema(fields);
+
+  auto a0 = ArrayFromJSON(TypeTraits<DataType>::type_singleton(),
+                          "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
+  auto a1 = ArrayFromJSON(TypeTraits<DataType>::type_singleton(),
+                          "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
+  auto a2 = ArrayFromJSON(TypeTraits<DataType>::type_singleton(),
+                          "[100, 100, 100, 100, 100, 100, 100, 100, 100]");
+
+  auto batch = RecordBatch::Make(schema, length, {a0, a1, a2});
+
+  ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor());
+  ASSERT_OK(tensor->Validate());
+
+  std::vector<int64_t> shape = {9, 3};
+  std::vector<int64_t> f_strides = {unit_size, unit_size * shape[0]};
+  std::vector<c_data_type> f_values = {1, 2, 3, 4, 5, 6, 7, 8, 9,
+                                       10, 20, 30, 40, 50, 60, 70, 80, 90,
+                                       100, 100, 100, 100, 100, 100, 100, 100, 100};
+  auto data = Buffer::Wrap(f_values);
+
+  std::shared_ptr<Tensor> tensor_expected;
+  ASSERT_OK_AND_ASSIGN(
+      tensor_expected,
+      Tensor::Make(TypeTraits<DataType>::type_singleton(), data, shape, f_strides));
+
+  EXPECT_TRUE(tensor_expected->Equals(*tensor));
+  CheckTensor<DataType>(tensor, 27, shape, f_strides);
+
+  // Test offsets
+  auto batch_slice = batch->Slice(1);
+
+  ASSERT_OK_AND_ASSIGN(auto tensor_sliced, batch_slice->ToTensor());
+  ASSERT_OK(tensor_sliced->Validate());
+
+  std::vector<int64_t> shape_sliced = {8, 3};
+  std::vector<int64_t> f_strides_sliced = {unit_size, unit_size * shape_sliced[0]};
+  std::vector<c_data_type> f_values_sliced = {2, 3, 4, 5, 6, 7, 8, 9,
+                                              20, 30, 40, 50, 60, 70, 80, 90,
+                                              100, 100, 100, 100, 100, 100, 100, 100};
+  auto data_sliced = Buffer::Wrap(f_values_sliced);
+
+  std::shared_ptr<Tensor> tensor_expected_sliced;
+  ASSERT_OK_AND_ASSIGN(tensor_expected_sliced,
+                       Tensor::Make(TypeTraits<DataType>::type_singleton(), data_sliced,
+                                    shape_sliced, f_strides_sliced));
+
+  EXPECT_TRUE(tensor_expected_sliced->Equals(*tensor_sliced));
+  CheckTensor<DataType>(tensor_sliced, 24, shape_sliced, f_strides_sliced);
+
+  auto batch_slice_1 = batch->Slice(1, 5);
+
+  ASSERT_OK_AND_ASSIGN(auto tensor_sliced_1, batch_slice_1->ToTensor());
+  ASSERT_OK(tensor_sliced_1->Validate());
+
+  std::vector<int64_t> shape_sliced_1 = {5, 3};
+  std::vector<int64_t> f_strides_sliced_1 = {unit_size, unit_size * shape_sliced_1[0]};
+  std::vector<c_data_type> f_values_sliced_1 = {
+      2, 3, 4, 5, 6, 20, 30, 40, 50, 60, 100, 100, 100, 100, 100,
+  };
+  auto data_sliced_1 = Buffer::Wrap(f_values_sliced_1);
+
+  std::shared_ptr<Tensor> tensor_expected_sliced_1;
+  ASSERT_OK_AND_ASSIGN(tensor_expected_sliced_1,
+                       Tensor::Make(TypeTraits<DataType>::type_singleton(), data_sliced_1,
+                                    shape_sliced_1, f_strides_sliced_1));
+
+  EXPECT_TRUE(tensor_expected_sliced_1->Equals(*tensor_sliced_1));
+  CheckTensor<DataType>(tensor_sliced_1, 15, shape_sliced_1, f_strides_sliced_1);
+}
+
+REGISTER_TYPED_TEST_SUITE_P(TestBatchToTensor, SupportedTypes);
+
+INSTANTIATE_TYPED_TEST_SUITE_P(UInt8, TestBatchToTensor, UInt8Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(UInt16, TestBatchToTensor, UInt16Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(UInt32, TestBatchToTensor, UInt32Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(UInt64, TestBatchToTensor, UInt64Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(Int8, TestBatchToTensor, Int8Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(Int16, TestBatchToTensor, Int16Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(Int32, TestBatchToTensor, Int32Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(Int64, TestBatchToTensor, Int64Type);
+INSTANTIATE_TYPED_TEST_SUITE_P(Float16, TestBatchToTensor, HalfFloatType);
+INSTANTIATE_TYPED_TEST_SUITE_P(Float32, TestBatchToTensor, FloatType);
+INSTANTIATE_TYPED_TEST_SUITE_P(Float64, TestBatchToTensor, DoubleType);
+
 }  // namespace arrow
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index bc9d05ddbbc37..5ae0f2e0b55b9 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -977,6 +977,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
+        CResult[shared_ptr[CTensor]] ToTensor() const
+
     cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
         shared_ptr[CRecordBatch] batch
         # The struct in C++ does not actually have these two `const` qualifiers, but
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index d7f7895b538e8..dfd549befc2fe 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3010,6 +3010,20 @@ cdef class RecordBatch(_Tabular):
                 deref(c_record_batch).ToStructArray())
         return pyarrow_wrap_array(c_array)
 
+    def to_tensor(self):
+        """
+        Convert to a :class:`~pyarrow.Tensor`.
+ """ + cdef: + shared_ptr[CRecordBatch] c_record_batch + shared_ptr[CTensor] c_tensor + + c_record_batch = pyarrow_unwrap_batch(self) + with nogil: + c_tensor = GetResultValue( + deref(c_record_batch).ToTensor()) + return pyarrow_wrap_tensor(c_tensor) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index f0fd5518de067..87b17c35011c4 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -962,6 +962,148 @@ def test_table_to_struct_array_with_max_chunksize(): )) +def check_tensors(tensor, expected_tensor, type, size): + assert tensor.equals(expected_tensor) + assert tensor.size == size + assert tensor.type == type + assert tensor.shape == expected_tensor.shape + assert tensor.strides == expected_tensor.strides + + +@pytest.mark.parametrize('typ', [ + np.uint8, np.uint16, np.uint32, np.uint64, + np.int8, np.int16, np.int32, np.int64, + np.float32, np.float64, +]) +def test_recordbatch_to_tensor(typ): + arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] + arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90] + arr3 = [100, 100, 100, 100, 100, 100, 100, 100, 100] + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr1, type=pa.from_numpy_dtype(typ)), + pa.array(arr2, type=pa.from_numpy_dtype(typ)), + pa.array(arr3, type=pa.from_numpy_dtype(typ)), + ], ["a", "b", "c"] + ) + result = batch.to_tensor() + + x = np.array([arr1, arr2, arr3], typ).transpose() + expected = pa.Tensor.from_numpy(x) + + check_tensors(result, expected, pa.from_numpy_dtype(typ), 27) + + # Test offset + batch1 = batch.slice(1) + result = batch1.to_tensor() + + arr1 = [2, 3, 4, 5, 6, 7, 8, 9] + arr2 = [20, 30, 40, 50, 60, 70, 80, 90] + arr3 = [100, 100, 100, 100, 100, 100, 100, 100] + + x = np.array([arr1, arr2, arr3], typ).transpose() + expected = pa.Tensor.from_numpy(x) + + check_tensors(result, expected, pa.from_numpy_dtype(typ), 24) + + 
batch2 = batch.slice(1, 5) + result = batch2.to_tensor() + + arr1 = [2, 3, 4, 5, 6] + arr2 = [20, 30, 40, 50, 60] + arr3 = [100, 100, 100, 100, 100] + + x = np.array([arr1, arr2, arr3], typ).transpose() + expected = pa.Tensor.from_numpy(x) + + check_tensors(result, expected, pa.from_numpy_dtype(typ), 15) + + +def test_recordbatch_to_tensor_nan(): + arr1 = [1, 2, 3, 4, np.nan, 6, 7, 8, 9] + arr2 = [10, 20, 30, 40, 50, 60, 70, np.nan, 90] + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr1, type=pa.float32()), + pa.array(arr2, type=pa.float32()), + ], ["a", "b"] + ) + result = batch.to_tensor() + + x = np.array([arr1, arr2], np.float32).transpose() + expected = pa.Tensor.from_numpy(x) + + np.testing.assert_equal(result.to_numpy(), x) + assert result.size == 18 + assert result.type == pa.float32() + assert result.shape == expected.shape + assert result.strides == expected.strides + + +def test_recordbatch_to_tensor_null(): + arr1 = [1, 2, 3, 4, None, 6, 7, 8, 9] + arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90] + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr1, type=pa.float32()), + pa.array(arr2, type=pa.float32()), + ], ["a", "b"] + ) + with pytest.raises( + pa.ArrowTypeError, + match="Can only convert a RecordBatch with no nulls." 
+ ): + batch.to_tensor() + + +def test_recordbatch_to_tensor_empty(): + batch = pa.RecordBatch.from_arrays( + [ + pa.array([], type=pa.float32()), + pa.array([], type=pa.float32()), + ], ["a", "b"] + ) + result = batch.to_tensor() + + x = np.array([[], []], np.float32).transpose() + expected = pa.Tensor.from_numpy(x) + + assert result.size == expected.size + assert result.type == pa.float32() + assert result.shape == expected.shape + assert result.strides == (4, 4) + + +def test_recordbatch_to_tensor_unsupported(): + # Mixed data type + arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] + arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90] + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr1, type=pa.int32()), + pa.array(arr2, type=pa.float32()), + ], ["a", "b"] + ) + with pytest.raises( + pa.ArrowTypeError, + match="Can only convert a RecordBatch with uniform data type." + ): + batch.to_tensor() + + # Unsupported data type + arr3 = ["a", "b", "c", "a", "b", "c", "a", "b", "c"] + batch = pa.RecordBatch.from_arrays( + [ + pa.array(arr3, type=pa.utf8()), + ], ["c"] + ) + with pytest.raises( + pa.ArrowTypeError, + match="DataType is not supported" + ): + batch.to_tensor() + + def _table_like_slice_tests(factory): data = [ pa.array(range(5)),