Skip to content

Commit

Permalink
Pass the given data as suitable chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Oct 10, 2023
1 parent e753e52 commit bc689ac
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 5 deletions.
11 changes: 8 additions & 3 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2187,10 +2187,15 @@ TEST(TestStreamDecoder, Reset) {
ASSERT_OK(writer_helper.WriteBatch(batch));
ASSERT_OK(writer_helper.Finish());

ASSERT_OK(decoder.Consume(writer_helper.buffer_));
ASSERT_EQ(1, listener->num_record_batches());
ASSERT_OK(decoder.Consume(writer_helper.buffer_));
ASSERT_OK_AND_ASSIGN(auto all_buffer, ConcatenateBuffers({writer_helper.buffer_,
writer_helper.buffer_}));
// Consume by Buffer
ASSERT_OK(decoder.Consume(all_buffer));
ASSERT_EQ(2, listener->num_record_batches());

// Consume by raw data
ASSERT_OK(decoder.Consume(all_buffer->data(), all_buffer->size()));
ASSERT_EQ(4, listener->num_record_batches());
}

TEST(TestStreamDecoder, NextRequiredSize) {
Expand Down
67 changes: 65 additions & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,8 @@ class StreamDecoder::StreamDecoderImpl : public StreamDecoderInternal {

int64_t next_required_size() const { return message_decoder_.next_required_size(); }

const MessageDecoder* message_decoder() const { return &message_decoder_; }

private:
MessageDecoder message_decoder_;
};
Expand All @@ -2031,11 +2033,72 @@ StreamDecoder::StreamDecoder(std::shared_ptr<Listener> listener, IpcReadOptions
StreamDecoder::~StreamDecoder() {}

Status StreamDecoder::Consume(const uint8_t* data, int64_t size) {
return impl_->Consume(data, size);
while (size > 0) {
const auto next_required_size = impl_->next_required_size();
if (next_required_size == 0) {
break;
}
if (size < next_required_size) {
break;
}
ARROW_RETURN_NOT_OK(impl_->Consume(data, next_required_size));
data += next_required_size;
size -= next_required_size;
}
if (size > 0) {
return impl_->Consume(data, size);
} else {
return arrow::Status::OK();
}
}

Status StreamDecoder::Consume(std::shared_ptr<Buffer> buffer) {
return impl_->Consume(std::move(buffer));
if (buffer->size() == 0) {
return arrow::Status::OK();
}
if (impl_->next_required_size() == 0 || buffer->size() <= impl_->next_required_size()) {
return impl_->Consume(std::move(buffer));
} else {
int64_t offset = 0;
while (true) {
const auto next_required_size = impl_->next_required_size();
if (next_required_size == 0) {
break;
}
if (buffer->size() - offset <= next_required_size) {
break;
}
if (buffer->is_cpu()) {
switch (impl_->message_decoder()->state()) {
case MessageDecoder::State::INITIAL:
case MessageDecoder::State::METADATA_LENGTH:
// We don't need to pass a sliced buffer because
// MessageDecoder doesn't keep reference of the given
// buffer on these states.
ARROW_RETURN_NOT_OK(
impl_->Consume(buffer->data() + offset, next_required_size));
break;
default:
ARROW_RETURN_NOT_OK(
impl_->Consume(SliceBuffer(buffer, offset, next_required_size)));
break;
}
} else {
ARROW_RETURN_NOT_OK(
impl_->Consume(SliceBuffer(buffer, offset, next_required_size)));
}
offset += next_required_size;
}
if (buffer->size() - offset == 0) {
return arrow::Status::OK();
} else if (offset == 0) {
return impl_->Consume(std::move(buffer));
} else {
return impl_->Consume(SliceBuffer(std::move(buffer), offset));
}
}
}

Status StreamDecoder::Reset() {
impl_ = std::make_unique<StreamDecoderImpl>(impl_->listener(), impl_->options());
return Status::OK();
Expand Down

0 comments on commit bc689ac

Please sign in to comment.