Skip to content

Commit

Permalink
apacheGH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset()
Browse files Browse the repository at this point in the history
We can reuse the same StreamDecoder to read multiple streams with
this.
  • Loading branch information
kou committed Oct 1, 2023
1 parent a381c05 commit 7eb56df
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
32 changes: 32 additions & 0 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,38 @@ TEST(TestRecordBatchStreamReader, MalformedInput) {
ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&garbage_reader));
}

namespace {
class EndlessCollectListener : public CollectListener {
public:
EndlessCollectListener() : CollectListener(), decoder_(nullptr) {}

void SetDecoder(StreamDecoder* decoder) { decoder_ = decoder; }

arrow::Status OnEOS() override { return decoder_->Reset(); }

private:
StreamDecoder* decoder_;
};
}; // namespace

TEST(TestStreamDecoder, Reset) {
auto listener = std::make_shared<EndlessCollectListener>();
StreamDecoder decoder(listener);
listener->SetDecoder(&decoder);

std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeIntRecordBatch(&batch));
StreamWriterHelper writer_helper;
ASSERT_OK(writer_helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
ASSERT_OK(writer_helper.WriteBatch(batch));
ASSERT_OK(writer_helper.Finish());

ASSERT_OK(decoder.Consume(writer_helper.buffer_));
ASSERT_EQ(1, listener->num_record_batches());
ASSERT_OK(decoder.Consume(writer_helper.buffer_));
ASSERT_EQ(2, listener->num_record_batches());
}

TEST(TestStreamDecoder, NextRequiredSize) {
auto listener = std::make_shared<CollectListener>();
StreamDecoder decoder(listener);
Expand Down
13 changes: 11 additions & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -909,14 +909,18 @@ class StreamDecoderInternal : public MessageDecoderListener {
return listener_->OnEOS();
}

std::shared_ptr<Listener> listener() const { return listener_; }

Listener* raw_listener() const { return listener_.get(); }

IpcReadOptions options() const { return options_; }

State state() const { return state_; }

std::shared_ptr<Schema> schema() const { return filtered_schema_; }

ReadStats stats() const { return stats_; }

State state() const { return state_; }

int num_required_initial_dictionaries() const {
return num_required_initial_dictionaries_;
}
Expand Down Expand Up @@ -2032,6 +2036,11 @@ Status StreamDecoder::Consume(const uint8_t* data, int64_t size) {
Status StreamDecoder::Consume(std::shared_ptr<Buffer> buffer) {
return impl_->Consume(std::move(buffer));
}
Status StreamDecoder::Reset() {
impl_ =
std::make_unique<StreamDecoderImpl>(std::move(impl_->listener()), impl_->options());
return Status::OK();
}

std::shared_ptr<Schema> StreamDecoder::schema() const { return impl_->schema(); }

Expand Down
31 changes: 31 additions & 0 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,37 @@ class ARROW_EXPORT StreamDecoder {
/// \return Status
Status Consume(std::shared_ptr<Buffer> buffer);

/// \brief Reset the internal status.
///
/// You can reuse this decoder for new stream after calling
/// this. For example, you can implement endless decoder with this:
///
/// ~~~{.cpp}
/// class EndlessListener : public arrow::ipc::Listener {
/// public:
/// explicit EndlessListener() : arrow::ipc::Listener(), decoder_() {}
///
/// arrow::Status OnEOS() override {
/// return decoder_->Reset();
/// }
///
/// void SetDecoder(arrow::ipc::StreamDecoder* decoder) {
/// decoder_ = decoder;
/// }
///
/// private:
/// arrow::ipc::StreamDecoder* decoder_;
/// };
///
/// auto listener = std::make_shared<EndlessListener>();
/// arrow::ipc::StreamDecoder decoder(listener);
/// listener->SetDecoder(&decoder);
/// decoder.Consume(...);
/// ~~~
///
/// \return Status
Status Reset();

/// \return the shared schema of the record batches in the stream
std::shared_ptr<Schema> schema() const;

Expand Down

0 comments on commit 7eb56df

Please sign in to comment.