Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-14577: [C++] Enable fine grained IO for async IPC reader #11616

9 changes: 6 additions & 3 deletions cpp/src/arrow/io/caching.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ struct RangeCacheEntry {
};

struct ReadRangeCache::Impl {
std::shared_ptr<RandomAccessFile> file;
std::shared_ptr<RandomAccessFile> owned_file;
RandomAccessFile* file;
IOContext ctx;
CacheOptions options;

Expand Down Expand Up @@ -289,10 +290,12 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
}
};

ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file,
RandomAccessFile* file, IOContext ctx,
CacheOptions options)
: impl_(options.lazy ? new LazyImpl() : new Impl()) {
impl_->file = std::move(file);
impl_->owned_file = std::move(owned_file);
impl_->file = file;
impl_->ctx = std::move(ctx);
impl_->options = options;
}
Expand Down
13 changes: 11 additions & 2 deletions cpp/src/arrow/io/caching.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,17 @@ class ARROW_EXPORT ReadRangeCache {

/// Construct a read cache with default
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx)
: ReadRangeCache(file, std::move(ctx), CacheOptions::Defaults()) {}
: ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {}

/// Construct a read cache with given options
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
CacheOptions options);
CacheOptions options)
: ReadRangeCache(file, file.get(), ctx, options) {}

/// Construct a read cache with an unowned file
ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options)
: ReadRangeCache(NULLPTR, file, ctx, options) {}

~ReadRangeCache();

/// \brief Cache the given ranges in the background.
Expand All @@ -130,6 +136,9 @@ class ARROW_EXPORT ReadRangeCache {
struct Impl;
struct LazyImpl;

ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file, RandomAccessFile* file,
IOContext ctx, CacheOptions options);

std::unique_ptr<Impl> impl_;
};

Expand Down
61 changes: 54 additions & 7 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,50 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
return Status::OK();
}

Result<std::unique_ptr<Message>> ReadMessage(std::shared_ptr<Buffer> metadata,
std::shared_ptr<Buffer> body) {
std::unique_ptr<Message> result;
auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
// If the user does not pass in a body buffer then we assume they are skipping it
MessageDecoder decoder(listener, default_memory_pool(), body == nullptr);

if (metadata->size() < decoder.next_required_size()) {
return Status::Invalid("metadata_length should be at least ",
decoder.next_required_size());
}

ARROW_RETURN_NOT_OK(decoder.Consume(metadata));

switch (decoder.state()) {
case MessageDecoder::State::INITIAL:
// Metadata did not request a body so we better not have provided one
DCHECK_EQ(body, nullptr);
return std::move(result);
case MessageDecoder::State::METADATA_LENGTH:
return Status::Invalid("metadata length is missing from the metadata buffer");
case MessageDecoder::State::METADATA:
return Status::Invalid("flatbuffer size ", decoder.next_required_size(),
" invalid. Buffer size: ", metadata->size());
case MessageDecoder::State::BODY: {
if (body == nullptr) {
// Caller didn't give a body so just give them a message without body
return std::move(result);
}
if (body->size() != decoder.next_required_size()) {
return Status::IOError("Expected body buffer to be ",
decoder.next_required_size(),
" bytes for message body, got ", body->size());
}
RETURN_NOT_OK(decoder.Consume(body));
return std::move(result);
}
case MessageDecoder::State::EOS:
return Status::Invalid("Unexpected empty message in IPC file format");
default:
return Status::Invalid("Unexpected state: ", decoder.state());
}
}

Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
Expand Down Expand Up @@ -560,14 +604,15 @@ class MessageDecoder::MessageDecoderImpl {
public:
explicit MessageDecoderImpl(std::shared_ptr<MessageDecoderListener> listener,
State initial_state, int64_t initial_next_required_size,
MemoryPool* pool)
MemoryPool* pool, bool skip_body)
: listener_(std::move(listener)),
pool_(pool),
state_(initial_state),
next_required_size_(initial_next_required_size),
chunks_(),
buffered_size_(0),
metadata_(nullptr) {}
metadata_(nullptr),
skip_body_(skip_body) {}

Status ConsumeData(const uint8_t* data, int64_t size) {
if (buffered_size_ == 0) {
Expand Down Expand Up @@ -798,7 +843,7 @@ class MessageDecoder::MessageDecoderImpl {
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata_, &body_length));

state_ = State::BODY;
next_required_size_ = body_length;
next_required_size_ = skip_body_ ? 0 : body_length;
RETURN_NOT_OK(listener_->OnBody());
if (next_required_size_ == 0) {
ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(0, pool_));
Expand Down Expand Up @@ -894,19 +939,21 @@ class MessageDecoder::MessageDecoderImpl {
std::vector<std::shared_ptr<Buffer>> chunks_;
int64_t buffered_size_;
std::shared_ptr<Buffer> metadata_; // Must be CPU buffer
bool skip_body_;
};

MessageDecoder::MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
MemoryPool* pool) {
MemoryPool* pool, bool skip_body) {
impl_.reset(new MessageDecoderImpl(std::move(listener), State::INITIAL,
kMessageDecoderNextRequiredSizeInitial, pool));
kMessageDecoderNextRequiredSizeInitial, pool,
skip_body));
}

MessageDecoder::MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
State initial_state, int64_t initial_next_required_size,
MemoryPool* pool) {
MemoryPool* pool, bool skip_body) {
impl_.reset(new MessageDecoderImpl(std::move(listener), initial_state,
initial_next_required_size, pool));
initial_next_required_size, pool, skip_body));
}

MessageDecoder::~MessageDecoder() {}
Expand Down
26 changes: 24 additions & 2 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,11 @@ class ARROW_EXPORT MessageDecoder {
/// \param[in] listener a MessageDecoderListener that responds events from
/// the decoder
/// \param[in] pool an optional MemoryPool to copy metadata on the
/// \param[in] skip_body if true the body will be skipped even if the message has a body
/// CPU, if required
explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
MemoryPool* pool = default_memory_pool());
MemoryPool* pool = default_memory_pool(),
bool skip_body = false);

/// \brief Construct a message decoder with the specified state.
///
Expand All @@ -282,9 +284,10 @@ class ARROW_EXPORT MessageDecoder {
/// to run the next action
/// \param[in] pool an optional MemoryPool to copy metadata on the
/// CPU, if required
/// \param[in] skip_body if true the body will be skipped even if the message has a body
MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state,
int64_t initial_next_required_size,
MemoryPool* pool = default_memory_pool());
MemoryPool* pool = default_memory_pool(), bool skip_body = false);

virtual ~MessageDecoder();

Expand Down Expand Up @@ -466,6 +469,25 @@ Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader = {});

/// \brief Read encapsulated RPC message from cached buffers
///
/// The buffers should contain an entire message. Partial reads are not handled.
///
/// This method can be used to read just the metadata by passing in a nullptr for the
/// body. The body will then be skipped and the body size will not be validated.
///
/// If the body buffer is provided then it must be the complete body buffer
///
/// This is similar to Message::Open but performs slightly more validation (e.g. checks
/// to see that the metadata length is correct and that the body is the size the metadata
/// expected)
///
/// \param metadata The bytes for the metadata
/// \param body The bytes for the body
/// \return The message represented by the buffers
ARROW_EXPORT Result<std::unique_ptr<Message>> ReadMessage(
std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body);

ARROW_EXPORT
Future<std::shared_ptr<Message>> ReadMessageAsync(
const int64_t offset, const int32_t metadata_length, const int64_t body_length,
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <vector>

#include "arrow/io/caching.h"
#include "arrow/ipc/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
Expand Down Expand Up @@ -148,6 +149,11 @@ struct ARROW_EXPORT IpcReadOptions {
/// RecordBatchStreamReader and StreamDecoder classes.
bool ensure_native_endian = true;

/// \brief Options to control caching behavior when pre-buffering is requested
///
/// The lazy property will always be reset to true to deliver the expected behavior
io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults();

static IpcReadOptions Defaults();
};

Expand Down
Loading