ARROW-495: [C++] Implement streaming binary format, refactoring
cc @nongli

Author: Wes McKinney <[email protected]>

Closes #293 from wesm/ARROW-495 and squashes the following commits:

279583b [Wes McKinney] FileBlock is a struct
c88e61a [Wes McKinney] Fix Python bindings after API changes
645a329 [Wes McKinney] Install stream.h
21378b4 [Wes McKinney] Collapse BaseStreamWriter and StreamWriter
b6c4578 [Wes McKinney] clang-format
12eb2cb [Wes McKinney] Add unit tests for streaming format, fix EOS, metadata length padding issues
3200b17 [Wes McKinney] Implement StreamReader
69fe82e [Wes McKinney] Implement rough draft of StreamWriter, share code with FileWriter
wesm committed Jan 21, 2017
1 parent 8ca7033 commit 5888e10
Showing 24 changed files with 718 additions and 417 deletions.
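For orientation, the streaming format added here is driven by a writer/reader pair declared in the new stream.h. A minimal round-trip sketch follows; the StreamWriter and StreamReader class names appear in the diffs below, but the exact Open and GetNextRecordBatch signatures are assumptions for illustration (modeled on the FileWriter::Open factory visible in file.cc), not copied from the commit:

// Hedged sketch, assumed to live inside `namespace arrow`. RETURN_NOT_OK is
// the status-checking macro used throughout the diffs below.
#include "arrow/ipc/stream.h"

Status RoundTrip(const RecordBatch& batch, io::OutputStream* sink,
    const std::shared_ptr<io::InputStream>& source) {
  std::shared_ptr<ipc::StreamWriter> writer;
  RETURN_NOT_OK(ipc::StreamWriter::Open(sink, batch.schema(), &writer));
  RETURN_NOT_OK(writer->WriteRecordBatch(batch));
  RETURN_NOT_OK(writer->Close());  // finishes the stream (EOS marker)

  std::shared_ptr<ipc::StreamReader> reader;
  RETURN_NOT_OK(ipc::StreamReader::Open(source, &reader));
  std::shared_ptr<RecordBatch> result;
  return reader->GetNextRecordBatch(&result);  // reads until end of stream
}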
1 change: 0 additions & 1 deletion cpp/CMakeLists.txt
@@ -90,7 +90,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(ARROW_ALTIVEC
     "Build Arrow with Altivec"
     ON)
-
 endif()
 
 if(NOT ARROW_BUILD_TESTS)
4 changes: 2 additions & 2 deletions cpp/src/arrow/io/memory.cc
@@ -116,13 +116,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer)
 Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
   int64_t size = std::min(nbytes, size_ - position_);
 
-  if (buffer_ != nullptr) {
+  if (size > 0 && buffer_ != nullptr) {
     *out = SliceBuffer(buffer_, position_, size);
   } else {
     *out = std::make_shared<Buffer>(data_ + position_, size);
   }
 
-  position_ += nbytes;
+  position_ += size;
   return Status::OK();
 }
 
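The two changed lines above fix BufferReader for short reads: the position now advances by the bytes actually available rather than by the requested nbytes, and a zero-length result can no longer trigger a slice past the end of the buffer. A standalone sketch of the corrected advance logic, with a simplified stand-in type for illustration only:

#include <algorithm>
#include <cassert>
#include <cstdint>

// Simplified stand-in for BufferReader: clamp the advance to the bytes
// actually available, not to the caller's nbytes (the bug being fixed).
struct MiniReader {
  int64_t size;
  int64_t position = 0;

  int64_t Read(int64_t nbytes) {
    int64_t available = std::min(nbytes, size - position);
    position += available;  // previously: position += nbytes
    return available;
  }
};

int main() {
  MiniReader reader{4};          // a 4-byte buffer
  assert(reader.Read(3) == 3);
  assert(reader.Read(3) == 1);   // only one byte remained
  assert(reader.position == 4);  // position no longer runs past the end
  return 0;
}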
2 changes: 2 additions & 0 deletions cpp/src/arrow/ipc/CMakeLists.txt
@@ -46,6 +46,7 @@ set(ARROW_IPC_SRCS
   json-internal.cc
   metadata.cc
   metadata-internal.cc
+  stream.cc
 )
 
 if(NOT APPLE)
@@ -151,6 +152,7 @@ install(FILES
   file.h
   json.h
   metadata.h
+  stream.h
   DESTINATION include/arrow/ipc)
 
 # pkg-config support
44 changes: 22 additions & 22 deletions cpp/src/arrow/ipc/adapter.cc
@@ -49,10 +49,9 @@ namespace ipc {
 
 class RecordBatchWriter : public ArrayVisitor {
  public:
-  RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
-      int64_t buffer_start_offset, int max_recursion_depth)
-      : columns_(columns),
-        num_rows_(num_rows),
+  RecordBatchWriter(
+      const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth)
+      : batch_(batch),
         max_recursion_depth_(max_recursion_depth),
         buffer_start_offset_(buffer_start_offset) {}
 
@@ -79,8 +78,8 @@ class RecordBatchWriter : public ArrayVisitor {
   }
 
   // Perform depth-first traversal of the row-batch
-  for (size_t i = 0; i < columns_.size(); ++i) {
-    RETURN_NOT_OK(VisitArray(*columns_[i].get()));
+  for (int i = 0; i < batch_.num_columns(); ++i) {
+    RETURN_NOT_OK(VisitArray(*batch_.column(i)));
   }
 
   // The position for the start of a buffer relative to the passed frame of
@@ -126,18 +125,23 @@ class RecordBatchWriter : public ArrayVisitor {
   // itself as an int32_t.
   std::shared_ptr<Buffer> metadata_fb;
   RETURN_NOT_OK(WriteRecordBatchMetadata(
-      num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
+      batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb));
 
   // Need to write 4 bytes (metadata size), the metadata, plus padding to
-  // fall on an 8-byte offset
-  int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);
+  // end on an 8-byte offset
+  int64_t start_offset;
+  RETURN_NOT_OK(dst->Tell(&start_offset));
+
+  int64_t padded_metadata_length = metadata_fb->size() + 4;
+  const int remainder = (padded_metadata_length + start_offset) % 8;
+  if (remainder != 0) { padded_metadata_length += 8 - remainder; }
 
   // The returned metadata size includes the length prefix, the flatbuffer,
   // plus padding
   *metadata_length = static_cast<int32_t>(padded_metadata_length);
 
-  // Write the flatbuffer size prefix
-  int32_t flatbuffer_size = metadata_fb->size();
+  // Write the flatbuffer size prefix including padding
+  int32_t flatbuffer_size = padded_metadata_length - 4;
   RETURN_NOT_OK(
       dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
 
@@ -294,9 +298,7 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  // Do not copy this vector. Ownership must be retained elsewhere
-  const std::vector<std::shared_ptr<Array>>& columns_;
-  int32_t num_rows_;
+  const RecordBatch& batch_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
@@ -306,18 +308,16 @@ class RecordBatchWriter : public ArrayVisitor {
   int64_t buffer_start_offset_;
 };
 
-Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
-    int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
-  RecordBatchWriter serializer(
-      columns, num_rows, buffer_start_offset, max_recursion_depth);
+  RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth);
   return serializer.Write(dst, metadata_length, body_length);
 }
 
-Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
-  RecordBatchWriter serializer(
-      batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
+  RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth);
   RETURN_NOT_OK(serializer.GetTotalSize(size));
   return Status::OK();
 }
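The middle hunk above is the "metadata length padding" fix from the commit message: the 4-byte size prefix plus the flatbuffer is now padded so that the metadata ends on an 8-byte boundary relative to the current stream offset, and the written size prefix counts that padding. A standalone sketch of the arithmetic, illustrative only:

#include <cstdint>
#include <cstdio>

// Mirrors the padding logic above: pad (4-byte prefix + flatbuffer) so the
// metadata ends on an 8-byte boundary relative to the stream offset.
int64_t PaddedMetadataLength(int64_t flatbuffer_size, int64_t start_offset) {
  int64_t padded = flatbuffer_size + 4;
  const int remainder = static_cast<int>((padded + start_offset) % 8);
  if (remainder != 0) { padded += 8 - remainder; }
  return padded;
}

int main() {
  // A 121-byte flatbuffer written at stream offset 8:
  // 121 + 4 = 125; (125 + 8) % 8 == 5, so pad by 3 for 128 bytes total.
  std::printf("%lld\n", static_cast<long long>(PaddedMetadataLength(121, 8)));
  return 0;
}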
11 changes: 4 additions & 7 deletions cpp/src/arrow/ipc/adapter.h
@@ -71,17 +71,14 @@ constexpr int kMaxIpcRecursionDepth = 64;
 //
 // @param(out) body_length: the size of the contiguous buffer block plus
 // padding bytes
-ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
-    int32_t* metadata_length, int64_t* body_length,
-    int max_recursion_depth = kMaxIpcRecursionDepth);
-
-// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
+ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch,
+    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+    int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // Compute the precise number of bytes needed in a contiguous memory segment to
 // write the record batch. This involves generating the complete serialized
 // Flatbuffers metadata.
-ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
+ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
 
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads
167 changes: 120 additions & 47 deletions cpp/src/arrow/ipc/file.cc
@@ -26,6 +26,7 @@
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
 #include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
 #include "arrow/status.h"
@@ -35,82 +36,154 @@ namespace arrow {
 namespace ipc {
 
 static constexpr const char* kArrowMagicBytes = "ARROW1";
 
 // ----------------------------------------------------------------------
-// Writer implementation
+// File footer
 
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+  std::vector<flatbuf::Block> fb_blocks;
 
-FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
-    : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+  for (const FileBlock& block : blocks) {
+    fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
+  }
 
-Status FileWriter::UpdatePosition() {
-  return sink_->Tell(&position_);
+  return fbb.CreateVectorOfStructs(fb_blocks);
 }
 
-Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-    std::shared_ptr<FileWriter>* out) {
-  *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema));  // ctor is private
-  RETURN_NOT_OK((*out)->UpdatePosition());
-  return Status::OK();
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
+  FBB fbb;
+
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
+
+  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
+  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
+
+  auto footer = flatbuf::CreateFooter(
+      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
+
+  fbb.Finish(footer);
+
+  int32_t size = fbb.GetSize();
+
+  return out->Write(fbb.GetBufferPointer(), size);
 }
 
-Status FileWriter::Write(const uint8_t* data, int64_t nbytes) {
-  RETURN_NOT_OK(sink_->Write(data, nbytes));
-  position_ += nbytes;
-  return Status::OK();
+static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
+  return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
 }
 
-Status FileWriter::Align() {
-  int64_t remainder = PaddedLength(position_) - position_;
-  if (remainder > 0) { return Write(kPaddingBytes, remainder); }
-  return Status::OK();
-}
+class FileFooter::FileFooterImpl {
+ public:
+  FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
+      : buffer_(buffer), footer_(footer) {}
+
+  int num_dictionaries() const { return footer_->dictionaries()->size(); }
+
+  int num_record_batches() const { return footer_->recordBatches()->size(); }
+
+  MetadataVersion::type version() const {
+    switch (footer_->version()) {
+      case flatbuf::MetadataVersion_V1:
+        return MetadataVersion::V1;
+      case flatbuf::MetadataVersion_V2:
+        return MetadataVersion::V2;
+      // Add cases as other versions become available
+      default:
+        return MetadataVersion::V2;
+    }
+  }
+
+  FileBlock record_batch(int i) const {
+    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+  }
+
+  FileBlock dictionary(int i) const {
+    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+  }
+
+  Status GetSchema(std::shared_ptr<Schema>* out) const {
+    auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
+    return schema_msg->GetSchema(out);
+  }
+
+ private:
+  // Retain reference to memory
+  std::shared_ptr<Buffer> buffer_;
+
+  const flatbuf::Footer* footer_;
+};
+
+FileFooter::FileFooter() {}
+
+FileFooter::~FileFooter() {}
+
+Status FileFooter::Open(
+    const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
+  const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
+
+  *out = std::unique_ptr<FileFooter>(new FileFooter());
+
+  // TODO(wesm): Verify the footer
+  (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
+
+  return Status::OK();
+}
 
-Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
-  RETURN_NOT_OK(Write(data, nbytes));
-  return Align();
+int FileFooter::num_dictionaries() const {
+  return impl_->num_dictionaries();
 }
 
-Status FileWriter::Start() {
-  RETURN_NOT_OK(WriteAligned(
-      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
-  started_ = true;
-  return Status::OK();
+int FileFooter::num_record_batches() const {
+  return impl_->num_record_batches();
 }
 
-Status FileWriter::CheckStarted() {
-  if (!started_) { return Start(); }
-  return Status::OK();
+MetadataVersion::type FileFooter::version() const {
+  return impl_->version();
 }
 
-Status FileWriter::WriteRecordBatch(
-    const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
-  RETURN_NOT_OK(CheckStarted());
-
-  int64_t offset = position_;
+FileBlock FileFooter::record_batch(int i) const {
  return impl_->record_batch(i);
+}
 
-  // There may be padding ever the end of the metadata, so we cannot rely on
-  // position_
-  int32_t metadata_length;
-  int64_t body_length;
+FileBlock FileFooter::dictionary(int i) const {
+  return impl_->dictionary(i);
+}
 
-  // Frame of reference in file format is 0, see ARROW-384
-  const int64_t buffer_start_offset = 0;
-  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
-      columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
-  RETURN_NOT_OK(UpdatePosition());
+Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
+  return impl_->GetSchema(out);
+}
 
-  DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
+// ----------------------------------------------------------------------
+// File writer implementation
 
-  // Append metadata, to be written in the footer later
-  record_batches_.emplace_back(offset, metadata_length, body_length);
+Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<FileWriter>* out) {
+  *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema));  // ctor is private
+  RETURN_NOT_OK((*out)->UpdatePosition());
   return Status::OK();
 }
 
+Status FileWriter::Start() {
+  RETURN_NOT_OK(WriteAligned(
+      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
+  started_ = true;
+  return Status::OK();
+}
+
+Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
+  // Push an empty FileBlock
+  // Append metadata, to be written in the footer later
+  record_batches_.emplace_back(0, 0, 0);
+  return StreamWriter::WriteRecordBatch(
+      batch, &record_batches_[record_batches_.size() - 1]);
+}
+
 Status FileWriter::Close() {
   // Write metadata
   int64_t initial_position = position_;
-  RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_));
+  RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_));
   RETURN_NOT_OK(UpdatePosition());
 
   // Write footer length
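For orientation, the refactored FileWriter records one FileBlock (offset, metadata length, body length) per dictionary and record batch and serializes them into the Footer flatbuffer when the file is closed; that is why FileWriter::WriteRecordBatch pushes an empty block and lets StreamWriter::WriteRecordBatch fill it in. A sketch of the resulting file layout, assuming the standard Arrow file trailer (footer length and closing magic), which the truncated Close() above writes but this page does not show:

  "ARROW1" magic, padded to an 8-byte boundary   (FileWriter::Start)
  record batch 0: metadata (4-byte prefix + flatbuffer + padding), then body
  record batch 1: ...
  Footer flatbuffer: schema + dictionary FileBlocks + record batch FileBlocks
  int32 footer length
  "ARROW1" magic                                 (FileWriter::Close)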
(Diffs for the remaining 18 of the 24 changed files did not load on this page.)