Skip to content

Commit

Permalink
apacheGH-38717: [C++] Add ImportChunkedArray and ExportChunkedArray t…
Browse files Browse the repository at this point in the history
…o/from ArrowArrayStream (apache#39455)

### Rationale for this change

The `ChunkedArray` has no equivalent in the C data interface; however, it is the primary array structure that higher level bindings interact with (because it is a column in a `Table`). In the Python capsule interface, this means that ChunkedArrays always require a workaround involving loops in Python.

### What changes are included in this PR?

- Added `ImportChunkedArray()` and `ExportChunkedArray()`
- Generalized the classes that support import/export to relax the assumption that every `ArrowArray` in an `ArrowArrayStream` is a `RecordBatch`.

### Are these changes tested?

TODO

### Are there any user-facing changes?

Yes, two new functions are added to bridge.h.
* Closes: apache#38717

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
  • Loading branch information
2 people authored and dgreiss committed Feb 17, 2024
1 parent 292a051 commit 65f0925
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 61 deletions.
252 changes: 191 additions & 61 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2002,13 +2002,49 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(

namespace {

Status ExportStreamSchema(const std::shared_ptr<RecordBatchReader>& src,
struct ArrowSchema* out_schema) {
return ExportSchema(*src->schema(), out_schema);
}

Status ExportStreamSchema(const std::shared_ptr<ChunkedArray>& src,
struct ArrowSchema* out_schema) {
return ExportType(*src->type(), out_schema);
}

Status ExportStreamNext(const std::shared_ptr<RecordBatchReader>& src, int64_t i,
struct ArrowArray* out_array) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(src->ReadNext(&batch));
if (batch == nullptr) {
// End of stream
ArrowArrayMarkReleased(out_array);
return Status::OK();
} else {
return ExportRecordBatch(*batch, out_array);
}
}

Status ExportStreamNext(const std::shared_ptr<ChunkedArray>& src, int64_t i,
struct ArrowArray* out_array) {
if (i >= src->num_chunks()) {
// End of stream
ArrowArrayMarkReleased(out_array);
return Status::OK();
} else {
return ExportArray(*src->chunk(static_cast<int>(i)), out_array);
}
}

template <typename T>
class ExportedArrayStream {
public:
struct PrivateData {
explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
: reader_(std::move(reader)) {}
explicit PrivateData(std::shared_ptr<T> reader)
: reader_(std::move(reader)), batch_num_(0) {}

std::shared_ptr<RecordBatchReader> reader_;
std::shared_ptr<T> reader_;
int64_t batch_num_;
std::string last_error_;

PrivateData() = default;
Expand All @@ -2018,19 +2054,11 @@ class ExportedArrayStream {
explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {}

Status GetSchema(struct ArrowSchema* out_schema) {
return ExportSchema(*reader()->schema(), out_schema);
return ExportStreamSchema(reader(), out_schema);
}

Status GetNext(struct ArrowArray* out_array) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader()->ReadNext(&batch));
if (batch == nullptr) {
// End of stream
ArrowArrayMarkReleased(out_array);
return Status::OK();
} else {
return ExportRecordBatch(*batch, out_array);
}
return ExportStreamNext(reader(), next_batch_num(), out_array);
}

const char* GetLastError() {
Expand Down Expand Up @@ -2070,6 +2098,15 @@ class ExportedArrayStream {
return ExportedArrayStream{stream}.GetLastError();
}

static Status Make(std::shared_ptr<T> reader, struct ArrowArrayStream* out) {
out->get_schema = ExportedArrayStream::StaticGetSchema;
out->get_next = ExportedArrayStream::StaticGetNext;
out->get_last_error = ExportedArrayStream::StaticGetLastError;
out->release = ExportedArrayStream::StaticRelease;
out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)};
return Status::OK();
}

private:
int ToCError(const Status& status) {
if (ARROW_PREDICT_TRUE(status.ok())) {
Expand All @@ -2093,7 +2130,9 @@ class ExportedArrayStream {
return reinterpret_cast<PrivateData*>(stream_->private_data);
}

const std::shared_ptr<RecordBatchReader>& reader() { return private_data()->reader_; }
const std::shared_ptr<T>& reader() { return private_data()->reader_; }

int64_t next_batch_num() { return private_data()->batch_num_++; }

struct ArrowArrayStream* stream_;
};
Expand All @@ -2102,79 +2141,71 @@ class ExportedArrayStream {

Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out) {
out->get_schema = ExportedArrayStream::StaticGetSchema;
out->get_next = ExportedArrayStream::StaticGetNext;
out->get_last_error = ExportedArrayStream::StaticGetLastError;
out->release = ExportedArrayStream::StaticRelease;
out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)};
return Status::OK();
return ExportedArrayStream<RecordBatchReader>::Make(std::move(reader), out);
}

Status ExportChunkedArray(std::shared_ptr<ChunkedArray> chunked_array,
struct ArrowArrayStream* out) {
return ExportedArrayStream<ChunkedArray>::Make(std::move(chunked_array), out);
}

//////////////////////////////////////////////////////////////////////////
// C stream import

namespace {

class ArrayStreamBatchReader : public RecordBatchReader {
class ArrayStreamReader {
public:
explicit ArrayStreamBatchReader(std::shared_ptr<Schema> schema,
struct ArrowArrayStream* stream)
: schema_(std::move(schema)) {
explicit ArrayStreamReader(struct ArrowArrayStream* stream) {
ArrowArrayStreamMove(stream, &stream_);
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() override {
~ArrayStreamReader() { ReleaseStream(); }

void ReleaseStream() {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
DCHECK(ArrowArrayStreamIsReleased(&stream_));
}

std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
struct ArrowArray c_array;
if (ArrowArrayStreamIsReleased(&stream_)) {
return Status::Invalid(
"Attempt to read from a reader that has already been closed");
}
RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array)));
if (ArrowArrayIsReleased(&c_array)) {
// End of stream
batch->reset();
return Status::OK();
} else {
return ImportRecordBatch(&c_array, schema_).Value(batch);
protected:
Status ReadNextArrayInternal(struct ArrowArray* array) {
ArrowArrayMarkReleased(array);
Status status = StatusFromCError(stream_.get_next(&stream_, array));
if (!status.ok() && !ArrowArrayIsReleased(array)) {
ArrowArrayRelease(array);
}

return status;
}

Status Close() override {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
return Status::OK();
Result<std::shared_ptr<Schema>> ReadSchema() {
struct ArrowSchema c_schema = {};
ARROW_RETURN_NOT_OK(
StatusFromCError(&stream_, stream_.get_schema(&stream_, &c_schema)));
ARROW_ASSIGN_OR_RAISE(auto schema, ImportSchema(&c_schema));
return schema;
}

static Result<std::shared_ptr<RecordBatchReader>> Make(
struct ArrowArrayStream* stream) {
if (ArrowArrayStreamIsReleased(stream)) {
return Status::Invalid("Cannot import released ArrowArrayStream");
}
std::shared_ptr<Schema> schema;
Result<std::shared_ptr<Field>> ReadField() {
struct ArrowSchema c_schema = {};
auto status = StatusFromCError(stream, stream->get_schema(stream, &c_schema));
if (status.ok()) {
status = ImportSchema(&c_schema).Value(&schema);
}
if (!status.ok()) {
ArrowArrayStreamRelease(stream);
return status;
ARROW_RETURN_NOT_OK(
StatusFromCError(&stream_, stream_.get_schema(&stream_, &c_schema)));
ARROW_ASSIGN_OR_RAISE(auto schema, ImportField(&c_schema));
return schema;
}

Status CheckNotReleased() {
if (ArrowArrayStreamIsReleased(&stream_)) {
return Status::Invalid(
"Attempt to read from a stream that has already been closed");
} else {
return Status::OK();
}
return std::make_shared<ArrayStreamBatchReader>(std::move(schema), stream);
}

private:
Status StatusFromCError(int errno_like) const {
return StatusFromCError(&stream_, errno_like);
}
Expand Down Expand Up @@ -2203,15 +2234,114 @@ class ArrayStreamBatchReader : public RecordBatchReader {
return {code, last_error ? std::string(last_error) : ""};
}

private:
mutable struct ArrowArrayStream stream_;
};

class ArrayStreamBatchReader : public RecordBatchReader, public ArrayStreamReader {
public:
explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream)
: ArrayStreamReader(stream) {}

Status Init() {
ARROW_ASSIGN_OR_RAISE(schema_, ReadSchema());
return Status::OK();
}

std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
ARROW_RETURN_NOT_OK(CheckNotReleased());

struct ArrowArray c_array;
ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array));

if (ArrowArrayIsReleased(&c_array)) {
// End of stream
batch->reset();
return Status::OK();
} else {
return ImportRecordBatch(&c_array, schema_).Value(batch);
}
}

Status Close() override {
ReleaseStream();
return Status::OK();
}

private:
std::shared_ptr<Schema> schema_;
};

class ArrayStreamArrayReader : public ArrayStreamReader {
public:
explicit ArrayStreamArrayReader(struct ArrowArrayStream* stream)
: ArrayStreamReader(stream) {}

Status Init() {
ARROW_ASSIGN_OR_RAISE(field_, ReadField());
return Status::OK();
}

std::shared_ptr<DataType> data_type() const { return field_->type(); }

Status ReadNext(std::shared_ptr<Array>* array) {
ARROW_RETURN_NOT_OK(CheckNotReleased());

struct ArrowArray c_array;
ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array));

if (ArrowArrayIsReleased(&c_array)) {
// End of stream
array->reset();
return Status::OK();
} else {
return ImportArray(&c_array, field_->type()).Value(array);
}
}

private:
std::shared_ptr<Field> field_;
};

} // namespace

Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream) {
return ArrayStreamBatchReader::Make(stream);
if (ArrowArrayStreamIsReleased(stream)) {
return Status::Invalid("Cannot import released ArrowArrayStream");
}

auto reader = std::make_shared<ArrayStreamBatchReader>(stream);
ARROW_RETURN_NOT_OK(reader->Init());
return reader;
}

Result<std::shared_ptr<ChunkedArray>> ImportChunkedArray(
struct ArrowArrayStream* stream) {
if (ArrowArrayStreamIsReleased(stream)) {
return Status::Invalid("Cannot import released ArrowArrayStream");
}

auto reader = std::make_shared<ArrayStreamArrayReader>(stream);
ARROW_RETURN_NOT_OK(reader->Init());

std::shared_ptr<DataType> data_type = reader->data_type();

ArrayVector chunks;
std::shared_ptr<Array> chunk;
while (true) {
ARROW_RETURN_NOT_OK(reader->ReadNext(&chunk));
if (!chunk) {
break;
}

chunks.push_back(std::move(chunk));
}

reader->ReleaseStream();
return ChunkedArray::Make(std::move(chunks), std::move(data_type));
}

} // namespace arrow
22 changes: 22 additions & 0 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ ARROW_EXPORT
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out);

/// \brief Export C++ ChunkedArray using the C data interface format.
///
/// The resulting ArrowArrayStream struct keeps the chunked array data and buffers alive
/// until its release callback is called by the consumer.
///
/// \param[in] chunked_array ChunkedArray object to export
/// \param[out] out C struct where to export the stream
ARROW_EXPORT
Status ExportChunkedArray(std::shared_ptr<ChunkedArray> chunked_array,
struct ArrowArrayStream* out);

/// \brief Import C++ RecordBatchReader from the C stream interface.
///
/// The ArrowArrayStream struct has its contents moved to a private object
Expand All @@ -313,6 +324,17 @@ ARROW_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream);

/// \brief Import C++ ChunkedArray from the C stream interface
///
/// The ArrowArrayStream struct has its contents moved to a private object,
/// is consumed in its entirity, and released before returning all chunks
/// as a ChunkedArray.
///
/// \param[in,out] stream C stream interface struct
/// \return Imported ChunkedArray object
ARROW_EXPORT
Result<std::shared_ptr<ChunkedArray>> ImportChunkedArray(struct ArrowArrayStream* stream);

/// @}

} // namespace arrow
Loading

0 comments on commit 65f0925

Please sign in to comment.