Skip to content

Commit

Permalink
Add initial test case
Browse files Browse the repository at this point in the history
  • Loading branch information
danepitkin committed Aug 20, 2024
1 parent 46c049c commit da657fc
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 30 deletions.
14 changes: 8 additions & 6 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ class SimpleRecordBatch : public RecordBatch {
public:
SimpleRecordBatch(std::shared_ptr<Schema> schema, int64_t num_rows,
std::vector<std::shared_ptr<Array>> columns,
DeviceAllocationType device_type = DeviceAllocationType::kCPU,
std::shared_ptr<Device::SyncEvent> sync_event = nullptr)
: RecordBatch(std::move(schema), num_rows),
boxed_columns_(std::move(columns)),
device_type_(DeviceAllocationType::kCPU),
device_type_(device_type),
sync_event_(std::move(sync_event)) {
if (boxed_columns_.size() > 0) {
device_type_ = boxed_columns_[0]->device_type();
Expand Down Expand Up @@ -210,11 +211,12 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows

std::shared_ptr<RecordBatch> RecordBatch::Make(
std::shared_ptr<Schema> schema, int64_t num_rows,
std::vector<std::shared_ptr<Array>> columns,
std::vector<std::shared_ptr<Array>> columns, DeviceAllocationType device_type,
std::shared_ptr<Device::SyncEvent> sync_event) {
DCHECK_EQ(schema->num_fields(), static_cast<int>(columns.size()));
return std::make_shared<SimpleRecordBatch>(std::move(schema), num_rows,
std::move(columns), std::move(sync_event));
std::move(columns), device_type,
std::move(sync_event));
}

std::shared_ptr<RecordBatch> RecordBatch::Make(
Expand Down Expand Up @@ -354,7 +356,7 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ReplaceSchema(
", did not match new schema field type: ", replace_type->ToString());
}
}
return RecordBatch::Make(std::move(schema), num_rows(), columns(), GetSyncEvent());
return RecordBatch::Make(std::move(schema), num_rows(), columns(), device_type(), GetSyncEvent());
}

std::vector<std::string> RecordBatch::ColumnNames() const {
Expand Down Expand Up @@ -383,7 +385,7 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::RenameColumns(
}

return RecordBatch::Make(::arrow::schema(std::move(fields)), num_rows(),
std::move(columns), GetSyncEvent());
std::move(columns), device_type(), GetSyncEvent());
}

Result<std::shared_ptr<RecordBatch>> RecordBatch::SelectColumns(
Expand All @@ -405,7 +407,7 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::SelectColumns(
auto new_schema =
std::make_shared<arrow::Schema>(std::move(fields), schema()->metadata());
return RecordBatch::Make(std::move(new_schema), num_rows(), std::move(columns),
GetSyncEvent());
device_type(), GetSyncEvent());
}

std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ARROW_EXPORT RecordBatch {
static std::shared_ptr<RecordBatch> Make(
std::shared_ptr<Schema> schema, int64_t num_rows,
std::vector<std::shared_ptr<Array>> columns,
DeviceAllocationType device_type = DeviceAllocationType::kCPU,
std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR);

/// \brief Construct record batch from vector of internal data structures
Expand Down
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,13 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
const shared_ptr[CSchema]& schema, int64_t num_rows,
const vector[shared_ptr[CArray]]& columns)

@staticmethod
shared_ptr[CRecordBatch] MakeWithDevice "Make"(
const shared_ptr[CSchema]& schema, int64_t num_rows,
const vector[shared_ptr[CArray]]& columns,
CDeviceAllocationType device_type,
shared_ptr[CSyncEvent] sync_event)

CResult[shared_ptr[CStructArray]] ToStructArray() const

@staticmethod
Expand Down
17 changes: 14 additions & 3 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,6 @@ cdef class _Tabular(_PandasConvertible):
f"one of the `{self.__class__.__name__}.from_*` functions instead.")

def __array__(self, dtype=None, copy=None):
self._assert_cpu()
if copy is False:
raise ValueError(
"Unable to avoid a copy while creating a numpy array as requested "
Expand Down Expand Up @@ -3359,6 +3358,7 @@ cdef class RecordBatch(_Tabular):
metadata : dict or Mapping, default None
Optional metadata for the schema (if inferred).
Returns
-------
pyarrow.RecordBatch
Expand Down Expand Up @@ -3413,6 +3413,7 @@ cdef class RecordBatch(_Tabular):
shared_ptr[CSchema] c_schema
vector[shared_ptr[CArray]] c_arrays
int64_t num_rows
CDeviceAllocationType c_device_type

if len(arrays) > 0:
num_rows = len(arrays[0])
Expand All @@ -3437,8 +3438,18 @@ cdef class RecordBatch(_Tabular):
'{0} vs {1}'.format(len(arr), num_rows))
c_arrays.push_back(arr.sp_array)

result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows,
c_arrays))
if c_arrays.size() > 0:
c_device_type = deref(c_arrays[0]).device_type()
result = pyarrow_wrap_batch(CRecordBatch.MakeWithDevice(c_schema,
num_rows,
c_arrays,
c_device_type,
<shared_ptr[CSyncEvent]>NULL))
else:
result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema,
num_rows,
c_arrays))

result.validate()
return result

Expand Down
44 changes: 23 additions & 21 deletions python/pyarrow/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,27 +1276,29 @@ def _table_like_slice_tests(factory):
assert obj.slice(len(obj) - 4, 2).equals(obj[-4:-2])


# def test_recordbatch_non_cpu():
# cuda = pytest.importorskip("pyarrow.cuda")
# ctx = cuda.Context(0)

# cpu_data = [
# pa.array(range(5), type='int16'),
# pa.array([-10, -5, 0, None, 10], type='int32')
# ]
# cuda_data = [cuda.buffer_from_data(x) for x in cpu_data]
# batch = pa.record_batch(cpu_data, ['c0', 'c1'])

# # Supported
# batch.validate()
# assert batch.offset == 0
# assert batch.buffers() == [None, cuda_data[0], cuda_data[1]]
# assert batch.device_type == pa.DeviceAllocationType.CUDA
# assert batch.is_cpu is False
# assert len(batch) == 4
# assert batch.slice(2, 2).offset == 2
# assert repr(batch)
# assert str(batch)
def test_recordbatch_non_cpu():
cuda = pytest.importorskip("pyarrow.cuda")
ctx = cuda.Context(0)

cpu_data = [
np.array(range(5), dtype=np.int16),
np.array([-10, -5, 0, 1, 10], dtype=np.int32)
]
cuda_data = [ctx.buffer_from_data(x) for x in cpu_data]
cuda_arrays = [
pa.Array.from_buffers(pa.int16(), 5, [None, cuda_data[0]]),
pa.Array.from_buffers(pa.int32(), 5, [None, cuda_data[1]]),
]
batch = pa.record_batch(cuda_arrays, ['c0', 'c1'])

# Supported
batch.validate()
assert batch.device_type == pa.DeviceAllocationType.CUDA
assert batch.is_cpu is False
assert len(batch) == 5
assert repr(batch)
assert str(batch)
    # assert batch.slice(2, 2).offset == 2

# # TODO support DLPack for CUDA
# with pytest.raises(NotImplementedError):
Expand Down

0 comments on commit da657fc

Please sign in to comment.