diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 885ab19256065..9039ffb571b9e 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -90,7 +90,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(ARROW_ALTIVEC "Build Arrow with Altivec" ON) - endif() if(NOT ARROW_BUILD_TESTS) diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 0f5a0dc06979c..1339a99aa787e 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -116,13 +116,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) Status BufferReader::Read(int64_t nbytes, std::shared_ptr* out) { int64_t size = std::min(nbytes, size_ - position_); - if (buffer_ != nullptr) { + if (size > 0 && buffer_ != nullptr) { *out = SliceBuffer(buffer_, position_, size); } else { *out = std::make_shared(data_ + position_, size); } - position_ += nbytes; + position_ += size; return Status::OK(); } diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index b7ac5f059749f..c047f53d6bf06 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -46,6 +46,7 @@ set(ARROW_IPC_SRCS json-internal.cc metadata.cc metadata-internal.cc + stream.cc ) if(NOT APPLE) @@ -151,6 +152,7 @@ install(FILES file.h json.h metadata.h + stream.h DESTINATION include/arrow/ipc) # pkg-config support diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 7b4d18c267d43..9da7b3912d4bc 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -49,10 +49,9 @@ namespace ipc { class RecordBatchWriter : public ArrayVisitor { public: - RecordBatchWriter(const std::vector>& columns, int32_t num_rows, - int64_t buffer_start_offset, int max_recursion_depth) - : columns_(columns), - num_rows_(num_rows), + RecordBatchWriter( + const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth) + : batch_(batch), 
max_recursion_depth_(max_recursion_depth), buffer_start_offset_(buffer_start_offset) {} @@ -79,8 +78,8 @@ class RecordBatchWriter : public ArrayVisitor { } // Perform depth-first traversal of the row-batch - for (size_t i = 0; i < columns_.size(); ++i) { - RETURN_NOT_OK(VisitArray(*columns_[i].get())); + for (int i = 0; i < batch_.num_columns(); ++i) { + RETURN_NOT_OK(VisitArray(*batch_.column(i))); } // The position for the start of a buffer relative to the passed frame of @@ -126,18 +125,23 @@ class RecordBatchWriter : public ArrayVisitor { // itself as an int32_t. std::shared_ptr metadata_fb; RETURN_NOT_OK(WriteRecordBatchMetadata( - num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb)); + batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb)); // Need to write 4 bytes (metadata size), the metadata, plus padding to - // fall on an 8-byte offset - int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4); + // end on an 8-byte offset + int64_t start_offset; + RETURN_NOT_OK(dst->Tell(&start_offset)); + + int64_t padded_metadata_length = metadata_fb->size() + 4; + const int remainder = (padded_metadata_length + start_offset) % 8; + if (remainder != 0) { padded_metadata_length += 8 - remainder; } // The returned metadata size includes the length prefix, the flatbuffer, // plus padding *metadata_length = static_cast(padded_metadata_length); - // Write the flatbuffer size prefix - int32_t flatbuffer_size = metadata_fb->size(); + // Write the flatbuffer size prefix including padding + int32_t flatbuffer_size = padded_metadata_length - 4; RETURN_NOT_OK( dst->Write(reinterpret_cast(&flatbuffer_size), sizeof(int32_t))); @@ -294,9 +298,7 @@ class RecordBatchWriter : public ArrayVisitor { return Status::OK(); } - // Do not copy this vector. 
Ownership must be retained elsewhere - const std::vector>& columns_; - int32_t num_rows_; + const RecordBatch& batch_; std::vector field_nodes_; std::vector buffer_meta_; @@ -306,18 +308,16 @@ class RecordBatchWriter : public ArrayVisitor { int64_t buffer_start_offset_; }; -Status WriteRecordBatch(const std::vector>& columns, - int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) { +Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, + int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); - RecordBatchWriter serializer( - columns, num_rows, buffer_start_offset, max_recursion_depth); + RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth); return serializer.Write(dst, metadata_length, body_length); } -Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { - RecordBatchWriter serializer( - batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth); +Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { + RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth); RETURN_NOT_OK(serializer.GetTotalSize(size)); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 963b9ee368537..f9ef7d9fe1202 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -71,17 +71,14 @@ constexpr int kMaxIpcRecursionDepth = 64; // // @param(out) body_length: the size of the contiguous buffer block plus // padding bytes -ARROW_EXPORT Status WriteRecordBatch(const std::vector>& columns, - int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, - int max_recursion_depth = kMaxIpcRecursionDepth); - -// int64_t GetRecordBatchMetadata(const RecordBatch* batch); +ARROW_EXPORT Status WriteRecordBatch(const 
RecordBatch& batch, + int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth); // Compute the precise number of bytes needed in a contiguous memory segment to // write the record batch. This involves generating the complete serialized // Flatbuffers metadata. -ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size); +ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index d7d2e613f87db..bc086e31519a5 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -26,6 +26,7 @@ #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" #include "arrow/status.h" @@ -35,82 +36,154 @@ namespace arrow { namespace ipc { static constexpr const char* kArrowMagicBytes = "ARROW1"; - // ---------------------------------------------------------------------- -// Writer implementation +// File footer + +static flatbuffers::Offset> +FileBlocksToFlatbuffer(FBB& fbb, const std::vector& blocks) { + std::vector fb_blocks; -FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr& schema) - : sink_(sink), schema_(schema), position_(-1), started_(false) {} + for (const FileBlock& block : blocks) { + fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); + } -Status FileWriter::UpdatePosition() { - return sink_->Tell(&position_); + return fbb.CreateVectorOfStructs(fb_blocks); } -Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out) { - *out = std::shared_ptr(new FileWriter(sink, schema)); // ctor is 
private - RETURN_NOT_OK((*out)->UpdatePosition()); - return Status::OK(); +Status WriteFileFooter(const Schema& schema, const std::vector& dictionaries, + const std::vector& record_batches, io::OutputStream* out) { + FBB fbb; + + flatbuffers::Offset fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema)); + + auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); + auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); + + auto footer = flatbuf::CreateFooter( + fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); + + fbb.Finish(footer); + + int32_t size = fbb.GetSize(); + + return out->Write(fbb.GetBufferPointer(), size); } -Status FileWriter::Write(const uint8_t* data, int64_t nbytes) { - RETURN_NOT_OK(sink_->Write(data, nbytes)); - position_ += nbytes; - return Status::OK(); +static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { + return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); } -Status FileWriter::Align() { - int64_t remainder = PaddedLength(position_) - position_; - if (remainder > 0) { return Write(kPaddingBytes, remainder); } +class FileFooter::FileFooterImpl { + public: + FileFooterImpl(const std::shared_ptr& buffer, const flatbuf::Footer* footer) + : buffer_(buffer), footer_(footer) {} + + int num_dictionaries() const { return footer_->dictionaries()->size(); } + + int num_record_batches() const { return footer_->recordBatches()->size(); } + + MetadataVersion::type version() const { + switch (footer_->version()) { + case flatbuf::MetadataVersion_V1: + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + return MetadataVersion::V2; + // Add cases as other versions become available + default: + return MetadataVersion::V2; + } + } + + FileBlock record_batch(int i) const { + return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + } + + FileBlock dictionary(int i) const { + return 
FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); + } + + Status GetSchema(std::shared_ptr* out) const { + auto schema_msg = std::make_shared(nullptr, footer_->schema()); + return schema_msg->GetSchema(out); + } + + private: + // Retain reference to memory + std::shared_ptr buffer_; + + const flatbuf::Footer* footer_; +}; + +FileFooter::FileFooter() {} + +FileFooter::~FileFooter() {} + +Status FileFooter::Open( + const std::shared_ptr& buffer, std::unique_ptr* out) { + const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data()); + + *out = std::unique_ptr(new FileFooter()); + + // TODO(wesm): Verify the footer + (*out)->impl_.reset(new FileFooterImpl(buffer, footer)); + return Status::OK(); } -Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) { - RETURN_NOT_OK(Write(data, nbytes)); - return Align(); +int FileFooter::num_dictionaries() const { + return impl_->num_dictionaries(); } -Status FileWriter::Start() { - RETURN_NOT_OK(WriteAligned( - reinterpret_cast(kArrowMagicBytes), strlen(kArrowMagicBytes))); - started_ = true; - return Status::OK(); +int FileFooter::num_record_batches() const { + return impl_->num_record_batches(); } -Status FileWriter::CheckStarted() { - if (!started_) { return Start(); } - return Status::OK(); +MetadataVersion::type FileFooter::version() const { + return impl_->version(); } -Status FileWriter::WriteRecordBatch( - const std::vector>& columns, int32_t num_rows) { - RETURN_NOT_OK(CheckStarted()); - - int64_t offset = position_; +FileBlock FileFooter::record_batch(int i) const { + return impl_->record_batch(i); +} - // There may be padding ever the end of the metadata, so we cannot rely on - // position_ - int32_t metadata_length; - int64_t body_length; +FileBlock FileFooter::dictionary(int i) const { + return impl_->dictionary(i); +} - // Frame of reference in file format is 0, see ARROW-384 - const int64_t buffer_start_offset = 0; - RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( - columns, num_rows, 
buffer_start_offset, sink_, &metadata_length, &body_length)); - RETURN_NOT_OK(UpdatePosition()); +Status FileFooter::GetSchema(std::shared_ptr* out) const { + return impl_->GetSchema(out); +} - DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes"; +// ---------------------------------------------------------------------- +// File writer implementation - // Append metadata, to be written in the footer later - record_batches_.emplace_back(offset, metadata_length, body_length); +Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out) { + *out = std::shared_ptr(new FileWriter(sink, schema)); // ctor is private + RETURN_NOT_OK((*out)->UpdatePosition()); + return Status::OK(); +} +Status FileWriter::Start() { + RETURN_NOT_OK(WriteAligned( + reinterpret_cast(kArrowMagicBytes), strlen(kArrowMagicBytes))); + started_ = true; return Status::OK(); } +Status FileWriter::WriteRecordBatch(const RecordBatch& batch) { + // Push an empty FileBlock + // Append metadata, to be written in the footer later + record_batches_.emplace_back(0, 0, 0); + return StreamWriter::WriteRecordBatch( + batch, &record_batches_[record_batches_.size() - 1]); +} + Status FileWriter::Close() { // Write metadata int64_t initial_position = position_; - RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_)); + RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_)); RETURN_NOT_OK(UpdatePosition()); // Write footer length diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h index 4f35c37b03235..7696954c188e3 100644 --- a/cpp/src/arrow/ipc/file.h +++ b/cpp/src/arrow/ipc/file.h @@ -25,13 +25,12 @@ #include #include "arrow/ipc/metadata.h" +#include "arrow/ipc/stream.h" #include "arrow/util/visibility.h" namespace arrow { -class Array; class Buffer; -struct Field; class RecordBatch; class Schema; class Status; @@ -45,40 +44,43 @@ class ReadableFileInterface; namespace 
ipc { -class ARROW_EXPORT FileWriter { - public: - static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, - std::shared_ptr* out); +Status WriteFileFooter(const Schema& schema, const std::vector& dictionaries, + const std::vector& record_batches, io::OutputStream* out); - // TODO(wesm): Write dictionaries +class ARROW_EXPORT FileFooter { + public: + ~FileFooter(); - Status WriteRecordBatch( - const std::vector>& columns, int32_t num_rows); + static Status Open( + const std::shared_ptr& buffer, std::unique_ptr* out); - Status Close(); + int num_dictionaries() const; + int num_record_batches() const; + MetadataVersion::type version() const; - private: - FileWriter(io::OutputStream* sink, const std::shared_ptr& schema); + FileBlock record_batch(int i) const; + FileBlock dictionary(int i) const; - Status CheckStarted(); - Status Start(); + Status GetSchema(std::shared_ptr* out) const; - Status UpdatePosition(); + private: + FileFooter(); + class FileFooterImpl; + std::unique_ptr impl_; +}; - // Adds padding bytes if necessary to ensure all memory blocks are written on - // 8-byte boundaries. 
- Status Align(); +class ARROW_EXPORT FileWriter : public StreamWriter { + public: + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); - // Write data and update position - Status Write(const uint8_t* data, int64_t nbytes); + Status WriteRecordBatch(const RecordBatch& batch) override; + Status Close() override; - // Write and align - Status WriteAligned(const uint8_t* data, int64_t nbytes); + private: + using StreamWriter::StreamWriter; - io::OutputStream* sink_; - std::shared_ptr schema_; - int64_t position_; - bool started_; + Status Start() override; std::vector dictionaries_; std::vector record_batches_; diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 6ba0a6e16be08..17868f8f1029e 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -55,8 +55,8 @@ class TestWriteRecordBatch : public ::testing::TestWithParam, const int64_t buffer_offset = 0; - RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset, - mmap_.get(), &metadata_length, &body_length)); + RETURN_NOT_OK(WriteRecordBatch( + batch, buffer_offset, mmap_.get(), &metadata_length, &body_length)); std::shared_ptr metadata; RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); @@ -102,9 +102,8 @@ void TestGetRecordBatchSize(std::shared_ptr batch) { int32_t mock_metadata_length = -1; int64_t mock_body_length = -1; int64_t size = -1; - ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock, - &mock_metadata_length, &mock_body_length)); - ASSERT_OK(GetRecordBatchSize(batch.get(), &size)); + ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length)); + ASSERT_OK(GetRecordBatchSize(*batch, &size)); ASSERT_EQ(mock.GetExtentBytesWritten(), size); } @@ -157,11 +156,10 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { 
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); if (override_level) { - return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), - metadata_length, body_length, recursion_level + 1); + return WriteRecordBatch( + *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1); } else { - return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), - metadata_length, body_length); + return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length); } } diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index 0a9f677966389..15ceb80493632 100644 --- a/cpp/src/arrow/ipc/ipc-file-test.cc +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -29,6 +29,7 @@ #include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" #include "arrow/ipc/file.h" +#include "arrow/ipc/stream.h" #include "arrow/ipc/test-common.h" #include "arrow/ipc/util.h" @@ -41,6 +42,19 @@ namespace arrow { namespace ipc { +void CompareBatch(const RecordBatch& left, const RecordBatch& right) { + ASSERT_TRUE(left.schema()->Equals(right.schema())); + ASSERT_EQ(left.num_columns(), right.num_columns()) + << left.schema()->ToString() << " result: " << right.schema()->ToString(); + EXPECT_EQ(left.num_rows(), right.num_rows()); + for (int i = 0; i < left.num_columns(); ++i) { + EXPECT_TRUE(left.column(i)->Equals(right.column(i))) + << "Idx: " << i << " Name: " << left.column_name(i); + } +} + +using BatchVector = std::vector>; + class TestFileFormat : public ::testing::TestWithParam { public: void SetUp() { @@ -50,43 +64,94 @@ class TestFileFormat : public ::testing::TestWithParam { } void TearDown() {} - Status RoundTripHelper( - const RecordBatch& batch, std::vector>* out_batches) { + Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) { // Write the file - RETURN_NOT_OK(FileWriter::Open(sink_.get(), batch.schema(), &file_writer_)); - int num_batches = 3; - for (int i = 0; i < 
num_batches; ++i) { - RETURN_NOT_OK(file_writer_->WriteRecordBatch(batch.columns(), batch.num_rows())); + std::shared_ptr writer; + RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); + + const int num_batches = static_cast(in_batches.size()); + + for (const auto& batch : in_batches) { + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } - RETURN_NOT_OK(file_writer_->Close()); + RETURN_NOT_OK(writer->Close()); // Current offset into stream is the end of the file int64_t footer_offset; RETURN_NOT_OK(sink_->Tell(&footer_offset)); // Open the file - auto reader = std::make_shared(buffer_); - RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_)); + auto buf_reader = std::make_shared(buffer_); + std::shared_ptr reader; + RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader)); - EXPECT_EQ(num_batches, file_reader_->num_record_batches()); - - out_batches->resize(num_batches); + EXPECT_EQ(num_batches, reader->num_record_batches()); for (int i = 0; i < num_batches; ++i) { - RETURN_NOT_OK(file_reader_->GetRecordBatch(i, &(*out_batches)[i])); + std::shared_ptr chunk; + RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); + out_batches->emplace_back(chunk); } return Status::OK(); } - void CompareBatch(const RecordBatch* left, const RecordBatch* right) { - ASSERT_TRUE(left->schema()->Equals(right->schema())); - ASSERT_EQ(left->num_columns(), right->num_columns()) - << left->schema()->ToString() << " result: " << right->schema()->ToString(); - EXPECT_EQ(left->num_rows(), right->num_rows()); - for (int i = 0; i < left->num_columns(); ++i) { - EXPECT_TRUE(left->column(i)->Equals(right->column(i))) - << "Idx: " << i << " Name: " << left->column_name(i); + protected: + MemoryPool* pool_; + + std::unique_ptr sink_; + std::shared_ptr buffer_; +}; + +TEST_P(TestFileFormat, RoundTrip) { + std::shared_ptr batch1; + std::shared_ptr batch2; + ASSERT_OK((*GetParam())(&batch1)); // NOLINT clang-tidy gtest issue + 
ASSERT_OK((*GetParam())(&batch2)); // NOLINT clang-tidy gtest issue + + std::vector> in_batches = {batch1, batch2}; + std::vector> out_batches; + + ASSERT_OK(RoundTripHelper(in_batches, &out_batches)); + + // Compare batches + for (size_t i = 0; i < in_batches.size(); ++i) { + CompareBatch(*in_batches[i], *out_batches[i]); + } +} + +class TestStreamFormat : public ::testing::TestWithParam { + public: + void SetUp() { + pool_ = default_memory_pool(); + buffer_ = std::make_shared(pool_); + sink_.reset(new io::BufferOutputStream(buffer_)); + } + void TearDown() {} + + Status RoundTripHelper( + const RecordBatch& batch, std::vector>* out_batches) { + // Write the file + std::shared_ptr writer; + RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer)); + int num_batches = 5; + for (int i = 0; i < num_batches; ++i) { + RETURN_NOT_OK(writer->WriteRecordBatch(batch)); + } + RETURN_NOT_OK(writer->Close()); + + // Open the file + auto buf_reader = std::make_shared(buffer_); + + std::shared_ptr reader; + RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader)); + + std::shared_ptr chunk; + while (true) { + RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk)); + if (chunk == nullptr) { break; } + out_batches->emplace_back(chunk); } + return Status::OK(); } protected: @@ -94,12 +159,9 @@ class TestFileFormat : public ::testing::TestWithParam { std::unique_ptr sink_; std::shared_ptr buffer_; - - std::shared_ptr file_writer_; - std::shared_ptr file_reader_; }; -TEST_P(TestFileFormat, RoundTrip) { +TEST_P(TestStreamFormat, RoundTrip) { std::shared_ptr batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue @@ -109,14 +171,80 @@ TEST_P(TestFileFormat, RoundTrip) { // Compare batches. 
Same for (size_t i = 0; i < out_batches.size(); ++i) { - CompareBatch(batch.get(), out_batches[i].get()); + CompareBatch(*batch, *out_batches[i]); } } -INSTANTIATE_TEST_CASE_P(RoundTripTests, TestFileFormat, - ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, - &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, - &MakeStringTypesRecordBatch, &MakeStruct)); +#define BATCH_CASES() \ + ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \ + &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch, \ + &MakeStruct); + +INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES()); +INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES()); + +class TestFileFooter : public ::testing::Test { + public: + void SetUp() {} + + void CheckRoundtrip(const Schema& schema, const std::vector& dictionaries, + const std::vector& record_batches) { + auto buffer = std::make_shared(); + io::BufferOutputStream stream(buffer); + + ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream)); + + std::unique_ptr footer; + ASSERT_OK(FileFooter::Open(buffer, &footer)); + + ASSERT_EQ(MetadataVersion::V2, footer->version()); + + // Check schema + std::shared_ptr schema2; + ASSERT_OK(footer->GetSchema(&schema2)); + AssertSchemaEqual(schema, *schema2); + + // Check blocks + ASSERT_EQ(dictionaries.size(), footer->num_dictionaries()); + ASSERT_EQ(record_batches.size(), footer->num_record_batches()); + + for (int i = 0; i < footer->num_dictionaries(); ++i) { + CheckBlocks(dictionaries[i], footer->dictionary(i)); + } + + for (int i = 0; i < footer->num_record_batches(); ++i) { + CheckBlocks(record_batches[i], footer->record_batch(i)); + } + } + + void CheckBlocks(const FileBlock& left, const FileBlock& right) { + ASSERT_EQ(left.offset, right.offset); + ASSERT_EQ(left.metadata_length, right.metadata_length); + ASSERT_EQ(left.body_length, right.body_length); + } + 
+ private: + std::shared_ptr example_schema_; +}; + +TEST_F(TestFileFooter, Basics) { + auto f0 = std::make_shared("f0", std::make_shared()); + auto f1 = std::make_shared("f1", std::make_shared()); + Schema schema({f0, f1}); + + std::vector dictionaries; + dictionaries.emplace_back(8, 92, 900); + dictionaries.emplace_back(1000, 100, 1900); + dictionaries.emplace_back(3000, 100, 2900); + + std::vector record_batches; + record_batches.emplace_back(6000, 100, 900); + record_batches.emplace_back(7000, 100, 1900); + record_batches.emplace_back(9000, 100, 2900); + record_batches.emplace_back(12000, 100, 3900); + + CheckRoundtrip(schema, dictionaries, record_batches); +} } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index 07509890da35c..30f968c2bfd8b 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -245,8 +245,9 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) { std::vector> arrays; MakeBatchArrays(schema, num_rows, &arrays); - batches.emplace_back(std::make_shared(schema, num_rows, arrays)); - ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows)); + auto batch = std::make_shared(schema, num_rows, arrays); + batches.push_back(batch); + ASSERT_OK(writer->WriteRecordBatch(*batch)); } std::string result; diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 7c5744a241068..098f996d292a2 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -23,6 +23,7 @@ #include "arrow/io/memory.h" #include "arrow/ipc/metadata.h" +#include "arrow/ipc/test-common.h" #include "arrow/schema.h" #include "arrow/status.h" #include "arrow/test-util.h" @@ -34,20 +35,11 @@ class Buffer; namespace ipc { -static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) { - if (!lhs->Equals(*rhs)) { - std::stringstream ss; - ss << "left schema: " << lhs->ToString() << std::endl - << 
"right schema: " << rhs->ToString() << std::endl; - FAIL() << ss.str(); - } -} - class TestSchemaMetadata : public ::testing::Test { public: void SetUp() {} - void CheckRoundtrip(const Schema* schema) { + void CheckRoundtrip(const Schema& schema) { std::shared_ptr buffer; ASSERT_OK(WriteSchema(schema, &buffer)); @@ -57,12 +49,12 @@ class TestSchemaMetadata : public ::testing::Test { ASSERT_EQ(Message::SCHEMA, message->type()); auto schema_msg = std::make_shared(message); - ASSERT_EQ(schema->num_fields(), schema_msg->num_fields()); + ASSERT_EQ(schema.num_fields(), schema_msg->num_fields()); std::shared_ptr schema2; ASSERT_OK(schema_msg->GetSchema(&schema2)); - assert_schema_equal(schema, schema2.get()); + AssertSchemaEqual(schema, *schema2); } }; @@ -82,7 +74,7 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) { auto f10 = std::make_shared("f10", std::make_shared()); Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10}); - CheckRoundtrip(&schema); + CheckRoundtrip(schema); } TEST_F(TestSchemaMetadata, NestedFields) { @@ -94,70 +86,7 @@ TEST_F(TestSchemaMetadata, NestedFields) { auto f1 = std::make_shared("f1", type2); Schema schema({f0, f1}); - CheckRoundtrip(&schema); -} - -class TestFileFooter : public ::testing::Test { - public: - void SetUp() {} - - void CheckRoundtrip(const Schema* schema, const std::vector& dictionaries, - const std::vector& record_batches) { - auto buffer = std::make_shared(); - io::BufferOutputStream stream(buffer); - - ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream)); - - std::unique_ptr footer; - ASSERT_OK(FileFooter::Open(buffer, &footer)); - - ASSERT_EQ(MetadataVersion::V2, footer->version()); - - // Check schema - std::shared_ptr schema2; - ASSERT_OK(footer->GetSchema(&schema2)); - assert_schema_equal(schema, schema2.get()); - - // Check blocks - ASSERT_EQ(dictionaries.size(), footer->num_dictionaries()); - ASSERT_EQ(record_batches.size(), footer->num_record_batches()); - - for (int i = 0; i < 
footer->num_dictionaries(); ++i) { - CheckBlocks(dictionaries[i], footer->dictionary(i)); - } - - for (int i = 0; i < footer->num_record_batches(); ++i) { - CheckBlocks(record_batches[i], footer->record_batch(i)); - } - } - - void CheckBlocks(const FileBlock& left, const FileBlock& right) { - ASSERT_EQ(left.offset, right.offset); - ASSERT_EQ(left.metadata_length, right.metadata_length); - ASSERT_EQ(left.body_length, right.body_length); - } - - private: - std::shared_ptr example_schema_; -}; - -TEST_F(TestFileFooter, Basics) { - auto f0 = std::make_shared("f0", std::make_shared()); - auto f1 = std::make_shared("f1", std::make_shared()); - Schema schema({f0, f1}); - - std::vector dictionaries; - dictionaries.emplace_back(8, 92, 900); - dictionaries.emplace_back(1000, 100, 1900); - dictionaries.emplace_back(3000, 100, 2900); - - std::vector record_batches; - record_batches.emplace_back(6000, 100, 900); - record_batches.emplace_back(7000, 100, 1900); - record_batches.emplace_back(9000, 100, 2900); - record_batches.emplace_back(12000, 100, 3900); - - CheckRoundtrip(&schema, dictionaries, record_batches); + CheckRoundtrip(schema); } } // namespace ipc diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 757e6c00ab243..95bc742054fab 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -81,7 +81,7 @@ static Status ConvertJsonToArrow( for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr batch; RETURN_NOT_OK(reader->GetRecordBatch(i, &batch)); - RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows())); + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } return writer->Close(); } @@ -108,7 +108,7 @@ static Status ConvertArrowToJson( for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr batch; RETURN_NOT_OK(reader->GetRecordBatch(i, &batch)); - RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), 
batch->num_rows())); + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } std::string result; diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc index 6e3a9939730f4..773fb74a1767a 100644 --- a/cpp/src/arrow/ipc/json.cc +++ b/cpp/src/arrow/ipc/json.cc @@ -64,25 +64,23 @@ class JsonWriter::JsonWriterImpl { return Status::OK(); } - Status WriteRecordBatch( - const std::vector>& columns, int32_t num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema_->num_fields()); + Status WriteRecordBatch(const RecordBatch& batch) { + DCHECK_EQ(batch.num_columns(), schema_->num_fields()); writer_->StartObject(); writer_->Key("count"); - writer_->Int(num_rows); + writer_->Int(batch.num_rows()); writer_->Key("columns"); writer_->StartArray(); for (int i = 0; i < schema_->num_fields(); ++i) { - const std::shared_ptr& column = columns[i]; + const std::shared_ptr& column = batch.column(i); - DCHECK_EQ(num_rows, column->length()) + DCHECK_EQ(batch.num_rows(), column->length()) << "Array length did not match record batch length"; - RETURN_NOT_OK( - WriteJsonArray(schema_->field(i)->name, *column.get(), writer_.get())); + RETURN_NOT_OK(WriteJsonArray(schema_->field(i)->name, *column, writer_.get())); } writer_->EndArray(); @@ -113,9 +111,8 @@ Status JsonWriter::Finish(std::string* result) { return impl_->Finish(result); } -Status JsonWriter::WriteRecordBatch( - const std::vector>& columns, int32_t num_rows) { - return impl_->WriteRecordBatch(columns, num_rows); +Status JsonWriter::WriteRecordBatch(const RecordBatch& batch) { + return impl_->WriteRecordBatch(batch); } // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h index 7395be43b967d..88afdfaa5ff3b 100644 --- a/cpp/src/arrow/ipc/json.h +++ b/cpp/src/arrow/ipc/json.h @@ -46,8 +46,7 @@ class ARROW_EXPORT JsonWriter { // TODO(wesm): Write dictionaries - Status WriteRecordBatch( - const std::vector>& columns, int32_t num_rows); + 
Status WriteRecordBatch(const RecordBatch& batch); Status Finish(std::string* result); diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index cc160c42ec9ef..cd7722056a3c7 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -282,10 +282,10 @@ flatbuf::Endianness endianness() { } Status SchemaToFlatbuffer( - FBB& fbb, const Schema* schema, flatbuffers::Offset* out) { + FBB& fbb, const Schema& schema, flatbuffers::Offset* out) { std::vector field_offsets; - for (int i = 0; i < schema->num_fields(); ++i) { - std::shared_ptr field = schema->field(i); + for (int i = 0; i < schema.num_fields(); ++i) { + std::shared_ptr field = schema.field(i); FieldOffset offset; RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset)); field_offsets.push_back(offset); @@ -295,7 +295,7 @@ Status SchemaToFlatbuffer( return Status::OK(); } -Status MessageBuilder::SetSchema(const Schema* schema) { +Status MessageBuilder::SetSchema(const Schema& schema) { flatbuffers::Offset fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema)); diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 4826ebe22899d..d94a8abc99ab0 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -49,11 +49,11 @@ static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVe Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr* out); Status SchemaToFlatbuffer( - FBB& fbb, const Schema* schema, flatbuffers::Offset* out); + FBB& fbb, const Schema& schema, flatbuffers::Offset* out); class MessageBuilder { public: - Status SetSchema(const Schema* schema); + Status SetSchema(const Schema& schema); Status SetRecordBatch(int32_t length, int64_t body_length, const std::vector& nodes, diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index f0674ff8d5aeb..a97965c40d608 100644 --- 
a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -38,7 +38,7 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -Status WriteSchema(const Schema* schema, std::shared_ptr* out) { +Status WriteSchema(const Schema& schema, std::shared_ptr* out) { MessageBuilder message; RETURN_NOT_OK(message.SetSchema(schema)); RETURN_NOT_OK(message.Finish()); @@ -232,124 +232,5 @@ int RecordBatchMetadata::num_fields() const { return impl_->num_fields(); } -// ---------------------------------------------------------------------- -// File footer - -static flatbuffers::Offset> -FileBlocksToFlatbuffer(FBB& fbb, const std::vector& blocks) { - std::vector fb_blocks; - - for (const FileBlock& block : blocks) { - fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); - } - - return fbb.CreateVectorOfStructs(fb_blocks); -} - -Status WriteFileFooter(const Schema* schema, const std::vector& dictionaries, - const std::vector& record_batches, io::OutputStream* out) { - FBB fbb; - - flatbuffers::Offset fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema)); - - auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); - auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); - - auto footer = flatbuf::CreateFooter( - fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); - - fbb.Finish(footer); - - int32_t size = fbb.GetSize(); - - return out->Write(fbb.GetBufferPointer(), size); -} - -static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { - return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); -} - -class FileFooter::FileFooterImpl { - public: - FileFooterImpl(const std::shared_ptr& buffer, const flatbuf::Footer* footer) - : buffer_(buffer), footer_(footer) {} - - int num_dictionaries() const { return footer_->dictionaries()->size(); } - - int num_record_batches() const { return footer_->recordBatches()->size(); } - - 
MetadataVersion::type version() const { - switch (footer_->version()) { - case flatbuf::MetadataVersion_V1: - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - return MetadataVersion::V2; - // Add cases as other versions become available - default: - return MetadataVersion::V2; - } - } - - FileBlock record_batch(int i) const { - return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); - } - - FileBlock dictionary(int i) const { - return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); - } - - Status GetSchema(std::shared_ptr* out) const { - auto schema_msg = std::make_shared(nullptr, footer_->schema()); - return schema_msg->GetSchema(out); - } - - private: - // Retain reference to memory - std::shared_ptr buffer_; - - const flatbuf::Footer* footer_; -}; - -FileFooter::FileFooter() {} - -FileFooter::~FileFooter() {} - -Status FileFooter::Open( - const std::shared_ptr& buffer, std::unique_ptr* out) { - const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data()); - - *out = std::unique_ptr(new FileFooter()); - - // TODO(wesm): Verify the footer - (*out)->impl_.reset(new FileFooterImpl(buffer, footer)); - - return Status::OK(); -} - -int FileFooter::num_dictionaries() const { - return impl_->num_dictionaries(); -} - -int FileFooter::num_record_batches() const { - return impl_->num_record_batches(); -} - -MetadataVersion::type FileFooter::version() const { - return impl_->version(); -} - -FileBlock FileFooter::record_batch(int i) const { - return impl_->record_batch(i); -} - -FileBlock FileFooter::dictionary(int i) const { - return impl_->dictionary(i); -} - -Status FileFooter::GetSchema(std::shared_ptr* out) const { - return impl_->GetSchema(out); -} - } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 1c4ef64d62fad..6e15ef353d853 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -49,7 +49,7 @@ struct MetadataVersion { // Serialize 
arrow::Schema as a Flatbuffer ARROW_EXPORT -Status WriteSchema(const Schema* schema, std::shared_ptr* out); +Status WriteSchema(const Schema& schema, std::shared_ptr* out); // Read interface classes. We do not fully deserialize the flatbuffers so that // individual fields metadata can be retrieved from very large schema without @@ -149,10 +149,8 @@ class ARROW_EXPORT Message { std::unique_ptr impl_; }; -// ---------------------------------------------------------------------- -// File footer for file-like representation - struct FileBlock { + FileBlock() {} FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) : offset(offset), metadata_length(metadata_length), body_length(body_length) {} @@ -161,32 +159,6 @@ struct FileBlock { int64_t body_length; }; -ARROW_EXPORT -Status WriteFileFooter(const Schema* schema, const std::vector& dictionaries, - const std::vector& record_batches, io::OutputStream* out); - -class ARROW_EXPORT FileFooter { - public: - ~FileFooter(); - - static Status Open( - const std::shared_ptr& buffer, std::unique_ptr* out); - - int num_dictionaries() const; - int num_record_batches() const; - MetadataVersion::type version() const; - - FileBlock record_batch(int i) const; - FileBlock dictionary(int i) const; - - Status GetSchema(std::shared_ptr* out) const; - - private: - FileFooter(); - class FileFooterImpl; - std::unique_ptr impl_; -}; - } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc new file mode 100644 index 0000000000000..a2ca672fbe0aa --- /dev/null +++ b/cpp/src/arrow/ipc/stream.cc @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/ipc/stream.h" + +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata.h" +#include "arrow/ipc/util.h" +#include "arrow/schema.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace ipc { + +// ---------------------------------------------------------------------- +// Stream writer implementation + +StreamWriter::~StreamWriter() {} + +StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr& schema) + : sink_(sink), schema_(schema), position_(-1), started_(false) {} + +Status StreamWriter::UpdatePosition() { + return sink_->Tell(&position_); +} + +Status StreamWriter::Write(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(sink_->Write(data, nbytes)); + position_ += nbytes; + return Status::OK(); +} + +Status StreamWriter::Align() { + int64_t remainder = PaddedLength(position_) - position_; + if (remainder > 0) { return Write(kPaddingBytes, remainder); } + return Status::OK(); +} + +Status StreamWriter::WriteAligned(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(Write(data, nbytes)); + return Align(); +} + +Status StreamWriter::CheckStarted() { + if (!started_) { return Start(); } + return Status::OK(); +} + +Status StreamWriter::WriteRecordBatch(const 
RecordBatch& batch, FileBlock* block) { + RETURN_NOT_OK(CheckStarted()); + + block->offset = position_; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; + RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( + batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length)); + RETURN_NOT_OK(UpdatePosition()); + + DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; + + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// StreamWriter implementation + +Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out) { + // ctor is private + *out = std::shared_ptr(new StreamWriter(sink, schema)); + RETURN_NOT_OK((*out)->UpdatePosition()); + return Status::OK(); +} + +Status StreamWriter::Start() { + std::shared_ptr schema_fb; + RETURN_NOT_OK(WriteSchema(*schema_, &schema_fb)); + + int32_t flatbuffer_size = schema_fb->size(); + RETURN_NOT_OK( + Write(reinterpret_cast(&flatbuffer_size), sizeof(int32_t))); + + // Write the flatbuffer + RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size)); + started_ = true; + return Status::OK(); +} + +Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) { + // Pass FileBlock, but results not used + FileBlock dummy_block; + return WriteRecordBatch(batch, &dummy_block); +} + +Status StreamWriter::Close() { + // Close the stream + RETURN_NOT_OK(CheckStarted()); + return sink_->Close(); +} + +// ---------------------------------------------------------------------- +// StreamReader implementation + +StreamReader::StreamReader(const std::shared_ptr& stream) + : stream_(stream), schema_(nullptr) {} + +StreamReader::~StreamReader() {} + +Status StreamReader::Open(const std::shared_ptr& stream, + std::shared_ptr* reader) { + // Private ctor + *reader = std::shared_ptr(new StreamReader(stream)); + return (*reader)->ReadSchema(); +} + +Status 
StreamReader::ReadSchema() { + std::shared_ptr message; + RETURN_NOT_OK(ReadNextMessage(&message)); + + if (message->type() != Message::SCHEMA) { + return Status::IOError("First message was not schema type"); + } + + SchemaMetadata schema_meta(message); + + // TODO(wesm): If the schema contains dictionaries, we must read all the + // dictionaries from the stream before constructing the final Schema + return schema_meta.GetSchema(&schema_); +} + +Status StreamReader::ReadNextMessage(std::shared_ptr* message) { + std::shared_ptr buffer; + RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer)); + + if (buffer->size() != sizeof(int32_t)) { + *message = nullptr; + return Status::OK(); + } + + int32_t message_length = *reinterpret_cast(buffer->data()); + + RETURN_NOT_OK(stream_->Read(message_length, &buffer)); + if (buffer->size() != message_length) { + return Status::IOError("Unexpected end of stream trying to read message"); + } + return Message::Open(buffer, 0, message); +} + +std::shared_ptr StreamReader::schema() const { + return schema_; +} + +Status StreamReader::GetNextRecordBatch(std::shared_ptr* batch) { + std::shared_ptr message; + RETURN_NOT_OK(ReadNextMessage(&message)); + + if (message == nullptr) { + // End of stream + *batch = nullptr; + return Status::OK(); + } + + if (message->type() != Message::RECORD_BATCH) { + return Status::IOError("Metadata not record batch"); + } + + auto batch_metadata = std::make_shared(message); + + std::shared_ptr batch_body; + RETURN_NOT_OK(stream_->Read(message->body_length(), &batch_body)); + + if (batch_body->size() < message->body_length()) { + return Status::IOError("Unexpected EOS when reading message body"); + } + + io::BufferReader reader(batch_body); + + return ReadRecordBatch(batch_metadata, schema_, &reader, batch); +} + +} // namespace ipc +} // namespace arrow diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h new file mode 100644 index 0000000000000..0b0e62f13fc5f --- /dev/null +++ 
b/cpp/src/arrow/ipc/stream.h @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Implement Arrow streaming binary format + +#ifndef ARROW_IPC_STREAM_H +#define ARROW_IPC_STREAM_H + +#include +#include + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Buffer; +struct Field; +class RecordBatch; +class Schema; +class Status; + +namespace io { + +class InputStream; +class OutputStream; + +} // namespace io + +namespace ipc { + +struct FileBlock; +class Message; + +class ARROW_EXPORT StreamWriter { + public: + virtual ~StreamWriter(); + + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); + + virtual Status WriteRecordBatch(const RecordBatch& batch); + virtual Status Close(); + + protected: + StreamWriter(io::OutputStream* sink, const std::shared_ptr& schema); + + virtual Status Start(); + + Status CheckStarted(); + Status UpdatePosition(); + + Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block); + + // Adds padding bytes if necessary to ensure all memory blocks are written on + // 8-byte boundaries. 
+ Status Align(); + + // Write data and update position + Status Write(const uint8_t* data, int64_t nbytes); + + // Write and align + Status WriteAligned(const uint8_t* data, int64_t nbytes); + + io::OutputStream* sink_; + std::shared_ptr schema_; + int64_t position_; + bool started_; +}; + +class ARROW_EXPORT StreamReader { + public: + ~StreamReader(); + + // Open a stream. + static Status Open(const std::shared_ptr& stream, + std::shared_ptr* reader); + + std::shared_ptr schema() const; + + // Returned batch is nullptr when end of stream reached + Status GetNextRecordBatch(std::shared_ptr* batch); + + private: + explicit StreamReader(const std::shared_ptr& stream); + + Status ReadSchema(); + + Status ReadNextMessage(std::shared_ptr* message); + + std::shared_ptr stream_; + std::shared_ptr schema_; +}; + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_STREAM_H diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 3faeebf956966..ca790ded92191 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -36,6 +36,15 @@ namespace arrow { namespace ipc { +static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) { + if (!lhs.Equals(rhs)) { + std::stringstream ss; + ss << "left schema: " << lhs.ToString() << std::endl + << "right schema: " << rhs.ToString() << std::endl; + FAIL() << ss.str(); + } +} + const auto kListInt32 = list(int32()); const auto kListListInt32 = list(kListInt32); diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd index b3185b1c1671c..82957600d1eb6 100644 --- a/python/pyarrow/includes/libarrow_ipc.pxd +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ -29,8 +29,7 @@ cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil: CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, shared_ptr[CFileWriter]* out) - CStatus WriteRecordBatch(const vector[shared_ptr[CArray]]& columns, - int32_t num_rows)
+ CStatus WriteRecordBatch(const CRecordBatch& batch) CStatus Close() diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx index abc5e1b11ec4c..22069a7290ead 100644 --- a/python/pyarrow/ipc.pyx +++ b/python/pyarrow/ipc.pyx @@ -21,6 +21,8 @@ # distutils: language = c++ # cython: embedsignature = True +from cython.operator cimport dereference as deref + from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_io cimport * from pyarrow.includes.libarrow_ipc cimport * @@ -58,10 +60,9 @@ cdef class ArrowFileWriter: self.close() def write_record_batch(self, RecordBatch batch): - cdef CRecordBatch* bptr = batch.batch with nogil: check_status(self.writer.get() - .WriteRecordBatch(bptr.columns(), bptr.num_rows())) + .WriteRecordBatch(deref(batch.batch))) def close(self): with nogil: diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 6623e239880bc..feafa3dfc3875 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -254,16 +254,16 @@ struct arrow_traits { static constexpr bool is_numeric_nullable = false; }; -#define INT_DECL(TYPE) \ - template <> \ - struct arrow_traits { \ - static constexpr int npy_type = NPY_##TYPE; \ - static constexpr bool supports_nulls = false; \ - static constexpr double na_value = NAN; \ - static constexpr bool is_boolean = false; \ - static constexpr bool is_numeric_not_nullable = true; \ - static constexpr bool is_numeric_nullable = false; \ - typedef typename npy_traits::value_type T; \ +#define INT_DECL(TYPE) \ + template <> \ + struct arrow_traits { \ + static constexpr int npy_type = NPY_##TYPE; \ + static constexpr bool supports_nulls = false; \ + static constexpr double na_value = NAN; \ + static constexpr bool is_boolean = false; \ + static constexpr bool is_numeric_not_nullable = true; \ + static constexpr bool is_numeric_nullable = false; \ + typedef typename npy_traits::value_type T; \ }; INT_DECL(INT8); @@ -1803,7 +1803,7 @@ 
class ArrowDeserializer { // types Status Convert(PyObject** out) { -#define CONVERT_CASE(TYPE) \ +#define CONVERT_CASE(TYPE) \ case Type::TYPE: { \ RETURN_NOT_OK(ConvertValues()); \ } break; @@ -1857,8 +1857,7 @@ class ArrowDeserializer { } template - inline typename std::enable_if::type - ConvertValues() { + inline typename std::enable_if::type ConvertValues() { typedef typename arrow_traits::T T; RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); @@ -1910,24 +1909,21 @@ class ArrowDeserializer { // UTF8 strings template - inline typename std::enable_if::type - ConvertValues() { + inline typename std::enable_if::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(arr_)); return ConvertBinaryLike(data_, out_values); } template - inline typename std::enable_if::type - ConvertValues() { + inline typename std::enable_if::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(arr_)); return ConvertBinaryLike(data_, out_values); } template - inline typename std::enable_if::type - ConvertValues() { + inline typename std::enable_if::type ConvertValues() { std::shared_ptr block; RETURN_NOT_OK(MakeCategoricalBlock(col_->type(), col_->length(), &block)); RETURN_NOT_OK(block->Write(col_, 0, 0));