diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 722026ccd9b66..1cbebfd935efb 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -144,7 +144,8 @@ struct RangeCacheEntry {
 };
 
 struct ReadRangeCache::Impl {
-  std::shared_ptr<RandomAccessFile> file;
+  std::shared_ptr<RandomAccessFile> owned_file;
+  RandomAccessFile* file;
   IOContext ctx;
   CacheOptions options;
 
@@ -289,10 +290,12 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
   }
 };
 
-ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
+ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file,
+                               RandomAccessFile* file, IOContext ctx,
                                CacheOptions options)
     : impl_(options.lazy ? new LazyImpl() : new Impl()) {
-  impl_->file = std::move(file);
+  impl_->owned_file = std::move(owned_file);
+  impl_->file = file;
   impl_->ctx = std::move(ctx);
   impl_->options = options;
 }
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index 59a9b60e82f7a..9f047fd62fb92 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -104,11 +104,17 @@ class ARROW_EXPORT ReadRangeCache {
 
   /// Construct a read cache with default options
   explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx)
-      : ReadRangeCache(file, std::move(ctx), CacheOptions::Defaults()) {}
+      : ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::Defaults()) {}
 
   /// Construct a read cache with given options
   explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
-                          CacheOptions options);
+                          CacheOptions options)
+      : ReadRangeCache(file, file.get(), ctx, options) {}
+
+  /// Construct a read cache with an unowned file
+  ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options)
+      : ReadRangeCache(NULLPTR, file, ctx, options) {}
+
   ~ReadRangeCache();
 
   /// \brief Cache the given ranges in the background.
@@ -130,6 +136,9 @@ class ARROW_EXPORT ReadRangeCache {
   struct Impl;
   struct LazyImpl;
 
+  ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file, RandomAccessFile* file,
+                 IOContext ctx, CacheOptions options);
+
   std::unique_ptr<Impl> impl_;
 };
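The caching.h change above routes all construction through a private constructor that carries both an owned handle and a raw pointer, so a cache can now be built over a file the caller does not own. A minimal sketch of the unowned overload, assuming the caller keeps the file alive for the cache's lifetime (the function name and the range values are illustrative, not part of this patch):

#include "arrow/buffer.h"
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Cache two ranges of an unowned file and read one of them back.
arrow::Status CacheSomeRanges(arrow::io::RandomAccessFile* file) {
  // The cache holds only the raw pointer; `file` must outlive `cache`.
  arrow::io::internal::ReadRangeCache cache(
      file, arrow::io::default_io_context(), arrow::io::CacheOptions::LazyDefaults());
  ARROW_RETURN_NOT_OK(cache.Cache({{0, 1024}, {4096, 512}}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> buf, cache.Read({0, 1024}));
  return arrow::Status::OK();
}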
Buffer size: ", metadata->size()); + case MessageDecoder::State::BODY: { + if (body == nullptr) { + // Caller didn't give a body so just give them a message without body + return std::move(result); + } + if (body->size() != decoder.next_required_size()) { + return Status::IOError("Expected body buffer to be ", + decoder.next_required_size(), + " bytes for message body, got ", body->size()); + } + RETURN_NOT_OK(decoder.Consume(body)); + return std::move(result); + } + case MessageDecoder::State::EOS: + return Status::Invalid("Unexpected empty message in IPC file format"); + default: + return Status::Invalid("Unexpected state: ", decoder.state()); + } +} + Result> ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { @@ -560,14 +604,15 @@ class MessageDecoder::MessageDecoderImpl { public: explicit MessageDecoderImpl(std::shared_ptr listener, State initial_state, int64_t initial_next_required_size, - MemoryPool* pool) + MemoryPool* pool, bool skip_body) : listener_(std::move(listener)), pool_(pool), state_(initial_state), next_required_size_(initial_next_required_size), chunks_(), buffered_size_(0), - metadata_(nullptr) {} + metadata_(nullptr), + skip_body_(skip_body) {} Status ConsumeData(const uint8_t* data, int64_t size) { if (buffered_size_ == 0) { @@ -798,7 +843,7 @@ class MessageDecoder::MessageDecoderImpl { RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata_, &body_length)); state_ = State::BODY; - next_required_size_ = body_length; + next_required_size_ = skip_body_ ? 0 : body_length; RETURN_NOT_OK(listener_->OnBody()); if (next_required_size_ == 0) { ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(0, pool_)); @@ -894,19 +939,21 @@ class MessageDecoder::MessageDecoderImpl { std::vector> chunks_; int64_t buffered_size_; std::shared_ptr metadata_; // Must be CPU buffer + bool skip_body_; }; MessageDecoder::MessageDecoder(std::shared_ptr listener, - MemoryPool* pool) { + MemoryPool* pool, bool skip_body) { impl_.reset(new MessageDecoderImpl(std::move(listener), State::INITIAL, - kMessageDecoderNextRequiredSizeInitial, pool)); + kMessageDecoderNextRequiredSizeInitial, pool, + skip_body)); } MessageDecoder::MessageDecoder(std::shared_ptr listener, State initial_state, int64_t initial_next_required_size, - MemoryPool* pool) { + MemoryPool* pool, bool skip_body) { impl_.reset(new MessageDecoderImpl(std::move(listener), initial_state, - initial_next_required_size, pool)); + initial_next_required_size, pool, skip_body)); } MessageDecoder::~MessageDecoder() {} diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 9c0ed8ced2ebd..1cd72ce993ed2 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -266,9 +266,11 @@ class ARROW_EXPORT MessageDecoder { /// \param[in] listener a MessageDecoderListener that responds events from /// the decoder /// \param[in] pool an optional MemoryPool to copy metadata on the + /// \param[in] skip_body if true the body will be skipped even if the message has a body /// CPU, if required explicit MessageDecoder(std::shared_ptr listener, - MemoryPool* pool = default_memory_pool()); + MemoryPool* pool = default_memory_pool(), + bool skip_body = false); /// \brief Construct a message decoder with the specified state. 
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c0ed8ced2ebd..1cd72ce993ed2 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -266,9 +266,11 @@ class ARROW_EXPORT MessageDecoder {
   /// \param[in] listener a MessageDecoderListener that responds events from
   ///            the decoder
   /// \param[in] pool an optional MemoryPool to copy metadata on the
   ///            CPU, if required
+  /// \param[in] skip_body if true, the body will be skipped even if the message has a body
   explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
-                          MemoryPool* pool = default_memory_pool());
+                          MemoryPool* pool = default_memory_pool(),
+                          bool skip_body = false);
 
   /// \brief Construct a message decoder with the specified state.
   ///
@@ -282,9 +284,10 @@ class ARROW_EXPORT MessageDecoder {
   ///            to run the next action
   /// \param[in] pool an optional MemoryPool to copy metadata on the
   ///            CPU, if required
+  /// \param[in] skip_body if true, the body will be skipped even if the message has a body
   MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state,
                  int64_t initial_next_required_size,
-                 MemoryPool* pool = default_memory_pool());
+                 MemoryPool* pool = default_memory_pool(), bool skip_body = false);
 
   virtual ~MessageDecoder();
 
@@ -466,6 +469,25 @@ Result<std::unique_ptr<Message>> ReadMessage(
     const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
     const FieldsLoaderFunction& fields_loader = {});
 
+/// \brief Read an encapsulated RPC message from cached buffers
+///
+/// The buffers should contain an entire message. Partial reads are not handled.
+///
+/// This method can be used to read just the metadata by passing nullptr for the
+/// body. The body will then be skipped and the body size will not be validated.
+///
+/// If a body buffer is provided then it must be the complete body buffer.
+///
+/// This is similar to Message::Open but performs slightly more validation (e.g. it
+/// checks that the metadata length is correct and that the body is the size the
+/// metadata expects).
+///
+/// \param metadata The bytes for the metadata
+/// \param body The bytes for the body
+/// \return The message represented by the buffers
+ARROW_EXPORT Result<std::unique_ptr<Message>> ReadMessage(
+    std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body);
+
 ARROW_EXPORT
 Future<std::shared_ptr<Message>> ReadMessageAsync(
     const int64_t offset, const int32_t metadata_length, const int64_t body_length,
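For callers that want the decoder directly rather than the ReadMessage wrapper, the new skip_body flag makes a metadata-only decode possible. A sketch assuming the public AssignMessageDecoderListener helper declared in this header:

#include <memory>

#include "arrow/buffer.h"
#include "arrow/ipc/message.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"

arrow::Result<std::unique_ptr<arrow::ipc::Message>> DecodeHeaderOnly(
    std::shared_ptr<arrow::Buffer> metadata) {
  std::unique_ptr<arrow::ipc::Message> message;
  auto listener =
      std::make_shared<arrow::ipc::AssignMessageDecoderListener>(&message);
  // skip_body = true: next_required_size() drops to zero once the metadata
  // has been consumed, even if the message declares a body.
  arrow::ipc::MessageDecoder decoder(listener, arrow::default_memory_pool(),
                                     /*skip_body=*/true);
  ARROW_RETURN_NOT_OK(decoder.Consume(std::move(metadata)));
  return std::move(message);
}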
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 1dbfc5d1473d8..396344216f1f0 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <memory>
 
+#include "arrow/io/caching.h"
 #include "arrow/ipc/type_fwd.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
@@ -148,6 +149,11 @@ struct ARROW_EXPORT IpcReadOptions {
   /// RecordBatchStreamReader and StreamDecoder classes.
   bool ensure_native_endian = true;
 
+  /// \brief Options to control caching behavior when pre-buffering is requested
+  ///
+  /// The lazy property will always be reset to true to deliver the expected behavior
+  io::CacheOptions pre_buffer_cache_options = io::CacheOptions::LazyDefaults();
+
   static IpcReadOptions Defaults();
 };
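The new pre_buffer_cache_options field gives callers control over how aggressively pre-buffered metadata reads are coalesced. A sketch of a configuration that trades extra bytes read for fewer I/O calls; the limits shown are illustrative only:

#include "arrow/ipc/options.h"

arrow::ipc::IpcReadOptions MakeCoalescingOptions() {
  auto options = arrow::ipc::IpcReadOptions::Defaults();
  // Merge ranges separated by gaps of up to 1 MiB, and allow merged
  // reads of up to 64 MiB before splitting them again.
  options.pre_buffer_cache_options.hole_size_limit = 1 << 20;
  options.pre_buffer_cache_options.range_size_limit = 64 << 20;
  return options;
}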
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index ee54cdaa2a0ae..a30cb40046edf 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <cstdio>
 #include <cstring>
+#include <limits>
 #include <memory>
 #include <numeric>
 #include <string>
@@ -2645,33 +2646,43 @@ TEST(IoRecordedRandomAccessFile, ReadWithCurrentPosition) {
   ASSERT_EQ(file.GetReadRanges()[0], (io::ReadRange{0, 20}));
 }
 
-Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr<RecordBatch>* out) {
-  // Make the schema
+std::shared_ptr<Schema> MakeBooleanInt32Int64Schema() {
   auto f0 = field("f0", boolean());
   auto f1 = field("f1", int32());
   auto f2 = field("f2", int64());
-  auto schema = ::arrow::schema({f0, f1, f2});
+  return ::arrow::schema({f0, f1, f2});
+}
 
+Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr<RecordBatch>* out) {
+  auto schema_ = MakeBooleanInt32Int64Schema();
   std::shared_ptr<Array> a0, a1, a2;
   RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a0));
   RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &a1));
   RETURN_NOT_OK(MakeRandomInt64Array(length, false, arrow::default_memory_pool(), &a2));
-  *out = RecordBatch::Make(schema, length, {a0, a1, a2});
+  *out = RecordBatch::Make(std::move(schema_), length, {a0, a1, a2});
   return Status::OK();
 }
 
+std::shared_ptr<Buffer> MakeBooleanInt32Int64File(int num_rows, int num_batches) {
+  auto schema_ = MakeBooleanInt32Int64Schema();
+  EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0));
+  EXPECT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), schema_));
+
+  std::shared_ptr<RecordBatch> batch;
+  for (int i = 0; i < num_batches; i++) {
+    ARROW_EXPECT_OK(MakeBooleanInt32Int64Batch(num_rows, &batch));
+    ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch));
+  }
+
+  ARROW_EXPECT_OK(writer->Close());
+  EXPECT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  return buffer;
+}
+
 void GetReadRecordBatchReadRanges(
     uint32_t num_rows, const std::vector<int>& included_fields,
     const std::vector<int64_t>& expected_body_read_lengths) {
-  std::shared_ptr<RecordBatch> batch;
-  // [bool, int32, int64] batch
-  ASSERT_OK(MakeBooleanInt32Int64Batch(num_rows, &batch));
-
-  ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0));
-  ASSERT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema()));
-  ASSERT_OK(writer->WriteRecordBatch(*batch));
-  ASSERT_OK(writer->Close());
-  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto buffer = MakeBooleanInt32Int64File(num_rows, /*num_batches=*/1);
 
   io::BufferReader buffer_reader(buffer);
   TrackedRandomAccessFile tracked(&buffer_reader);
@@ -2770,6 +2781,180 @@ TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFieldsWithIoMerged) {
   GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4});
 }
 
+constexpr static int kNumBatches = 10;
+// It can be difficult to know the exact size of the schema. Instead we just make the
+// row data big enough that we can easily identify whether a read is for the schema or
+// for row data.
+//
+// This needs to be large enough to space record batches kDefaultHoleSizeLimit bytes
+// apart and also large enough that the record batch data is more than
+// kMaxMetadataSizeBytes bytes
+constexpr static int kRowsPerBatch = 1000;
+constexpr static int64_t kMaxMetadataSizeBytes = 1 << 13;
+// There are always 2 reads when the file is opened
+constexpr static int kNumReadsOnOpen = 2;
+
+class PreBufferingTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override {
+    file_buffer_ = MakeBooleanInt32Int64File(kRowsPerBatch, kNumBatches);
+  }
+
+  void OpenReader() {
+    buffer_reader_ = std::make_shared<io::BufferReader>(file_buffer_);
+    tracked_ = std::make_shared<TrackedRandomAccessFile>(buffer_reader_.get());
+    auto read_options = IpcReadOptions::Defaults();
+    if (ReadsArePlugged()) {
+      // This will ensure that all reads get globbed together into one large read
+      read_options.pre_buffer_cache_options.hole_size_limit =
+          std::numeric_limits<int64_t>::max() - 1;
+      read_options.pre_buffer_cache_options.range_size_limit =
+          std::numeric_limits<int64_t>::max();
+    }
+    ASSERT_OK_AND_ASSIGN(reader_, RecordBatchFileReader::Open(tracked_, read_options));
+  }
+
+  bool ReadsArePlugged() { return GetParam(); }
+
+  std::vector<int> AllBatchIndices() {
+    std::vector<int> all_batch_indices(kNumBatches);
+    std::iota(all_batch_indices.begin(), all_batch_indices.end(), 0);
+    return all_batch_indices;
+  }
+
+  void AssertMetadataLoaded(std::vector<int> batch_indices) {
+    if (batch_indices.size() == 0) {
+      batch_indices = AllBatchIndices();
+    }
+    const auto& read_ranges = tracked_->get_read_ranges();
+    if (ReadsArePlugged()) {
+      // The read should have arrived as one large read
+      ASSERT_EQ(kNumReadsOnOpen + 1, read_ranges.size());
+      if (batch_indices.size() > 1) {
+        ASSERT_GT(read_ranges[kNumReadsOnOpen].length, kMaxMetadataSizeBytes);
+      }
+    } else {
+      // We should get many small reads of metadata only
+      ASSERT_EQ(batch_indices.size() + kNumReadsOnOpen, read_ranges.size());
+      for (const auto& read_range : read_ranges) {
+        ASSERT_LT(read_range.length, kMaxMetadataSizeBytes);
+      }
+    }
+  }
+
+  std::vector<std::shared_ptr<RecordBatch>> LoadExpected() {
+    auto buffer_reader = std::make_shared<io::BufferReader>(file_buffer_);
+    auto read_options = IpcReadOptions::Defaults();
+    EXPECT_OK_AND_ASSIGN(auto reader,
+                         RecordBatchFileReader::Open(buffer_reader.get(), read_options));
+    std::vector<std::shared_ptr<RecordBatch>> expected_batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i));
+      expected_batches.push_back(expected_batch);
+    }
+    return expected_batches;
+  }
+
+  void CheckFileRead(int num_indices_pre_buffered) {
+    auto expected_batches = LoadExpected();
+    const std::vector<io::ReadRange>& read_ranges = tracked_->get_read_ranges();
+    std::size_t starting_reads = read_ranges.size();
+    for (int i = 0; i < reader_->num_record_batches(); i++) {
+      ASSERT_OK_AND_ASSIGN(auto next_batch, reader_->ReadRecordBatch(i));
+      AssertBatchesEqual(*expected_batches[i], *next_batch);
+    }
+    int metadata_reads = 0;
+    int data_reads = 0;
+    for (std::size_t i = starting_reads; i < read_ranges.size(); i++) {
+      if (read_ranges[i].length > kMaxMetadataSizeBytes) {
+        data_reads++;
+      } else {
+        metadata_reads++;
+      }
+    }
+    ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered);
+    ASSERT_EQ(data_reads, reader_->num_record_batches());
+  }
+
+  std::vector<std::shared_ptr<RecordBatch>> batches_;
+  std::shared_ptr<Buffer> file_buffer_;
+  std::shared_ptr<io::BufferReader> buffer_reader_;
+  std::shared_ptr<TrackedRandomAccessFile> tracked_;
+  std::shared_ptr<RecordBatchFileReader> reader_;
+};
+
+TEST_P(PreBufferingTest, MetadataOnlyAllBatches) {
+  OpenReader();
+  // Should pre-buffer metadata for all of the batches
+  ASSERT_OK(reader_->PreBufferMetadata({}));
+  AssertMetadataLoaded({});
+  CheckFileRead(kNumBatches);
+}
+
+TEST_P(PreBufferingTest, MetadataOnlySomeBatches) {
+  OpenReader();
+  // Should pre-buffer metadata for only the requested batches
+  ASSERT_OK(reader_->PreBufferMetadata({1, 2, 3}));
+  AssertMetadataLoaded({1, 2, 3});
+  CheckFileRead(3);
+}
+
+INSTANTIATE_TEST_SUITE_P(PreBufferingTests, PreBufferingTest,
+                         ::testing::Values(false, true),
+                         [](const ::testing::TestParamInfo<bool>& info) {
+                           if (info.param) {
+                             return "plugged";
+                           } else {
+                             return "not_plugged";
+                           }
+                         });
+
+Result<std::shared_ptr<RecordBatch>> MakeBatchWithDictionaries(const int length) {
+  auto dict_type = dictionary(int32(), int32());
+  auto schema_ = ::arrow::schema(
+      {::arrow::field("i32", int32()), ::arrow::field("i32d", dict_type)});
+  std::shared_ptr<Array> i32, i32d_values, i32d_indices;
+  RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &i32));
+  RETURN_NOT_OK(
+      MakeRandomInt32Array(length, false, arrow::default_memory_pool(), &i32d_values));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, false, arrow::default_memory_pool(),
+                                     &i32d_indices, 0, 0, length));
+  std::shared_ptr<Array> i32d =
+      std::make_shared<DictionaryArray>(dict_type, i32d_indices, i32d_values);
+  return RecordBatch::Make(std::move(schema_), length, {i32, i32d});
+}
+
+Result<std::shared_ptr<io::ReadableFile>> MakeFileWithDictionaries(
+    const std::unique_ptr<TemporaryDir>& tempdir, int rows_per_batch, int num_batches) {
+  EXPECT_OK_AND_ASSIGN(auto temppath, tempdir->path().Join("testfile"));
+  EXPECT_OK_AND_ASSIGN(auto batch, MakeBatchWithDictionaries(rows_per_batch));
+  EXPECT_OK_AND_ASSIGN(auto sink, io::FileOutputStream::Open(temppath.ToString()));
+  EXPECT_OK_AND_ASSIGN(auto writer, MakeFileWriter(sink.get(), batch->schema()));
+
+  for (int i = 0; i < num_batches; i++) {
+    ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch));
+  }
+
+  ARROW_EXPECT_OK(writer->Close());
+  ARROW_EXPECT_OK(sink->Close());
+  return io::ReadableFile::Open(temppath.ToString());
+}
+
+TEST(PreBuffering, MixedAccess) {
+  ASSERT_OK_AND_ASSIGN(auto tempdir, TemporaryDir::Make("arrow-ipc-read-write-test-"));
+  ASSERT_OK_AND_ASSIGN(auto readable_file, MakeFileWithDictionaries(tempdir, 50, 2));
+  auto read_options = IpcReadOptions::Defaults();
+  ASSERT_OK_AND_ASSIGN(auto reader,
+                       RecordBatchFileReader::Open(readable_file, read_options));
+  ASSERT_OK(reader->PreBufferMetadata({0}));
+  ASSERT_OK_AND_ASSIGN(auto batch, reader->ReadRecordBatch(1));
+  ASSERT_EQ(50, batch->num_rows());
+  ASSERT_OK_AND_ASSIGN(batch, reader->ReadRecordBatch(0));
+  ASSERT_EQ(50, batch->num_rows());
+  auto stats = reader->stats();
+  ASSERT_EQ(1, stats.num_dictionary_batches);
+  ASSERT_EQ(2, stats.num_record_batches);
+}
+
 }  // namespace test
 }  // namespace ipc
 }  // namespace arrow
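The tests above exercise the call pattern the feature is designed for. A condensed sketch of that pattern outside the test harness; error handling is reduced to macros and the processing step is elided:

#include <memory>

#include "arrow/io/interfaces.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status ReadWithPreBuffer(std::shared_ptr<arrow::io::RandomAccessFile> file) {
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::ipc::RecordBatchFileReader::Open(
                            file, arrow::ipc::IpcReadOptions::Defaults()));
  // Kick off background reads of every batch's metadata (empty = all indices).
  ARROW_RETURN_NOT_OK(reader->PreBufferMetadata({}));
  for (int i = 0; i < reader->num_record_batches(); i++) {
    // Served from the cache populated above.
    ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(i));
    // ... process batch ...
  }
  return arrow::Status::OK();
}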
"arrow/util/thread_pool.h" @@ -137,6 +139,29 @@ struct IpcReadContext { const bool swap_endian; }; +/// A collection of ranges to read and pointers to set to those ranges when they are +/// available. This allows the ArrayLoader to utilize a two pass cache-then-read +/// strategy with a ReadRangeCache +class BatchDataReadRequest { + public: + const std::vector& ranges_to_read() const { return ranges_to_read_; } + + void RequestRange(int64_t offset, int64_t length, std::shared_ptr* out) { + ranges_to_read_.push_back({offset, length}); + destinations_.push_back(out); + } + + void FulfillRequest(const std::vector>& buffers) { + for (std::size_t i = 0; i < buffers.size(); i++) { + *destinations_[i] = buffers[i]; + } + } + + private: + std::vector ranges_to_read_; + std::vector*> destinations_; +}; + /// The field_index and buffer_index are incremented based on how much of the /// batch is "consumed" (through nested data reconstruction, for example) class ArrayLoader { @@ -147,6 +172,16 @@ class ArrayLoader { : metadata_(metadata), metadata_version_(metadata_version), file_(file), + file_offset_(0), + max_recursion_depth_(options.max_recursion_depth) {} + + explicit ArrayLoader(const flatbuf::RecordBatch* metadata, + MetadataVersion metadata_version, const IpcReadOptions& options, + int64_t file_offset) + : metadata_(metadata), + metadata_version_(metadata_version), + file_(nullptr), + file_offset_(file_offset), max_recursion_depth_(options.max_recursion_depth) {} Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr* out) { @@ -164,7 +199,12 @@ class ArrayLoader { return Status::Invalid("Buffer ", buffer_index_, " did not start on 8-byte aligned offset: ", offset); } - return file_->ReadAt(offset, length).Value(out); + if (file_) { + return file_->ReadAt(offset, length).Value(out); + } else { + read_request_.RequestRange(offset + file_offset_, length, out); + return Status::OK(); + } } Status LoadType(const DataType& type) { return VisitTypeInline(type, this); } @@ -384,17 +424,21 @@ class ArrayLoader { Status Visit(const ExtensionType& type) { return LoadType(*type.storage_type()); } + BatchDataReadRequest& read_request() { return read_request_; } + private: const flatbuf::RecordBatch* metadata_; const MetadataVersion metadata_version_; io::RandomAccessFile* file_; + int64_t file_offset_; int max_recursion_depth_; int buffer_index_ = 0; int field_index_ = 0; bool skip_io_ = false; - const Field* field_; - ArrayData* out_; + BatchDataReadRequest read_request_; + const Field* field_ = nullptr; + ArrayData* out_ = nullptr; }; Result> DecompressBuffer(const std::shared_ptr& buf, @@ -533,7 +577,8 @@ Result> LoadRecordBatch( if (inclusion_mask.size() > 0) { return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, context, file); } else { - return LoadRecordBatchSubset(metadata, schema, /*param_name=*/nullptr, context, file); + return LoadRecordBatchSubset(metadata, schema, /*inclusion_mask=*/nullptr, context, + file); } } @@ -968,15 +1013,19 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } -static Result> ReadMessageFromBlock( - const FileBlock& block, io::RandomAccessFile* file, - const FieldsLoaderFunction& fields_loader) { +Status CheckAligned(const FileBlock& block) { if (!bit_util::IsMultipleOf8(block.offset) || !bit_util::IsMultipleOf8(block.metadata_length) || !bit_util::IsMultipleOf8(block.body_length)) { return Status::Invalid("Unaligned block in IPC 
file"); } + return Status::OK(); +} +static Result> ReadMessageFromBlock( + const FileBlock& block, io::RandomAccessFile* file, + const FieldsLoaderFunction& fields_loader) { + RETURN_NOT_OK(CheckAligned(block)); // TODO(wesm): this breaks integration tests, see ARROW-3256 // DCHECK_EQ((*out)->body_length(), block.body_length); @@ -1018,11 +1067,11 @@ class RecordBatchFileReaderImpl; /// A generator of record batches. /// /// All batches are yielded in order. -class ARROW_EXPORT IpcFileRecordBatchGenerator { +class ARROW_EXPORT WholeIpcFileRecordBatchGenerator { public: using Item = std::shared_ptr; - explicit IpcFileRecordBatchGenerator( + explicit WholeIpcFileRecordBatchGenerator( std::shared_ptr state, std::shared_ptr cached_source, const io::IOContext& io_context, arrow::internal::Executor* executor) @@ -1051,6 +1100,25 @@ class ARROW_EXPORT IpcFileRecordBatchGenerator { Future<> read_dictionaries_; }; +/// A generator of record batches for use when reading +/// a subset of columns from the file. +/// +/// All batches are yielded in order. +class ARROW_EXPORT SelectiveIpcFileRecordBatchGenerator { + public: + using Item = std::shared_ptr; + + explicit SelectiveIpcFileRecordBatchGenerator( + std::shared_ptr state) + : state_(std::move(state)), index_(0) {} + + Future operator()(); + + private: + std::shared_ptr state_; + int index_; +}; + class RecordBatchFileReaderImpl : public RecordBatchFileReader { public: RecordBatchFileReaderImpl() : file_(NULLPTR), footer_offset_(0), footer_(NULLPTR) {} @@ -1088,15 +1156,31 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::OK(); } + Future> ReadRecordBatchAsync(int i) { + DCHECK_GE(i, 0); + DCHECK_LT(i, num_record_batches()); + + auto cached_metadata = cached_metadata_.find(i); + if (cached_metadata != cached_metadata_.end()) { + return ReadCachedRecordBatch(i, cached_metadata->second); + } + + return Status::Invalid( + "Asynchronous record batch reading is only supported after a call to " + "PreBufferMetadata or PreBufferBatches"); + } + Result> ReadRecordBatch(int i) override { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); - if (!read_dictionaries_) { - RETURN_NOT_OK(ReadDictionaries()); - read_dictionaries_ = true; + auto cached_metadata = cached_metadata_.find(i); + if (cached_metadata != cached_metadata_.end()) { + return ReadCachedRecordBatch(i, cached_metadata->second).result(); } + RETURN_NOT_OK(WaitForDictionaryReadFinished()); + FieldsLoaderFunction fields_loader = {}; if (!field_inclusion_mask_.empty()) { auto& schema = schema_; @@ -1143,11 +1227,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Status Open(const std::shared_ptr& file, int64_t footer_offset, const IpcReadOptions& options) { owned_file_ = file; + metadata_cache_ = std::make_shared( + file, file->io_context(), options.pre_buffer_cache_options); return Open(file.get(), footer_offset, options); } Status Open(io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& options) { + // The metadata_cache_ may have already been constructed with an owned file in the + // owning overload of Open + if (!metadata_cache_) { + metadata_cache_ = std::make_shared( + file, file->io_context(), options.pre_buffer_cache_options); + } file_ = file; options_ = options; footer_offset_ = footer_offset; @@ -1164,11 +1256,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { Future<> OpenAsync(const std::shared_ptr& file, int64_t footer_offset, const IpcReadOptions& options) { owned_file_ = file; + 
@@ -1164,11 +1256,19 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
   Future<> OpenAsync(const std::shared_ptr<io::RandomAccessFile>& file,
                      int64_t footer_offset, const IpcReadOptions& options) {
     owned_file_ = file;
+    metadata_cache_ = std::make_shared<io::internal::ReadRangeCache>(
+        file, file->io_context(), options.pre_buffer_cache_options);
     return OpenAsync(file.get(), footer_offset, options);
   }
 
   Future<> OpenAsync(io::RandomAccessFile* file, int64_t footer_offset,
                      const IpcReadOptions& options) {
+    // The metadata_cache_ may already have been constructed with an owned file in
+    // the owning overload of OpenAsync
+    if (!metadata_cache_) {
+      metadata_cache_ = std::make_shared<io::internal::ReadRangeCache>(
+          file, file->io_context(), options.pre_buffer_cache_options);
+    }
     file_ = file;
     options_ = options;
     footer_offset_ = footer_offset;
@@ -1195,34 +1295,65 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
       const io::CacheOptions cache_options,
       arrow::internal::Executor* executor) override {
     auto state = std::dynamic_pointer_cast<RecordBatchFileReaderImpl>(shared_from_this());
+    // Prebuffering causes us to use a lot of futures which, at the moment,
+    // can only slow things down when we are doing zero-copy in-memory reads.
+    //
+    // Prebuffering's read patterns are also slightly worse than the alternative
+    // when doing whole-file reads because the logic is not in place to recognize
+    // that we can just read the entire file up front.
+    if (options_.included_fields.size() != 0 &&
+        options_.included_fields.size() != schema_->fields().size() &&
+        !file_->supports_zero_copy()) {
+      RETURN_NOT_OK(state->PreBufferMetadata({}));
+      return SelectiveIpcFileRecordBatchGenerator(std::move(state));
+    }
+
     std::shared_ptr<io::internal::ReadRangeCache> cached_source;
-    if (coalesce) {
+    if (coalesce && !file_->supports_zero_copy()) {
       if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file");
       cached_source = std::make_shared<io::internal::ReadRangeCache>(
           owned_file_, io_context, cache_options);
-      auto num_dictionaries = this->num_dictionaries();
-      auto num_record_batches = this->num_record_batches();
-      std::vector<io::ReadRange> ranges(num_dictionaries + num_record_batches);
-      for (int i = 0; i < num_dictionaries; i++) {
-        auto block = FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-        ranges[i].offset = block.offset;
-        ranges[i].length = block.metadata_length + block.body_length;
-      }
-      for (int i = 0; i < num_record_batches; i++) {
-        auto block = FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-        ranges[num_dictionaries + i].offset = block.offset;
-        ranges[num_dictionaries + i].length = block.metadata_length + block.body_length;
-      }
-      RETURN_NOT_OK(cached_source->Cache(std::move(ranges)));
+      // Since the user is asking for all fields then we can cache the entire
+      // file (up to the footer)
+      RETURN_NOT_OK(cached_source->Cache({{0, footer_offset_}}));
     }
-    return IpcFileRecordBatchGenerator(std::move(state), std::move(cached_source),
-                                       io_context, executor);
+    return WholeIpcFileRecordBatchGenerator(std::move(state), std::move(cached_source),
+                                            io_context, executor);
+  }
+
+  Status DoPreBufferMetadata(const std::vector<int>& indices) {
+    RETURN_NOT_OK(CacheMetadata(indices));
+    EnsureDictionaryReadStarted();
+    Future<> all_metadata_ready = WaitForMetadatas(indices);
+    for (int index : indices) {
+      Future<std::shared_ptr<Message>> metadata_loaded =
+          all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
+            ++stats_.num_messages;
+            FileBlock block = GetRecordBatchBlock(index);
+            ARROW_ASSIGN_OR_RAISE(
+                std::shared_ptr<Buffer> metadata,
+                metadata_cache_->Read({block.offset, block.metadata_length}));
+            return ReadMessage(std::move(metadata), nullptr);
+          });
+      cached_metadata_.emplace(index, metadata_loaded);
+    }
+    return Status::OK();
+  }
+
+  std::vector<int> AllIndices() const {
+    std::vector<int> all_indices(num_record_batches());
+    std::iota(all_indices.begin(), all_indices.end(), 0);
+    return all_indices;
+  }
+
+  Status PreBufferMetadata(const std::vector<int>& indices) override {
+    if (indices.size() == 0) {
+      return DoPreBufferMetadata(AllIndices());
+    } else {
+      return DoPreBufferMetadata(indices);
+    }
   }
 
  private:
-  friend AsyncGenerator<std::shared_ptr<Message>> MakeMessageGenerator(
-      std::shared_ptr<RecordBatchFileReaderImpl>, const io::IOContext&);
-  friend class IpcFileRecordBatchGenerator;
+  friend class WholeIpcFileRecordBatchGenerator;
 
   FileBlock GetRecordBatchBlock(int i) const {
     return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
   }
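With the selective generator in place, a column-subset read over a non-zero-copy file now pre-buffers metadata automatically when consumed through the async generator API. A sketch of a consumer, assuming the generator's default arguments; the visitor body is illustrative:

#include <memory>

#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/async_generator.h"

arrow::Future<> VisitAllBatches(
    std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader) {
  ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator());
  // Pull batches one at a time; each pull resolves when its batch is ready.
  return arrow::VisitAsyncGenerator(
      std::move(generator),
      std::function<arrow::Status(std::shared_ptr<arrow::RecordBatch>)>(
          [](std::shared_ptr<arrow::RecordBatch> batch) {
            // ... consume batch ...
            return arrow::Status::OK();
          }));
}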
@@ -1251,6 +1382,224 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     return Status::OK();
   }
 
+  void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
+    // Adds all dictionaries to the range cache
+    for (int i = 0; i < num_dictionaries(); ++i) {
+      FileBlock block = GetDictionaryBlock(i);
+      ranges->push_back({block.offset, block.metadata_length + block.body_length});
+    }
+  }
+
+  void AddMetadataRanges(const std::vector<int>& indices,
+                         std::vector<io::ReadRange>* ranges) {
+    for (int index : indices) {
+      FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
+      ranges->push_back({block.offset, block.metadata_length});
+    }
+  }
+
+  Status CacheMetadata(const std::vector<int>& indices) {
+    std::vector<io::ReadRange> ranges;
+    if (!read_dictionaries_) {
+      AddDictionaryRanges(&ranges);
+    }
+    AddMetadataRanges(indices, &ranges);
+    return metadata_cache_->Cache(std::move(ranges));
+  }
+
+  void EnsureDictionaryReadStarted() {
+    if (!dictionary_load_finished_.is_valid()) {
+      read_dictionaries_ = true;
+      std::vector<io::ReadRange> ranges;
+      AddDictionaryRanges(&ranges);
+      dictionary_load_finished_ =
+          metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
+            return ReadDictionaries();
+          });
+    }
+  }
+
+  Status WaitForDictionaryReadFinished() {
+    if (!read_dictionaries_) {
+      RETURN_NOT_OK(ReadDictionaries());
+      read_dictionaries_ = true;
+      return Status::OK();
+    }
+    if (dictionary_load_finished_.is_valid()) {
+      return dictionary_load_finished_.status();
+    }
+    // Dictionaries were previously loaded synchronously
+    return Status::OK();
+  }
+
+  Future<> WaitForMetadatas(const std::vector<int>& indices) {
+    std::vector<io::ReadRange> ranges;
+    AddMetadataRanges(indices, &ranges);
+    return metadata_cache_->WaitFor(std::move(ranges));
+  }
+
+  Result<IpcReadContext> GetIpcReadContext(const flatbuf::Message* message,
+                                           const flatbuf::RecordBatch* batch) {
+    IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
+    Compression::type compression;
+    RETURN_NOT_OK(GetCompression(batch, &compression));
+    if (context.compression == Compression::UNCOMPRESSED &&
+        message->version() == flatbuf::MetadataVersion::V4) {
+      // Possibly obtain codec information from experimental serialization format
+      // in 0.17.x
+      RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
+    }
+    context.compression = compression;
+    context.metadata_version = internal::GetMetadataVersion(message->version());
+    return std::move(context);
+  }
+
+  Result<const flatbuf::RecordBatch*> GetBatchFromMessage(
+      const flatbuf::Message* message) {
+    auto batch = message->header_as_RecordBatch();
+    if (batch == nullptr) {
+      return Status::IOError(
+          "Header-type of flatbuffer-encoded Message is not RecordBatch.");
+    }
+    return batch;
+  }
+
+  Result<const flatbuf::Message*> GetFlatbufMessage(
+      const std::shared_ptr<Message>& message) {
+    const Buffer& metadata = *message->metadata();
+    const flatbuf::Message* flatbuf_message = nullptr;
+    RETURN_NOT_OK(
+        internal::VerifyMessage(metadata.data(), metadata.size(), &flatbuf_message));
+    return flatbuf_message;
+  }
+
+  struct CachedRecordBatchReadContext {
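+    // Gathers everything needed to rebuild one record batch from cached buffers.
+    // Pass 1 (CalculateLoadRequest) walks the metadata with a file-less ArrayLoader
+    // so every column read is recorded as a range instead of performed; pass 2
+    // (ReadAsync, then CreateRecordBatch) issues the coalesced reads and hands the
+    // resulting buffers back to the recorded destinations.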
+    CachedRecordBatchReadContext(std::shared_ptr<Schema> sch,
+                                 const flatbuf::RecordBatch* batch,
+                                 IpcReadContext context, io::RandomAccessFile* file,
+                                 std::shared_ptr<io::RandomAccessFile> owned_file,
+                                 int64_t block_data_offset)
+        : schema(std::move(sch)),
+          context(std::move(context)),
+          file(file),
+          owned_file(std::move(owned_file)),
+          loader(batch, context.metadata_version, context.options, block_data_offset),
+          columns(schema->num_fields()),
+          cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
+          length(batch->length()) {}
+
+    Status CalculateLoadRequest() {
+      std::shared_ptr<Schema> out_schema;
+      RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
+                                                 &inclusion_mask, &out_schema));
+
+      for (int i = 0; i < schema->num_fields(); ++i) {
+        const Field& field = *schema->field(i);
+        if (inclusion_mask.size() == 0 || inclusion_mask[i]) {
+          // Read the field
+          auto column = std::make_shared<ArrayData>();
+          RETURN_NOT_OK(loader.Load(&field, column.get()));
+          if (length != column->length) {
+            return Status::IOError("Array length did not match record batch length");
+          }
+          columns[i] = std::move(column);
+          if (inclusion_mask.size() > 0) {
+            filtered_columns.push_back(columns[i]);
+            filtered_fields.push_back(schema->field(i));
+          }
+        } else {
+          // Skip the field. This logic must still run to advance the state of the
+          // loader to the next field
+          RETURN_NOT_OK(loader.SkipField(&field));
+        }
+      }
+      if (inclusion_mask.size() > 0) {
+        filtered_schema = ::arrow::schema(std::move(filtered_fields), schema->metadata());
+      } else {
+        filtered_schema = schema;
+      }
+      return Status::OK();
+    }
+
+    Future<> ReadAsync() {
+      RETURN_NOT_OK(cache.Cache(loader.read_request().ranges_to_read()));
+      return cache.WaitFor(loader.read_request().ranges_to_read());
+    }
+
+    Result<std::shared_ptr<RecordBatch>> CreateRecordBatch() {
+      std::vector<std::shared_ptr<Buffer>> buffers;
+      for (const auto& range_to_read : loader.read_request().ranges_to_read()) {
+        ARROW_ASSIGN_OR_RAISE(auto buffer, cache.Read(range_to_read));
+        buffers.push_back(std::move(buffer));
+      }
+      loader.read_request().FulfillRequest(buffers);
+
+      // Dictionary resolution needs to happen on the unfiltered columns,
+      // because fields are mapped structurally (by path in the original schema).
+      RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo,
+                                        context.options.memory_pool));
+      if (inclusion_mask.size() > 0) {
+        columns.clear();
+      } else {
+        filtered_columns = std::move(columns);
+      }
+
+      if (context.compression != Compression::UNCOMPRESSED) {
+        RETURN_NOT_OK(
+            DecompressBuffers(context.compression, context.options, &filtered_columns));
+      }
+
+      // Swap endianness in the set of ArrayData if necessary (i.e. when
+      // swap_endian is true)
+      if (context.swap_endian) {
+        for (int i = 0; i < static_cast<int>(filtered_columns.size()); ++i) {
+          ARROW_ASSIGN_OR_RAISE(filtered_columns[i], arrow::internal::SwapEndianArrayData(
+                                                         filtered_columns[i]));
+        }
+      }
+      return RecordBatch::Make(std::move(filtered_schema), length,
+                               std::move(filtered_columns));
+    }
+
+    std::shared_ptr<Schema> schema;
+    IpcReadContext context;
+    io::RandomAccessFile* file;
+    std::shared_ptr<io::RandomAccessFile> owned_file;
+
+    ArrayLoader loader;
+    ArrayDataVector columns;
+    io::internal::ReadRangeCache cache;
+    int64_t length;
+    ArrayDataVector filtered_columns;
+    FieldVector filtered_fields;
+    std::shared_ptr<Schema> filtered_schema;
+    std::vector<bool> inclusion_mask;
+  };
+
+  Future<std::shared_ptr<RecordBatch>> ReadCachedRecordBatch(
+      int index, Future<std::shared_ptr<Message>> message_fut) {
+    ++stats_.num_record_batches;
+    return dictionary_load_finished_.Then([message_fut] { return message_fut; })
+        .Then([this, index](const std::shared_ptr<Message>& message_obj)
+                  -> Future<std::shared_ptr<RecordBatch>> {
+          FileBlock block = GetRecordBatchBlock(index);
+          ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
+          ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
+          ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
+
+          auto read_context = std::make_shared<CachedRecordBatchReadContext>(
+              schema_, batch, std::move(context), file_, owned_file_,
+              block.offset + static_cast<int64_t>(block.metadata_length));
+          RETURN_NOT_OK(read_context->CalculateLoadRequest());
+          return read_context->ReadAsync().Then(
+              [read_context] { return read_context->CreateRecordBatch(); });
+        });
+  }
+
   Status ReadFooter() {
     auto fut = ReadFooterAsync(/*executor=*/nullptr);
     return fut.status();
@@ -1341,6 +1685,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
   std::shared_ptr<Schema> out_schema_;
   ReadStats stats_;
 
+  std::shared_ptr<io::internal::ReadRangeCache> metadata_cache_;
+  std::unordered_set<int> cached_data_blocks_;
+  Future<> dictionary_load_finished_;
+  std::unordered_map<int, Future<std::shared_ptr<Message>>> cached_metadata_;
+  std::unordered_map<int, Future<>> cached_data_requests_;
   bool swap_endian_;
 };
 
@@ -1399,7 +1748,17 @@ Future<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::OpenAsync(
       .Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return result; });
 }
 
-Future<IpcFileRecordBatchGenerator::Item> IpcFileRecordBatchGenerator::operator()() {
+Future<SelectiveIpcFileRecordBatchGenerator::Item>
+SelectiveIpcFileRecordBatchGenerator::operator()() {
+  int index = index_++;
+  if (index >= state_->num_record_batches()) {
+    return IterationEnd<SelectiveIpcFileRecordBatchGenerator::Item>();
+  }
+  return state_->ReadRecordBatchAsync(index);
+}
+
+Future<WholeIpcFileRecordBatchGenerator::Item>
+WholeIpcFileRecordBatchGenerator::operator()() {
   auto state = state_;
   if (!read_dictionaries_.is_valid()) {
    std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
@@ -1439,7 +1798,7 @@ Future<IpcFileRecordBatchGenerator::Item> IpcFileRecordBatchGenerator::operator(
   });
 }
 
-Future<std::shared_ptr<Message>> IpcFileRecordBatchGenerator::ReadBlock(
+Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
     const FileBlock& block) {
   if (cached_source_) {
     auto cached_source = cached_source_;
@@ -1456,7 +1815,7 @@ Future<std::shared_ptr<Message>> IpcFileRecordBatchGenerator::ReadBlock(
   }
 }
 
-Status IpcFileRecordBatchGenerator::ReadDictionaries(
+Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
     RecordBatchFileReaderImpl* state,
     std::vector<std::shared_ptr<Message>> dictionary_messages) {
   IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
@@ -1466,7 +1825,7 @@ Status IpcFileRecordBatchGenerator::ReadDictionaries(
   return Status::OK();
 }
 
-Result<std::shared_ptr<RecordBatch>> IpcFileRecordBatchGenerator::ReadRecordBatch(
+Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
    RecordBatchFileReaderImpl* state, Message* message) {
   CHECK_HAS_BODY(*message);
   ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 6f2157557f3b3..4bdbccc5097ab 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -196,6 +196,18 @@ class ARROW_EXPORT RecordBatchFileReader
   /// \brief Computes the total number of rows in the file.
   virtual Result<int64_t> CountRows() = 0;
 
+  /// \brief Begin loading metadata for the desired batches into memory.
+  ///
+  /// This method will also begin loading all dictionary messages into memory.
+  ///
+  /// For a regular file this will immediately begin disk I/O in the background on a
+  /// thread in the IOContext's thread pool. If the file is memory mapped this will
+  /// ensure the memory needed for the metadata is paged from disk into memory.
+  ///
+  /// \param indices Indices of the batches to prefetch.
+  ///        If empty then all batches will be prefetched.
+  virtual Status PreBufferMetadata(const std::vector<int>& indices) = 0;
+
   /// \brief Get a reentrant generator of record batches.
   ///
   /// \param[in] coalesce If true, enable I/O coalescing.
diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
index d7c1d852b88b3..e68a4332d7f20 100644
--- a/cpp/src/arrow/ipc/test_common.cc
+++ b/cpp/src/arrow/ipc/test_common.cc
@@ -75,11 +75,12 @@ void CompareBatchColumnsDetailed(const RecordBatch& result, const RecordBatch& e
 }
 
 Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool,
-                            std::shared_ptr<Array>* out, uint32_t seed) {
+                            std::shared_ptr<Array>* out, uint32_t seed, int32_t min,
+                            int32_t max) {
   random::RandomArrayGenerator rand(seed);
   const double null_probability = include_nulls ? 0.5 : 0.0;
 
-  *out = rand.Int32(length, 0, 1000, null_probability);
+  *out = rand.Int32(length, min, max, null_probability);
 
   return Status::OK();
 }
diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h
index b4c7e31c925e5..28aea00e30f03 100644
--- a/cpp/src/arrow/ipc/test_common.h
+++ b/cpp/src/arrow/ipc/test_common.h
@@ -42,7 +42,8 @@ void CompareBatchColumnsDetailed(const RecordBatch& result, const RecordBatch& e
 
 ARROW_TESTING_EXPORT
 Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool,
-                            std::shared_ptr<Array>* out, uint32_t seed = 0);
+                            std::shared_ptr<Array>* out, uint32_t seed = 0,
+                            int32_t min = 0, int32_t max = 1000);
 
 ARROW_TESTING_EXPORT
 Status MakeRandomInt64Array(int64_t length, bool include_nulls, MemoryPool* pool,
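The widened test helper keeps its old behavior by defaulting min/max to the previously hard-coded 0 and 1000, so existing callers are unchanged. A sketch of the new bounded form as a dictionary-index generator; the wrapper function is illustrative:

#include <memory>

#include "arrow/array.h"
#include "arrow/ipc/test_common.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"

// max is chosen so every index stays within a 100-element dictionary.
arrow::Status MakeBoundedIndices(std::shared_ptr<arrow::Array>* indices) {
  return arrow::ipc::test::MakeRandomInt32Array(
      /*length=*/100, /*include_nulls=*/false, arrow::default_memory_pool(),
      indices, /*seed=*/0, /*min=*/0, /*max=*/99);
}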