Skip to content

Commit

Permalink
ARROW-9761: [C/C++] Add experimental C stream inferface
Browse files Browse the repository at this point in the history
The goal is to have a standardized ABI to communicate streams of homogeneous arrays or record batches (for example for database result sets).

The trickiest part is error reporting.  This proposal tries to strike a compromise between simplicity (an integer error code mapping to errno values) and expressivity (an optional description string for application-specific and context-specific details).

Closes #8052 from pitrou/ARROW-9761-c-array-stream

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pitrou committed Oct 1, 2020
1 parent 991a55f commit 97879eb
Show file tree
Hide file tree
Showing 24 changed files with 1,229 additions and 60 deletions.
38 changes: 38 additions & 0 deletions cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,44 @@ struct ArrowArray {
void* private_data;
};

// EXPERIMENTAL: C stream interface

struct ArrowArrayStream {
// Callback to get the stream type
// (will be the same for all arrays in the stream).
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowSchema must be released independently from the stream.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

// Callback to get the next array
// (if no error and the array is released, the stream has ended)
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowArray must be released independently from the stream.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

// Callback to get optional detailed error information.
// This must only be called if the last stream operation failed
// with a non-0 return code.
//
// Return value: pointer to a null-terminated character array describing
// the last error, or NULL if no description is available.
//
// The returned pointer is only valid until the next operation on this stream
// (including release).
const char* (*get_last_error)(struct ArrowArrayStream*);

// Release callback: release the stream's own resources.
// Note that arrays returned by `get_next` must be individually released.
void (*release)(struct ArrowArrayStream*);

// Opaque producer-specific data
void* private_data;
};

#ifdef __cplusplus
}
#endif
194 changes: 194 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/c/bridge.h"

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <string>
#include <utility>
Expand Down Expand Up @@ -1501,4 +1502,197 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}

//////////////////////////////////////////////////////////////////////////
// C stream export

namespace {

class ExportedArrayStream {
public:
struct PrivateData {
explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
: reader_(std::move(reader)) {}

std::shared_ptr<RecordBatchReader> reader_;
std::string last_error_;

PrivateData() = default;
ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData);
};

explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {}

Status GetSchema(struct ArrowSchema* out_schema) {
return ExportSchema(*reader()->schema(), out_schema);
}

Status GetNext(struct ArrowArray* out_array) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader()->ReadNext(&batch));
if (batch == nullptr) {
// End of stream
ArrowArrayMarkReleased(out_array);
return Status::OK();
} else {
return ExportRecordBatch(*batch, out_array);
}
}

const char* GetLastError() {
const auto& last_error = private_data()->last_error_;
return last_error.empty() ? nullptr : last_error.c_str();
}

void Release() {
if (ArrowArrayStreamIsReleased(stream_)) {
return;
}
DCHECK_NE(private_data(), nullptr);
delete private_data();

ArrowArrayStreamMarkReleased(stream_);
}

// C-compatible callbacks

static int StaticGetSchema(struct ArrowArrayStream* stream,
struct ArrowSchema* out_schema) {
ExportedArrayStream self{stream};
return self.ToCError(self.GetSchema(out_schema));
}

static int StaticGetNext(struct ArrowArrayStream* stream,
struct ArrowArray* out_array) {
ExportedArrayStream self{stream};
return self.ToCError(self.GetNext(out_array));
}

static void StaticRelease(struct ArrowArrayStream* stream) {
ExportedArrayStream{stream}.Release();
}

static const char* StaticGetLastError(struct ArrowArrayStream* stream) {
return ExportedArrayStream{stream}.GetLastError();
}

private:
int ToCError(const Status& status) {
if (ARROW_PREDICT_TRUE(status.ok())) {
private_data()->last_error_.clear();
return 0;
}
private_data()->last_error_ = status.ToString();
switch (status.code()) {
case StatusCode::IOError:
return EIO;
case StatusCode::NotImplemented:
return ENOSYS;
case StatusCode::OutOfMemory:
return ENOMEM;
default:
return EINVAL; // Fallback for Invalid, TypeError, etc.
}
}

PrivateData* private_data() {
return reinterpret_cast<PrivateData*>(stream_->private_data);
}

const std::shared_ptr<RecordBatchReader>& reader() { return private_data()->reader_; }

struct ArrowArrayStream* stream_;
};

} // namespace

Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out) {
out->get_schema = ExportedArrayStream::StaticGetSchema;
out->get_next = ExportedArrayStream::StaticGetNext;
out->get_last_error = ExportedArrayStream::StaticGetLastError;
out->release = ExportedArrayStream::StaticRelease;
out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)};
return Status::OK();
}

//////////////////////////////////////////////////////////////////////////
// C stream import

namespace {

class ArrayStreamBatchReader : public RecordBatchReader {
public:
explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) {
ArrowArrayStreamMove(stream, &stream_);
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() {
ArrowArrayStreamRelease(&stream_);
DCHECK(ArrowArrayStreamIsReleased(&stream_));
}

std::shared_ptr<Schema> schema() const override { return CacheSchema(); }

Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
struct ArrowArray c_array;
RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array)));
if (ArrowArrayIsReleased(&c_array)) {
// End of stream
batch->reset();
return Status::OK();
} else {
return ImportRecordBatch(&c_array, CacheSchema()).Value(batch);
}
}

private:
std::shared_ptr<Schema> CacheSchema() const {
if (!schema_) {
struct ArrowSchema c_schema;
ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, &c_schema)));
schema_ = ImportSchema(&c_schema).ValueOrDie();
}
return schema_;
}

Status StatusFromCError(int errno_like) const {
if (ARROW_PREDICT_TRUE(errno_like == 0)) {
return Status::OK();
}
StatusCode code;
switch (errno_like) {
case EDOM:
case EINVAL:
case ERANGE:
code = StatusCode::Invalid;
break;
case ENOMEM:
code = StatusCode::OutOfMemory;
break;
case ENOSYS:
code = StatusCode::NotImplemented;
default:
code = StatusCode::IOError;
break;
}
const char* last_error = stream_.get_last_error(&stream_);
return Status(code, last_error ? std::string(last_error) : "");
}

mutable struct ArrowArrayStream stream_;
mutable std::shared_ptr<Schema> schema_;
};

} // namespace

Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream) {
if (ArrowArrayStreamIsReleased(stream)) {
return Status::Invalid("Cannot import released ArrowArrayStream");
}
// XXX should we call get_schema() here to avoid crashing on error?
return std::make_shared<ArrayStreamBatchReader>(stream);
}

} // namespace arrow
34 changes: 34 additions & 0 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

namespace arrow {

/// \defgroup c-data-interface Functions for working with the C data interface.
///
/// @{

/// \brief Export C++ DataType using the C data interface format.
///
/// The root type is considered to have empty name and metadata.
Expand Down Expand Up @@ -160,4 +164,34 @@ ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
struct ArrowSchema* schema);

/// @}

/// \defgroup c-stream-interface Functions for working with the C data interface.
///
/// @{

/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream interface.
///
/// The resulting ArrowArrayStream struct keeps the record batch reader alive
/// until its release callback is called by the consumer.
///
/// \param[in] reader RecordBatchReader object to export
/// \param[out] out C struct where to export the stream
ARROW_EXPORT
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out);

/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream interface.
///
/// The ArrowArrayStream struct has its contents moved to a private object
/// held alive by the resulting record batch reader.
///
/// \param[in,out] stream C stream interface struct
/// \return Imported RecordBatchReader object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream);

/// @}

} // namespace arrow
Loading

0 comments on commit 97879eb

Please sign in to comment.