From 65f09252e3812251b77373f901334c514259dd9c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Feb 2024 09:45:36 -0400 Subject: [PATCH] GH-38717: [C++] Add ImportChunkedArray and ExportChunkedArray to/from ArrowArrayStream (#39455) ### Rationale for this change The `ChunkedArray` has no equivalent in the C data interface; however, it is the primary array structure that higher level bindings interact with (because it is a column in a `Table`). In the Python capsule interface, this means that ChunkedArrays always require a workaround involving loops in Python. ### What changes are included in this PR? - Added `ImportChunkedArray()` and `ExportChunkedArray()` - Generalized the classes that support import/export to relax the assumption that every `ArrowArray` in an `ArrowArrayStream` is a `RecordBatch`. ### Are these changes tested? TODO ### Are there any user-facing changes? Yes, two new functions are added to bridge.h. * Closes: #38717 Lead-authored-by: Dewey Dunnington Co-authored-by: Dewey Dunnington Co-authored-by: Antoine Pitrou Signed-off-by: Dewey Dunnington --- cpp/src/arrow/c/bridge.cc | 252 +++++++++++++++++++++++++-------- cpp/src/arrow/c/bridge.h | 22 +++ cpp/src/arrow/c/bridge_test.cc | 115 +++++++++++++++ 3 files changed, 328 insertions(+), 61 deletions(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 119249da99a6d..022fce72f59b8 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -2002,13 +2002,49 @@ Result> ImportDeviceRecordBatch( namespace { +Status ExportStreamSchema(const std::shared_ptr& src, + struct ArrowSchema* out_schema) { + return ExportSchema(*src->schema(), out_schema); +} + +Status ExportStreamSchema(const std::shared_ptr& src, + struct ArrowSchema* out_schema) { + return ExportType(*src->type(), out_schema); +} + +Status ExportStreamNext(const std::shared_ptr& src, int64_t i, + struct ArrowArray* out_array) { + std::shared_ptr batch; + RETURN_NOT_OK(src->ReadNext(&batch)); + if 
(batch == nullptr) { + // End of stream + ArrowArrayMarkReleased(out_array); + return Status::OK(); + } else { + return ExportRecordBatch(*batch, out_array); + } +} + +Status ExportStreamNext(const std::shared_ptr& src, int64_t i, + struct ArrowArray* out_array) { + if (i >= src->num_chunks()) { + // End of stream + ArrowArrayMarkReleased(out_array); + return Status::OK(); + } else { + return ExportArray(*src->chunk(static_cast(i)), out_array); + } +} + +template class ExportedArrayStream { public: struct PrivateData { - explicit PrivateData(std::shared_ptr reader) - : reader_(std::move(reader)) {} + explicit PrivateData(std::shared_ptr reader) + : reader_(std::move(reader)), batch_num_(0) {} - std::shared_ptr reader_; + std::shared_ptr reader_; + int64_t batch_num_; std::string last_error_; PrivateData() = default; @@ -2018,19 +2054,11 @@ class ExportedArrayStream { explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {} Status GetSchema(struct ArrowSchema* out_schema) { - return ExportSchema(*reader()->schema(), out_schema); + return ExportStreamSchema(reader(), out_schema); } Status GetNext(struct ArrowArray* out_array) { - std::shared_ptr batch; - RETURN_NOT_OK(reader()->ReadNext(&batch)); - if (batch == nullptr) { - // End of stream - ArrowArrayMarkReleased(out_array); - return Status::OK(); - } else { - return ExportRecordBatch(*batch, out_array); - } + return ExportStreamNext(reader(), next_batch_num(), out_array); } const char* GetLastError() { @@ -2070,6 +2098,15 @@ class ExportedArrayStream { return ExportedArrayStream{stream}.GetLastError(); } + static Status Make(std::shared_ptr reader, struct ArrowArrayStream* out) { + out->get_schema = ExportedArrayStream::StaticGetSchema; + out->get_next = ExportedArrayStream::StaticGetNext; + out->get_last_error = ExportedArrayStream::StaticGetLastError; + out->release = ExportedArrayStream::StaticRelease; + out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)}; + 
return Status::OK(); + } + private: int ToCError(const Status& status) { if (ARROW_PREDICT_TRUE(status.ok())) { @@ -2093,7 +2130,9 @@ class ExportedArrayStream { return reinterpret_cast(stream_->private_data); } - const std::shared_ptr& reader() { return private_data()->reader_; } + const std::shared_ptr& reader() { return private_data()->reader_; } + + int64_t next_batch_num() { return private_data()->batch_num_++; } struct ArrowArrayStream* stream_; }; @@ -2102,12 +2141,12 @@ class ExportedArrayStream { Status ExportRecordBatchReader(std::shared_ptr reader, struct ArrowArrayStream* out) { - out->get_schema = ExportedArrayStream::StaticGetSchema; - out->get_next = ExportedArrayStream::StaticGetNext; - out->get_last_error = ExportedArrayStream::StaticGetLastError; - out->release = ExportedArrayStream::StaticRelease; - out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)}; - return Status::OK(); + return ExportedArrayStream::Make(std::move(reader), out); +} + +Status ExportChunkedArray(std::shared_ptr chunked_array, + struct ArrowArrayStream* out) { + return ExportedArrayStream::Make(std::move(chunked_array), out); } ////////////////////////////////////////////////////////////////////////// @@ -2115,66 +2154,58 @@ Status ExportRecordBatchReader(std::shared_ptr reader, namespace { -class ArrayStreamBatchReader : public RecordBatchReader { +class ArrayStreamReader { public: - explicit ArrayStreamBatchReader(std::shared_ptr schema, - struct ArrowArrayStream* stream) - : schema_(std::move(schema)) { + explicit ArrayStreamReader(struct ArrowArrayStream* stream) { ArrowArrayStreamMove(stream, &stream_); DCHECK(!ArrowArrayStreamIsReleased(&stream_)); } - ~ArrayStreamBatchReader() override { + ~ArrayStreamReader() { ReleaseStream(); } + + void ReleaseStream() { if (!ArrowArrayStreamIsReleased(&stream_)) { ArrowArrayStreamRelease(&stream_); } DCHECK(ArrowArrayStreamIsReleased(&stream_)); } - std::shared_ptr schema() const override { return schema_; } - 
- Status ReadNext(std::shared_ptr* batch) override { - struct ArrowArray c_array; - if (ArrowArrayStreamIsReleased(&stream_)) { - return Status::Invalid( - "Attempt to read from a reader that has already been closed"); - } - RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array))); - if (ArrowArrayIsReleased(&c_array)) { - // End of stream - batch->reset(); - return Status::OK(); - } else { - return ImportRecordBatch(&c_array, schema_).Value(batch); + protected: + Status ReadNextArrayInternal(struct ArrowArray* array) { + ArrowArrayMarkReleased(array); + Status status = StatusFromCError(stream_.get_next(&stream_, array)); + if (!status.ok() && !ArrowArrayIsReleased(array)) { + ArrowArrayRelease(array); } + + return status; } - Status Close() override { - if (!ArrowArrayStreamIsReleased(&stream_)) { - ArrowArrayStreamRelease(&stream_); - } - return Status::OK(); + Result> ReadSchema() { + struct ArrowSchema c_schema = {}; + ARROW_RETURN_NOT_OK( + StatusFromCError(&stream_, stream_.get_schema(&stream_, &c_schema))); + ARROW_ASSIGN_OR_RAISE(auto schema, ImportSchema(&c_schema)); + return schema; } - static Result> Make( - struct ArrowArrayStream* stream) { - if (ArrowArrayStreamIsReleased(stream)) { - return Status::Invalid("Cannot import released ArrowArrayStream"); - } - std::shared_ptr schema; + Result> ReadField() { struct ArrowSchema c_schema = {}; - auto status = StatusFromCError(stream, stream->get_schema(stream, &c_schema)); - if (status.ok()) { - status = ImportSchema(&c_schema).Value(&schema); - } - if (!status.ok()) { - ArrowArrayStreamRelease(stream); - return status; + ARROW_RETURN_NOT_OK( + StatusFromCError(&stream_, stream_.get_schema(&stream_, &c_schema))); + ARROW_ASSIGN_OR_RAISE(auto schema, ImportField(&c_schema)); + return schema; + } + + Status CheckNotReleased() { + if (ArrowArrayStreamIsReleased(&stream_)) { + return Status::Invalid( + "Attempt to read from a stream that has already been closed"); + } else { + return Status::OK(); } 
- return std::make_shared(std::move(schema), stream); } - private: Status StatusFromCError(int errno_like) const { return StatusFromCError(&stream_, errno_like); } @@ -2203,15 +2234,114 @@ class ArrayStreamBatchReader : public RecordBatchReader { return {code, last_error ? std::string(last_error) : ""}; } + private: mutable struct ArrowArrayStream stream_; +}; + +class ArrayStreamBatchReader : public RecordBatchReader, public ArrayStreamReader { + public: + explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) + : ArrayStreamReader(stream) {} + + Status Init() { + ARROW_ASSIGN_OR_RAISE(schema_, ReadSchema()); + return Status::OK(); + } + + std::shared_ptr schema() const override { return schema_; } + + Status ReadNext(std::shared_ptr* batch) override { + ARROW_RETURN_NOT_OK(CheckNotReleased()); + + struct ArrowArray c_array; + ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array)); + + if (ArrowArrayIsReleased(&c_array)) { + // End of stream + batch->reset(); + return Status::OK(); + } else { + return ImportRecordBatch(&c_array, schema_).Value(batch); + } + } + + Status Close() override { + ReleaseStream(); + return Status::OK(); + } + + private: std::shared_ptr schema_; }; +class ArrayStreamArrayReader : public ArrayStreamReader { + public: + explicit ArrayStreamArrayReader(struct ArrowArrayStream* stream) + : ArrayStreamReader(stream) {} + + Status Init() { + ARROW_ASSIGN_OR_RAISE(field_, ReadField()); + return Status::OK(); + } + + std::shared_ptr data_type() const { return field_->type(); } + + Status ReadNext(std::shared_ptr* array) { + ARROW_RETURN_NOT_OK(CheckNotReleased()); + + struct ArrowArray c_array; + ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array)); + + if (ArrowArrayIsReleased(&c_array)) { + // End of stream + array->reset(); + return Status::OK(); + } else { + return ImportArray(&c_array, field_->type()).Value(array); + } + } + + private: + std::shared_ptr field_; +}; + } // namespace Result> ImportRecordBatchReader( struct 
ArrowArrayStream* stream) { - return ArrayStreamBatchReader::Make(stream); + if (ArrowArrayStreamIsReleased(stream)) { + return Status::Invalid("Cannot import released ArrowArrayStream"); + } + + auto reader = std::make_shared(stream); + ARROW_RETURN_NOT_OK(reader->Init()); + return reader; +} + +Result> ImportChunkedArray( + struct ArrowArrayStream* stream) { + if (ArrowArrayStreamIsReleased(stream)) { + return Status::Invalid("Cannot import released ArrowArrayStream"); + } + + auto reader = std::make_shared(stream); + ARROW_RETURN_NOT_OK(reader->Init()); + + std::shared_ptr data_type = reader->data_type(); + + ArrayVector chunks; + std::shared_ptr chunk; + while (true) { + ARROW_RETURN_NOT_OK(reader->ReadNext(&chunk)); + if (!chunk) { + break; + } + + chunks.push_back(std::move(chunk)); + } + + reader->ReleaseStream(); + return ChunkedArray::Make(std::move(chunks), std::move(data_type)); } } // namespace arrow diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 45583109a761f..e98a42818f628 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -302,6 +302,17 @@ ARROW_EXPORT Status ExportRecordBatchReader(std::shared_ptr reader, struct ArrowArrayStream* out); +/// \brief Export C++ ChunkedArray using the C data interface format. +/// +/// The resulting ArrowArrayStream struct keeps the chunked array data and buffers alive +/// until its release callback is called by the consumer. +/// +/// \param[in] chunked_array ChunkedArray object to export +/// \param[out] out C struct where to export the stream +ARROW_EXPORT +Status ExportChunkedArray(std::shared_ptr chunked_array, + struct ArrowArrayStream* out); + /// \brief Import C++ RecordBatchReader from the C stream interface. 
/// /// The ArrowArrayStream struct has its contents moved to a private object @@ -313,6 +324,17 @@ ARROW_EXPORT Result> ImportRecordBatchReader( struct ArrowArrayStream* stream); +/// \brief Import C++ ChunkedArray from the C stream interface +/// +/// The ArrowArrayStream struct has its contents moved to a private object, +/// is consumed in its entirety, and released before returning all chunks +/// as a ChunkedArray. +/// +/// \param[in,out] stream C stream interface struct +/// \return Imported ChunkedArray object +ARROW_EXPORT +Result> ImportChunkedArray(struct ArrowArrayStream* stream); + /// @} } // namespace arrow diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index b8d5e0fcd3845..dba6e4736b673 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -4400,6 +4400,17 @@ class TestArrayStreamExport : public BaseArrayStreamTest { ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array, expected.schema())); AssertBatchesEqual(expected, *batch); } + + void AssertStreamNext(struct ArrowArrayStream* c_stream, const Array& expected) { + struct ArrowArray c_array; + ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array)); + + ArrayExportGuard guard(&c_array); + ASSERT_FALSE(ArrowArrayIsReleased(&c_array)); + + ASSERT_OK_AND_ASSIGN(auto array, ImportArray(&c_array, expected.type())); + AssertArraysEqual(expected, *array); + } }; TEST_F(TestArrayStreamExport, Empty) { @@ -4495,6 +4506,67 @@ TEST_F(TestArrayStreamExport, Errors) { ASSERT_EQ(EINVAL, c_stream.get_next(&c_stream, &c_array)); } +TEST_F(TestArrayStreamExport, ChunkedArrayExportEmpty) { + ASSERT_OK_AND_ASSIGN(auto chunked_array, ChunkedArray::Make({}, int32())); + + struct ArrowArrayStream c_stream; + struct ArrowSchema c_schema; + + ASSERT_OK(ExportChunkedArray(chunked_array, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + { + ArrayStreamExportGuard guard(&c_stream); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + 
ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema)); + AssertStreamEnd(&c_stream); + } + + { + SchemaExportGuard schema_guard(&c_schema); + ASSERT_OK_AND_ASSIGN(auto got_type, ImportType(&c_schema)); + AssertTypeEqual(*chunked_array->type(), *got_type); + } +} + +TEST_F(TestArrayStreamExport, ChunkedArrayExport) { + ASSERT_OK_AND_ASSIGN(auto chunked_array, + ChunkedArray::Make({ArrayFromJSON(int32(), "[1, 2]"), + ArrayFromJSON(int32(), "[4, 5, null]")})); + + struct ArrowArrayStream c_stream; + struct ArrowSchema c_schema; + struct ArrowArray c_array0, c_array1; + + ASSERT_OK(ExportChunkedArray(chunked_array, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + { + ArrayStreamExportGuard guard(&c_stream); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema)); + ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array0)); + ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array1)); + AssertStreamEnd(&c_stream); + } + + ArrayExportGuard guard0(&c_array0), guard1(&c_array1); + + { + SchemaExportGuard schema_guard(&c_schema); + ASSERT_OK_AND_ASSIGN(auto got_type, ImportType(&c_schema)); + AssertTypeEqual(*chunked_array->type(), *got_type); + } + + ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); + ASSERT_OK_AND_ASSIGN(auto array, ImportArray(&c_array0, chunked_array->type())); + AssertArraysEqual(*chunked_array->chunk(0), *array); + ASSERT_OK_AND_ASSIGN(array, ImportArray(&c_array1, chunked_array->type())); + AssertArraysEqual(*chunked_array->chunk(1), *array); +} + //////////////////////////////////////////////////////////////////////////// // Array stream roundtrip tests @@ -4534,6 +4606,29 @@ class TestArrayStreamRoundtrip : public BaseArrayStreamTest { ASSERT_TRUE(weak_reader.expired()); } + void Roundtrip(std::shared_ptr src, + std::function&)> check_func) { + ArrowArrayStream c_stream; + + // One original copy against which to compare the result, one copy held by the stream + std::weak_ptr weak_src(src); 
+ int64_t initial_use_count = weak_src.use_count(); + + ASSERT_OK(ExportChunkedArray(std::move(src), &c_stream)); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + { + ASSERT_OK_AND_ASSIGN(auto dst, ImportChunkedArray(&c_stream)); + // Stream was moved, consumed, and released + ASSERT_TRUE(ArrowArrayStreamIsReleased(&c_stream)); + + // Stream was released by ImportChunkedArray but original copy remains + ASSERT_EQ(weak_src.use_count(), initial_use_count - 1); + + check_func(dst); + } + } + void AssertReaderNext(const std::shared_ptr& reader, const RecordBatch& expected) { ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); @@ -4631,4 +4726,24 @@ TEST_F(TestArrayStreamRoundtrip, SchemaError) { ASSERT_TRUE(state.released); } +TEST_F(TestArrayStreamRoundtrip, ChunkedArrayRoundtrip) { + ASSERT_OK_AND_ASSIGN(auto src, + ChunkedArray::Make({ArrayFromJSON(int32(), "[1, 2]"), + ArrayFromJSON(int32(), "[4, 5, null]")})); + + Roundtrip(src, [&](const std::shared_ptr& dst) { + AssertTypeEqual(*dst->type(), *src->type()); + AssertChunkedEqual(*dst, *src); + }); +} + +TEST_F(TestArrayStreamRoundtrip, ChunkedArrayRoundtripEmpty) { + ASSERT_OK_AND_ASSIGN(auto src, ChunkedArray::Make({}, int32())); + + Roundtrip(src, [&](const std::shared_ptr& dst) { + AssertTypeEqual(*dst->type(), *src->type()); + AssertChunkedEqual(*dst, *src); + }); +} + } // namespace arrow