Skip to content

Commit

Permalink
GH-40059: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor (#40064)
Browse files Browse the repository at this point in the history

### Rationale for this change

There is no method currently in Arrow C++ to convert `Table` or `RecordBatch` to a `Tensor`. In #40058 we are proposing to add the conversion and this PR starts with the basic implementation for `RecordBatch`.

### What changes are included in this PR?

Basic conversion `RecordBatch` → `Tensor` is added together with Python bindings. The implementation details are:

- Only a single data type is supported: all columns must share the same type (for example, every column is `int32`).
- Missing values (nulls) are not supported; floating-point `NaN` values are allowed.
- Column-major layout of the resulting `Tensor`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: #40059

Lead-authored-by: AlenkaF <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
AlenkaF and bkietz authored Mar 5, 2024
1 parent 4ce9a5e commit 3ba6d28
Show file tree
Hide file tree
Showing 6 changed files with 487 additions and 0 deletions.
92 changes: 92 additions & 0 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/pretty_print.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/tensor.h"
#include "arrow/type.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -247,6 +248,97 @@ Result<std::shared_ptr<StructArray>> RecordBatch::ToStructArray() const {
/*offset=*/0);
}

// Copy the values of every column of `batch` into `out`, one column after
// another, so that `out` ends up holding the data in column-major order.
//
// The caller is responsible for making sure that every column stores values of
// C type `TypeTraits<DataType>::CType` and that `out` has room for
// `num_columns * num_rows` such values. Validity bitmaps are not consulted.
template <typename DataType>
inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
  using CType = typename arrow::TypeTraits<DataType>::CType;

  const int64_t num_rows = batch.num_rows();
  // Nothing to copy for an empty batch. Returning early also avoids handing
  // memcpy a source pointer that may be null for zero-length columns
  // (NOTE(review): depends on how the array was built -- confirm), which is
  // undefined behaviour even when the byte count is zero.
  if (num_rows == 0) {
    return;
  }

  auto* out_values = reinterpret_cast<CType*>(out);
  for (int i = 0; i < batch.num_columns(); ++i) {
    // Buffer 1 is read as the column's values; presumably GetValues applies
    // the array's slice offset (the sliced-batch tests rely on this).
    const auto* in_values = batch.column(i)->data()->GetValues<CType>(1);

    // Append this column right after the previous one.
    memcpy(out_values, in_values, sizeof(CType) * num_rows);
    out_values += num_rows;
  }
}

// Convert this record batch into a column-major Tensor of shape
// (num_rows, num_columns).
//
// Requirements, each reported as Status::TypeError when violated:
//   - the batch has at least one column;
//   - the column type is an integer or floating-point type;
//   - every column has that same type;
//   - no column contains nulls.
//
// `pool` is the memory pool used to allocate the tensor's data buffer. The
// column values are copied into that buffer; the tensor does not share memory
// with the batch.
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
  if (num_columns() == 0) {
    return Status::TypeError(
        "Conversion to Tensor for RecordBatches without columns/schema is not "
        "supported.");
  }

  // Keep our own reference to the type instead of a reference obtained through
  // the temporary returned by column(0).
  const std::shared_ptr<DataType> type = column(0)->type();

  // Check for supported data types
  if (!is_integer(type->id()) && !is_floating(type->id())) {
    return Status::TypeError("DataType is not supported: ", type->ToString());
  }

  // Check that no column has nulls and that all columns share the same type
  for (int i = 0; i < num_columns(); ++i) {
    const auto& col = column(i);
    if (col->null_count() > 0) {
      return Status::TypeError("Can only convert a RecordBatch with no nulls.");
    }
    // Compare the types structurally: two equal types are not necessarily the
    // same DataType instance.
    if (!col->type()->Equals(*type)) {
      return Status::TypeError("Can only convert a RecordBatch with uniform data type.");
    }
  }

  // Allocate memory. bit_width() is expressed in bits, so convert it to bytes
  // before sizing the buffer; all supported types are a whole number of bytes.
  const auto& fixed_width_type = internal::checked_cast<const FixedWidthType&>(*type);
  const int64_t byte_width = fixed_width_type.bit_width() / 8;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> result,
                        AllocateBuffer(byte_width * num_columns() * num_rows(), pool));

  // Copy data. The copy is a raw memcpy, so half floats are moved as their
  // 16-bit storage type.
  switch (type->id()) {
    case Type::UINT8:
      ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data());
      break;
    case Type::UINT16:
    case Type::HALF_FLOAT:
      ConvertColumnsToTensor<UInt16Type>(*this, result->mutable_data());
      break;
    case Type::UINT32:
      ConvertColumnsToTensor<UInt32Type>(*this, result->mutable_data());
      break;
    case Type::UINT64:
      ConvertColumnsToTensor<UInt64Type>(*this, result->mutable_data());
      break;
    case Type::INT8:
      ConvertColumnsToTensor<Int8Type>(*this, result->mutable_data());
      break;
    case Type::INT16:
      ConvertColumnsToTensor<Int16Type>(*this, result->mutable_data());
      break;
    case Type::INT32:
      ConvertColumnsToTensor<Int32Type>(*this, result->mutable_data());
      break;
    case Type::INT64:
      ConvertColumnsToTensor<Int64Type>(*this, result->mutable_data());
      break;
    case Type::FLOAT:
      ConvertColumnsToTensor<FloatType>(*this, result->mutable_data());
      break;
    case Type::DOUBLE:
      ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data());
      break;
    default:
      return Status::TypeError("DataType is not supported: ", type->ToString());
  }

  // Construct Tensor object
  std::vector<int64_t> shape = {num_rows(), num_columns()};
  std::vector<int64_t> strides;
  ARROW_RETURN_NOT_OK(
      internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
  return Tensor::Make(type, std::move(result), shape, strides);
}

const std::string& RecordBatch::column_name(int i) const {
return schema_->field(i)->name();
}
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ class ARROW_EXPORT RecordBatch {
/// in the resulting struct array.
Result<std::shared_ptr<StructArray>> ToStructArray() const;

/// \brief Convert record batch with one data type to Tensor
///
/// Create a Tensor object with shape (number of rows, number of columns) and
/// strides (type size in bytes, type size in bytes * number of rows).
/// Generated Tensor will have column-major layout.
///
/// All columns must share a single integer or floating-point data type and
/// must not contain nulls (NaN values are allowed). If these conditions are
/// not met, or if the record batch has no columns, a TypeError status is
/// returned.
///
/// \param[in] pool the memory pool used to allocate the tensor data
/// \return the resulting Tensor
Result<std::shared_ptr<Tensor>> ToTensor(
MemoryPool* pool = default_memory_pool()) const;

/// \brief Construct record batch from struct array
///
/// This constructs a record batch using the child arrays of the given
Expand Down
229 changes: 229 additions & 0 deletions cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/chunked_array.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/tensor.h"
#include "arrow/testing/builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
Expand Down Expand Up @@ -592,4 +593,232 @@ TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
ASSERT_BATCHES_EQUAL(*batch, *null_batch);
}

// ToTensor must fail with a TypeError for a batch whose columns have different
// types and for a batch whose column type is not numeric.
TEST_F(TestRecordBatch, ToTensorUnsupported) {
  const int num_rows = 9;

  // Case 1: int32 and int64 columns in the same batch.
  auto mixed_schema = ::arrow::schema({field("f0", int32()), field("f1", int64())});
  auto int32_values = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto int64_values = ArrayFromJSON(int64(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
  auto mixed_batch =
      RecordBatch::Make(mixed_schema, num_rows, {int32_values, int64_values});

  ASSERT_RAISES_WITH_MESSAGE(
      TypeError, "Type error: Can only convert a RecordBatch with uniform data type.",
      mixed_batch->ToTensor());

  // Case 2: a single string column.
  auto string_schema = ::arrow::schema({field("f2", utf8())});
  auto string_values =
      ArrayFromJSON(utf8(), R"(["a", "b", "c", "a", "b", "c", "a", "b", "c"])");
  auto string_batch = RecordBatch::Make(string_schema, num_rows, {string_values});

  ASSERT_RAISES_WITH_MESSAGE(
      TypeError,
      "Type error: DataType is not supported: " + string_values->type()->ToString(),
      string_batch->ToTensor());
}

// ToTensor must fail with a TypeError as soon as one column contains a null.
TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
  const int num_rows = 9;

  auto schema = ::arrow::schema({field("f0", int32()), field("f1", int32())});

  auto without_null = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto with_null = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
  auto batch = RecordBatch::Make(schema, num_rows, {without_null, with_null});

  ASSERT_RAISES_WITH_MESSAGE(TypeError,
                             "Type error: Can only convert a RecordBatch with no nulls.",
                             batch->ToTensor());
}

// A batch that has columns but zero rows converts to a valid, empty tensor,
// whereas a batch without any columns is rejected with a TypeError.
TEST_F(TestRecordBatch, ToTensorEmptyBatch) {
  auto schema = ::arrow::schema({field("f0", int32()), field("f1", int32())});

  // Two int32 columns, no rows.
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> zero_rows,
                       RecordBatch::MakeEmpty(schema));
  ASSERT_OK_AND_ASSIGN(auto tensor, zero_rows->ToTensor());
  ASSERT_OK(tensor->Validate());

  const std::vector<int64_t> expected_shape = {0, 2};
  const std::vector<int64_t> expected_strides = {4, 4};
  EXPECT_EQ(expected_strides, tensor->strides());
  EXPECT_EQ(expected_shape, tensor->shape());

  // Ten rows, no columns.
  auto zero_columns =
      RecordBatch::Make(::arrow::schema({}), 10, std::vector<std::shared_ptr<Array>>{});

  ASSERT_RAISES_WITH_MESSAGE(TypeError,
                             "Type error: Conversion to Tensor for RecordBatches without "
                             "columns/schema is not supported.",
                             zero_columns->ToTensor());
}

// Check the metadata of a tensor produced from a record batch: element count,
// value type, shape and strides, and that the layout is contiguous and
// column-major (not row-major).
//
// \param tensor    tensor under test
// \param size      expected total number of elements
// \param shape     expected shape
// \param f_strides expected (column-major) strides, in bytes
//
// The vectors are taken by const reference so that they are not copied on
// every call, and `size` uses the same 64-bit type as the value it is compared
// against.
template <typename DataType>
void CheckTensor(const std::shared_ptr<Tensor>& tensor, const int64_t size,
                 const std::vector<int64_t>& shape,
                 const std::vector<int64_t>& f_strides) {
  EXPECT_EQ(size, tensor->size());
  EXPECT_EQ(TypeTraits<DataType>::type_singleton(), tensor->type());
  EXPECT_EQ(shape, tensor->shape());
  EXPECT_EQ(f_strides, tensor->strides());
  EXPECT_FALSE(tensor->is_row_major());
  EXPECT_TRUE(tensor->is_column_major());
  EXPECT_TRUE(tensor->is_contiguous());
}

// NaN is a regular floating-point value rather than a null, so a batch that
// contains NaNs converts successfully.
TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
  const int num_rows = 9;

  auto schema = ::arrow::schema({field("f0", float32()), field("f1", float32())});
  auto col0 = ArrayFromJSON(float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto col1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, NaN, 60, 70, 80, 90]");
  auto batch = RecordBatch::Make(schema, num_rows, {col0, col1});

  ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor());
  ASSERT_OK(tensor->Validate());

  // Build the expected tensor: both columns stored back to back.
  const float nan = static_cast<float>(NAN);
  const int64_t value_size = sizeof(float);
  std::vector<int64_t> expected_shape = {9, 2};
  std::vector<int64_t> expected_strides = {value_size, value_size * expected_shape[0]};
  std::vector<float> expected_values = {nan, 2,   3,  4,  5,  6,  7,  8,  9,
                                        10,  20,  30, 40, nan, 60, 70, 80, 90};

  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Tensor> expected,
                       Tensor::Make(float32(), Buffer::Wrap(expected_values),
                                    expected_shape, expected_strides));

  // With default options NaN never compares equal to NaN, so equality only
  // holds once nans_equal is switched on.
  EXPECT_FALSE(expected->Equals(*tensor));
  EXPECT_TRUE(expected->Equals(*tensor, EqualOptions().nans_equal(true)));

  CheckTensor<FloatType>(tensor, 18, expected_shape, expected_strides);
}

// Fixture for the type-parameterized RecordBatch -> Tensor conversion tests.
// It carries no state; DataType is the Arrow data type a given instantiation of
// the suite runs with.
template <typename DataType>
class TestBatchToTensor : public ::testing::Test {};

// Declare the type-parameterized suite. Its tests are registered and
// instantiated for concrete types after the test bodies.
TYPED_TEST_SUITE_P(TestBatchToTensor);

// For every supported data type, convert a three-column batch to a tensor and
// compare it with a hand-built column-major tensor. The same check is repeated
// for two slices of the batch to make sure array offsets and lengths are
// honoured by the conversion.
TYPED_TEST_P(TestBatchToTensor, SupportedTypes) {
  using DataType = TypeParam;
  using c_data_type = typename DataType::c_type;
  const int unit_size = sizeof(c_data_type);

  const int length = 9;
  const auto type = TypeTraits<DataType>::type_singleton();

  auto f0 = field("f0", type);
  auto f1 = field("f1", type);
  auto f2 = field("f2", type);

  std::vector<std::shared_ptr<Field>> fields = {f0, f1, f2};
  auto schema = ::arrow::schema(fields);

  auto a0 = ArrayFromJSON(type, "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto a1 = ArrayFromJSON(type, "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
  auto a2 = ArrayFromJSON(type, "[100, 100, 100, 100, 100, 100, 100, 100, 100]");

  auto batch = RecordBatch::Make(schema, length, {a0, a1, a2});

  // Full batch
  ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor());
  ASSERT_OK(tensor->Validate());

  std::vector<int64_t> shape = {9, 3};
  std::vector<int64_t> f_strides = {unit_size, unit_size * shape[0]};
  std::vector<c_data_type> f_values = {1,   2,   3,   4,   5,   6,   7,   8,   9,
                                       10,  20,  30,  40,  50,  60,  70,  80,  90,
                                       100, 100, 100, 100, 100, 100, 100, 100, 100};
  auto data = Buffer::Wrap(f_values);

  std::shared_ptr<Tensor> tensor_expected;
  ASSERT_OK_AND_ASSIGN(tensor_expected, Tensor::Make(type, data, shape, f_strides));

  EXPECT_TRUE(tensor_expected->Equals(*tensor));
  CheckTensor<DataType>(tensor, 27, shape, f_strides);

  // Test offsets: drop the first row
  auto batch_slice = batch->Slice(1);

  ASSERT_OK_AND_ASSIGN(auto tensor_sliced, batch_slice->ToTensor());
  ASSERT_OK(tensor_sliced->Validate());

  std::vector<int64_t> shape_sliced = {8, 3};
  std::vector<int64_t> f_strides_sliced = {unit_size, unit_size * shape_sliced[0]};
  std::vector<c_data_type> f_values_sliced = {2,   3,   4,   5,   6,   7,   8,   9,
                                              20,  30,  40,  50,  60,  70,  80,  90,
                                              100, 100, 100, 100, 100, 100, 100, 100};
  auto data_sliced = Buffer::Wrap(f_values_sliced);

  std::shared_ptr<Tensor> tensor_expected_sliced;
  ASSERT_OK_AND_ASSIGN(tensor_expected_sliced,
                       Tensor::Make(type, data_sliced, shape_sliced, f_strides_sliced));

  EXPECT_TRUE(tensor_expected_sliced->Equals(*tensor_sliced));
  // Check the tensor produced by the conversion, not the hand-built reference.
  CheckTensor<DataType>(tensor_sliced, 24, shape_sliced, f_strides_sliced);

  // Test offsets: keep five rows starting at row 1
  auto batch_slice_1 = batch->Slice(1, 5);

  ASSERT_OK_AND_ASSIGN(auto tensor_sliced_1, batch_slice_1->ToTensor());
  ASSERT_OK(tensor_sliced_1->Validate());

  std::vector<int64_t> shape_sliced_1 = {5, 3};
  std::vector<int64_t> f_strides_sliced_1 = {unit_size, unit_size * shape_sliced_1[0]};
  std::vector<c_data_type> f_values_sliced_1 = {
      2, 3, 4, 5, 6, 20, 30, 40, 50, 60, 100, 100, 100, 100, 100,
  };
  auto data_sliced_1 = Buffer::Wrap(f_values_sliced_1);

  std::shared_ptr<Tensor> tensor_expected_sliced_1;
  ASSERT_OK_AND_ASSIGN(
      tensor_expected_sliced_1,
      Tensor::Make(type, data_sliced_1, shape_sliced_1, f_strides_sliced_1));

  EXPECT_TRUE(tensor_expected_sliced_1->Equals(*tensor_sliced_1));
  // Check the tensor produced by the conversion, not the hand-built reference.
  CheckTensor<DataType>(tensor_sliced_1, 15, shape_sliced_1, f_strides_sliced_1);
}

// Register the tests that belong to the type-parameterized suite.
REGISTER_TYPED_TEST_SUITE_P(TestBatchToTensor, SupportedTypes);

// Run the suite once for each integer and floating-point type that
// RecordBatch::ToTensor handles.
INSTANTIATE_TYPED_TEST_SUITE_P(UInt8, TestBatchToTensor, UInt8Type);
INSTANTIATE_TYPED_TEST_SUITE_P(UInt16, TestBatchToTensor, UInt16Type);
INSTANTIATE_TYPED_TEST_SUITE_P(UInt32, TestBatchToTensor, UInt32Type);
INSTANTIATE_TYPED_TEST_SUITE_P(UInt64, TestBatchToTensor, UInt64Type);
INSTANTIATE_TYPED_TEST_SUITE_P(Int8, TestBatchToTensor, Int8Type);
INSTANTIATE_TYPED_TEST_SUITE_P(Int16, TestBatchToTensor, Int16Type);
INSTANTIATE_TYPED_TEST_SUITE_P(Int32, TestBatchToTensor, Int32Type);
INSTANTIATE_TYPED_TEST_SUITE_P(Int64, TestBatchToTensor, Int64Type);
INSTANTIATE_TYPED_TEST_SUITE_P(Float16, TestBatchToTensor, HalfFloatType);
INSTANTIATE_TYPED_TEST_SUITE_P(Float32, TestBatchToTensor, FloatType);
INSTANTIATE_TYPED_TEST_SUITE_P(Float64, TestBatchToTensor, DoubleType);

} // namespace arrow

} // namespace arrow
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)

# RecordBatch -> Tensor conversion. Declared without the C++ method's
# optional MemoryPool* argument, so calls go through its default value.
CResult[shared_ptr[CTensor]] ToTensor() const

cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
shared_ptr[CRecordBatch] batch
# The struct in C++ does not actually have these two `const` qualifiers, but
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3010,6 +3010,20 @@ cdef class RecordBatch(_Tabular):
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
return pyarrow_wrap_array(c_array)

def to_tensor(self):
    """
    Convert to a :class:`~pyarrow.Tensor`.

    All columns must share a single integer or floating point data type
    and must not contain nulls. The resulting tensor has shape
    (number of rows, number of columns) and a column-major layout.

    Returns
    -------
    pyarrow.Tensor
    """
    cdef:
        shared_ptr[CRecordBatch] c_batch = pyarrow_unwrap_batch(self)
        shared_ptr[CTensor] c_result

    # The conversion runs entirely in C++, so the GIL is released for it.
    with nogil:
        c_result = GetResultValue(
            <CResult[shared_ptr[CTensor]]>deref(c_batch).ToTensor())
    return pyarrow_wrap_tensor(c_result)

def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
Expand Down
Loading

0 comments on commit 3ba6d28

Please sign in to comment.