Skip to content

Commit

Permalink
ARROW-499: Update file serialization to use the streaming serialization format.
Browse files Browse the repository at this point in the history

Author: Wes McKinney <[email protected]>
Author: Nong Li <[email protected]>

Closes #292 from nongli/file and squashes the following commits:

18890a9 [Wes McKinney] Message fixes. Fix Java test suite. Integration tests pass
f187539 [Nong Li] Merge pull request #1 from wesm/file-change-cpp-impl
e3af434 [Wes McKinney] Remove unused variable
664d5be [Wes McKinney] Fixes, stream tests pass again
ba8db91 [Wes McKinney] Redo MessageSerializer with unions. Still has bugs
21854cc [Wes McKinney] Restore Block.bodyLength to long
7c6f7ef [Nong Li] Update to restore Block behavior
27b3909 [Nong Li] [ARROW-499]: [Java] Update file serialization to use the streaming serialization format.
  • Loading branch information
wesm committed Jan 20, 2017
1 parent 512bc16 commit 8ca7033
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 185 deletions.
11 changes: 6 additions & 5 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,12 @@ class RecordBatchWriter : public ArrayVisitor {
num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));

// Need to write 4 bytes (metadata size), the metadata, plus padding to
// fall on a 64-byte offset
int64_t padded_metadata_length =
BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
// fall on an 8-byte offset
int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);

// The returned metadata size includes the length prefix, the flatbuffer,
// plus padding
*metadata_length = padded_metadata_length;
*metadata_length = static_cast<int32_t>(padded_metadata_length);

// Write the flatbuffer size prefix
int32_t flatbuffer_size = metadata_fb->size();
Expand Down Expand Up @@ -604,7 +603,9 @@ Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
return Status::Invalid(ss.str());
}

*metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
std::shared_ptr<Message> message;
RETURN_NOT_OK(Message::Open(buffer, 4, &message));
*metadata = std::make_shared<RecordBatchMetadata>(message);
return Status::OK();
}

Expand Down
21 changes: 4 additions & 17 deletions cpp/src/arrow/ipc/metadata-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,23 +320,10 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
flatbuffers::FlatBufferBuilder fbb;

auto batch = flatbuf::CreateRecordBatch(
fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));

fbb.Finish(batch);

int32_t size = fbb.GetSize();

auto result = std::make_shared<PoolBuffer>();
RETURN_NOT_OK(result->Resize(size));

uint8_t* dst = result->mutable_data();
memcpy(dst, fbb.GetBufferPointer(), size);

*out = result;
return Status::OK();
MessageBuilder builder;
RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers));
RETURN_NOT_OK(builder.Finish());
return builder.GetBuffer(out);
}

Status MessageBuilder::Finish() {
Expand Down
5 changes: 4 additions & 1 deletion format/File.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ table Footer {

/// Locates one block in the file: a starting offset plus the lengths of its
/// metadata and of its body.
struct Block {

/// Index to the start of the RecordBlock (note this is past the Message header)
offset: long;

/// Length of the metadata
metaDataLength: int;

/// Length of the data (this is aligned so there can be a gap between this and
/// the metadata).
bodyLength: long;

}

root_type Footer;
2 changes: 1 addition & 1 deletion integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def get_static_json_files():


def run_all_tests(debug=False):
testers = [JavaTester(debug=debug), CPPTester(debug=debug)]
testers = [CPPTester(debug=debug), JavaTester(debug=debug)]
static_json_files = get_static_json_files()
generated_json_files = get_generated_json_files()
json_files = static_json_files + generated_json_files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ private static List<ArrowBlock> recordBatches(Footer footer) {

private static List<ArrowBlock> dictionaries(Footer footer) {
List<ArrowBlock> dictionaries = new ArrayList<>();
Block tempBLock = new Block();
Block tempBlock = new Block();

int dictionariesLength = footer.dictionariesLength();
for (int i = 0; i < dictionariesLength; i++) {
Block block = footer.dictionaries(tempBLock, i);
Block block = footer.dictionaries(tempBlock, i);
dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
}
return dictionaries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,15 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.arrow.flatbuf.Buffer;
import org.apache.arrow.flatbuf.FieldNode;
import org.apache.arrow.flatbuf.Footer;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ArrowBuf;

public class ArrowReader implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);

Expand All @@ -54,15 +46,6 @@ public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
this.allocator = allocator;
}

private int readFully(ArrowBuf buffer, int l) throws IOException {
int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
buffer.writerIndex(n);
if (n != l) {
throw new IllegalStateException(n + " != " + l);
}
return n;
}

private int readFully(ByteBuffer buffer) throws IOException {
int total = 0;
int n;
Expand Down Expand Up @@ -104,46 +87,21 @@ public ArrowFooter readFooter() throws IOException {

// TODO: read dictionaries

public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength()));
int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength());
if (l < 0) {
throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
}
final ArrowBuf buffer = allocator.buffer(l);
LOGGER.debug("allocated buffer " + buffer);
in.position(recordBatchBlock.getOffset());
int n = readFully(buffer, l);
if (n != l) {
throw new IllegalStateException(n + " != " + l);
}

// Record batch flatbuffer is prefixed by its size as int32le
final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());

int nodesLength = recordBatchFB.nodesLength();
final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
List<ArrowFieldNode> nodes = new ArrayList<>();
for (int i = 0; i < nodesLength; ++i) {
FieldNode node = recordBatchFB.nodes(i);
nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
block.getOffset(), block.getMetadataLength(),
block.getBodyLength()));
in.position(block.getOffset());
ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
new ReadChannel(in, block.getOffset()), block, allocator);
if (batch == null) {
throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
List<ArrowBuf> buffers = new ArrayList<>();
for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
Buffer bufferFB = recordBatchFB.buffers(i);
LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
buffers.add(vectorBuffer);
}
ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
LOGGER.debug("released buffer " + buffer);
buffer.release();
return arrowRecordBatch;
return batch;
}

/**
 * Closes {@code in}, the channel this reader reads from.
 *
 * @throws IOException if closing the channel fails
 */
@Override
public void close() throws IOException {
in.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import java.util.Collections;
import java.util.List;

import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ArrowBuf;

public class ArrowWriter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);

Expand All @@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable {
private final Schema schema;

private final List<ArrowBlock> recordBatches = new ArrayList<>();

private boolean started = false;

public ArrowWriter(WritableByteChannel out, Schema schema) {
Expand All @@ -49,47 +46,19 @@ public ArrowWriter(WritableByteChannel out, Schema schema) {

/**
 * Writes the opening of the file: calls {@code writeMagic()} and then writes the schema
 * to {@code out} through {@code MessageSerializer.serialize(out, schema)}.
 *
 * @throws IOException if either write fails
 */
private void start() throws IOException {
writeMagic();
MessageSerializer.serialize(out, schema);
}


// TODO: write dictionaries

public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
checkStarted();
out.align();
ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength()));

// write metadata header with int32 size prefix
long offset = out.getCurrentPosition();
out.write(recordBatch, true);
out.align();
// write body
long bodyOffset = out.getCurrentPosition();
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
if (buffers.size() != buffersLayout.size()) {
throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
}
for (int i = 0; i < buffers.size(); i++) {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
long startPosition = bodyOffset + layout.getOffset();
if (startPosition != out.getCurrentPosition()) {
out.writeZeros((int)(startPosition - out.getCurrentPosition()));
}

out.write(buffer);
if (out.getCurrentPosition() != startPosition + layout.getSize()) {
throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
}
}
int metadataLength = (int)(bodyOffset - offset);
if (metadataLength <= 0) {
throw new InvalidArrowFileException("invalid recordBatch");
}
long bodyLength = out.getCurrentPosition() - bodyOffset;
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
// add metadata to footer
recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
recordBatches.add(batchDesc);
}

private void checkStarted() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable {

private ReadableByteChannel in;
private long bytesRead = 0;
// The starting byte offset into 'in'.
private final long startByteOffset;

public ReadChannel(ReadableByteChannel in) {
public ReadChannel(ReadableByteChannel in, long startByteOffset) {
this.in = in;
this.startByteOffset = startByteOffset;
}

public ReadChannel(ReadableByteChannel in) {
this(in, 0);
}

/** Returns the current value of the {@code bytesRead} counter, which starts at 0. */
public long bytesRead() { return bytesRead; }
Expand Down Expand Up @@ -65,6 +72,8 @@ public int readFully(ArrowBuf buffer, int l) throws IOException {
return n;
}

/**
 * Returns the current position relative to the start of {@code in}: the starting byte
 * offset this ReadChannel was constructed with plus the {@code bytesRead} counter.
 */
public long getCurrentPosition() { return startByteOffset + bytesRead; }

/**
 * Misspelled original name of {@link #getCurrentPosition()}, kept so that existing
 * callers continue to compile. Returns exactly the same value; prefer the correctly
 * spelled method in new code.
 */
public long getCurrentPositiion() { return getCurrentPosition(); }

@Override
public void close() throws IOException {
if (this.in != null) {
Expand Down
Loading

0 comments on commit 8ca7033

Please sign in to comment.