diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index e3a8c0d710cb8..456c3b3835f69 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -59,10 +59,11 @@ class SimpleRecordBatch : public RecordBatch {
  public:
   SimpleRecordBatch(std::shared_ptr<Schema> schema, int64_t num_rows,
                     std::vector<std::shared_ptr<Array>> columns,
+                    DeviceAllocationType device_type = DeviceAllocationType::kCPU,
                     std::shared_ptr<Device::SyncEvent> sync_event = nullptr)
       : RecordBatch(std::move(schema), num_rows),
         boxed_columns_(std::move(columns)),
-        device_type_(DeviceAllocationType::kCPU),
+        device_type_(device_type),
         sync_event_(std::move(sync_event)) {
     if (boxed_columns_.size() > 0) {
       device_type_ = boxed_columns_[0]->device_type();
@@ -210,11 +211,12 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows
 
 std::shared_ptr<RecordBatch> RecordBatch::Make(
     std::shared_ptr<Schema> schema, int64_t num_rows,
-    std::vector<std::shared_ptr<Array>> columns,
+    std::vector<std::shared_ptr<Array>> columns, DeviceAllocationType device_type,
     std::shared_ptr<Device::SyncEvent> sync_event) {
   DCHECK_EQ(schema->num_fields(), static_cast<int>(columns.size()));
   return std::make_shared<SimpleRecordBatch>(std::move(schema), num_rows,
-                                             std::move(columns), std::move(sync_event));
+                                             std::move(columns), device_type,
+                                             std::move(sync_event));
 }
 
 std::shared_ptr<RecordBatch> RecordBatch::Make(
@@ -354,7 +356,7 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ReplaceSchema(
           ", did not match new schema field type: ", replace_type->ToString());
     }
   }
-  return RecordBatch::Make(std::move(schema), num_rows(), columns(), GetSyncEvent());
+  return RecordBatch::Make(std::move(schema), num_rows(), columns(), device_type(), GetSyncEvent());
 }
 
 std::vector<std::string> RecordBatch::ColumnNames() const {
@@ -383,7 +385,7 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::RenameColumns(
   }
 
   return RecordBatch::Make(::arrow::schema(std::move(fields)), num_rows(),
-                           std::move(columns), GetSyncEvent());
+                           std::move(columns), device_type(), GetSyncEvent());
 }
 
 Result<std::shared_ptr<RecordBatch>> RecordBatch::SelectColumns(
@@ -405,7 +407,7 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::SelectColumns(
   auto new_schema = std::make_shared<Schema>(std::move(fields), schema()->metadata());
 
   return RecordBatch::Make(std::move(new_schema), num_rows(), std::move(columns),
-                           GetSyncEvent());
+                           device_type(), GetSyncEvent());
 }
 
 std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 95596e9c15594..bccaf66735a12 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -51,6 +51,7 @@ class ARROW_EXPORT RecordBatch {
   static std::shared_ptr<RecordBatch> Make(
       std::shared_ptr<Schema> schema, int64_t num_rows,
       std::vector<std::shared_ptr<Array>> columns,
+      DeviceAllocationType device_type = DeviceAllocationType::kCPU,
      std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR);
 
   /// \brief Construct record batch from vector of internal data structures
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9b008d150f1f1..9f9093100b174 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -990,6 +990,13 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
             const shared_ptr[CSchema]& schema, int64_t num_rows,
             const vector[shared_ptr[CArray]]& columns)
 
+        @staticmethod
+        shared_ptr[CRecordBatch] MakeWithDevice "Make"(
+            const shared_ptr[CSchema]& schema, int64_t num_rows,
+            const vector[shared_ptr[CArray]]& columns,
+            CDeviceAllocationType device_type,
+            shared_ptr[CSyncEvent] sync_event)
+
         CResult[shared_ptr[CStructArray]] ToStructArray() const
 
         @staticmethod
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 5ee8aa149971f..fc5acfa4285ba 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1571,7 +1571,6 @@ cdef class _Tabular(_PandasConvertible):
                 f"one of the `{self.__class__.__name__}.from_*` functions instead.")
 
     def __array__(self, dtype=None, copy=None):
-        self._assert_cpu()
         if copy is False:
             raise ValueError(
                 "Unable to avoid a copy while creating a numpy array as requested "
@@ -3359,6 +3358,7 @@ cdef class RecordBatch(_Tabular):
         metadata : dict or Mapping, default None
             Optional metadata for the schema (if inferred).
+
         Returns
         -------
         pyarrow.RecordBatch
@@ -3413,6 +3413,7 @@ cdef class RecordBatch(_Tabular):
            shared_ptr[CSchema] c_schema
            vector[shared_ptr[CArray]] c_arrays
            int64_t num_rows
+            CDeviceAllocationType c_device_type
 
         if len(arrays) > 0:
             num_rows = len(arrays[0])
@@ -3437,8 +3438,18 @@ cdef class RecordBatch(_Tabular):
                                      '{0} vs {1}'.format(len(arr), num_rows))
             c_arrays.push_back(arr.sp_array)
 
-        result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
-                                                      c_arrays))
+        if c_arrays.size() > 0:
+            c_device_type = deref(c_arrays[0]).device_type()
+            result = pyarrow_wrap_batch(CRecordBatch.MakeWithDevice(c_schema,
+                                                                    num_rows,
+                                                                    c_arrays,
+                                                                    c_device_type,
+                                                                    NULL))
+        else:
+            result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema,
+                                                          num_rows,
+                                                          c_arrays))
+
         result.validate()
         return result
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index a303a87db6860..18ba35f6c9a50 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -1276,27 +1276,29 @@ def _table_like_slice_tests(factory):
     assert obj.slice(len(obj) - 4, 2).equals(obj[-4:-2])
 
 
-# def test_recordbatch_non_cpu():
-#     cuda = pytest.importorskip("pyarrow.cuda")
-#     ctx = cuda.Context(0)
-
-#     cpu_data = [
-#         pa.array(range(5), type='int16'),
-#         pa.array([-10, -5, 0, None, 10], type='int32')
-#     ]
-#     cuda_data = [cuda.buffer_from_data(x) for x in cpu_data]
-#     batch = pa.record_batch(cpu_data, ['c0', 'c1'])
-
-#     # Supported
-#     batch.validate()
-#     assert batch.offset == 0
-#     assert batch.buffers() == [None, cuda_data[0], cuda_data[1]]
-#     assert batch.device_type == pa.DeviceAllocationType.CUDA
-#     assert batch.is_cpu is False
-#     assert len(batch) == 4
-#     assert batch.slice(2, 2).offset == 2
-#     assert repr(batch)
-#     assert str(batch)
+def test_recordbatch_non_cpu():
+    cuda = pytest.importorskip("pyarrow.cuda")
+    ctx = cuda.Context(0)
+
+    cpu_data = [
+        np.array(range(5), dtype=np.int16),
+        np.array([-10, -5, 0, 1, 10], dtype=np.int32)
+    ]
+    cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
+    cuda_arrays = [
+        pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
+        pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
+    ]
+    batch = pa.record_batch(cuda_arrays, ['c0', 'c1'])
+
+    # Supported
+    batch.validate()
+    assert batch.device_type == pa.DeviceAllocationType.CUDA
+    assert batch.is_cpu is False
+    assert len(batch) == 5
+    assert repr(batch)
+    assert str(batch)
+    # assert batch.slice(2, 2) == 2
 
 # # TODO support DLPack for CUDA
 # with pytest.raises(NotImplementedError):
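
For context, a minimal sketch of what this patch enables from the Python side (not part of the diff): pa.record_batch now forwards the arrays' device type to RecordBatch::Make instead of hard-coding kCPU, so a batch built from device-resident arrays reports its actual device. The snippet assumes a CUDA-enabled pyarrow build; variable names like host and dev_buf are illustrative only.

    import numpy as np
    import pyarrow as pa
    from pyarrow import cuda  # requires pyarrow built with CUDA support

    ctx = cuda.Context(0)
    host = np.arange(5, dtype=np.int16)   # data staged on the CPU
    dev_buf = ctx.buffer_from_data(host)  # copied into device memory

    # Build an array whose data buffer lives on the GPU (no validity bitmap).
    arr = pa.Array.from_buffers(pa.int16(), 5, [None, dev_buf])

    # With this change the batch inherits the arrays' device type.
    batch = pa.record_batch([arr], names=["c0"])
    assert batch.device_type == pa.DeviceAllocationType.CUDA
    assert batch.is_cpu is False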