From c06504d45483465180e1df24b3f0aa495a3406de Mon Sep 17 00:00:00 2001
From: Sutou Kouhei
Date: Mon, 2 Oct 2023 06:44:25 +0900
Subject: [PATCH] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset()

We can reuse the same StreamDecoder to read multiple streams with this.

Reset() replaces the decoder's internal state with a fresh one that
keeps the same listener and read options.
---
 cpp/src/arrow/ipc/read_write_test.cc | 32 ++++++++++++++++++++++++++++
 cpp/src/arrow/ipc/reader.cc          | 13 +++++++++--
 cpp/src/arrow/ipc/reader.h           | 31 +++++++++++++++++++++++++++
 3 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 3ae007c20efe7..f6f51914dd5ca 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -2161,6 +2161,38 @@ TEST(TestRecordBatchStreamReader, MalformedInput) {
   ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&garbage_reader));
 }
 
+namespace {
+class EndlessCollectListener : public CollectListener {
+ public:
+  EndlessCollectListener() : CollectListener(), decoder_(nullptr) {}
+
+  void SetDecoder(StreamDecoder* decoder) { decoder_ = decoder; }
+
+  arrow::Status OnEOS() override { return decoder_->Reset(); }
+
+ private:
+  StreamDecoder* decoder_;
+};
+}  // namespace
+
+TEST(TestStreamDecoder, Reset) {
+  auto listener = std::make_shared<EndlessCollectListener>();
+  StreamDecoder decoder(listener);
+  listener->SetDecoder(&decoder);
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeIntRecordBatch(&batch));
+  StreamWriterHelper writer_helper;
+  ASSERT_OK(writer_helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
+  ASSERT_OK(writer_helper.WriteBatch(batch));
+  ASSERT_OK(writer_helper.Finish());
+
+  ASSERT_OK(decoder.Consume(writer_helper.buffer_));
+  ASSERT_EQ(1, listener->num_record_batches());
+  ASSERT_OK(decoder.Consume(writer_helper.buffer_));
+  ASSERT_EQ(2, listener->num_record_batches());
+}
+
 TEST(TestStreamDecoder, NextRequiredSize) {
   auto listener = std::make_shared<CollectListener>();
   StreamDecoder decoder(listener);
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 6e801e1f8adb7..e895bc545bdc0 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -909,14 +909,18 @@ class StreamDecoderInternal : public MessageDecoderListener {
     return listener_->OnEOS();
   }
 
+  std::shared_ptr<Listener> listener() const { return listener_; }
+
   Listener* raw_listener() const { return listener_.get(); }
 
+  IpcReadOptions options() const { return options_; }
+
+  State state() const { return state_; }
+
   std::shared_ptr<Schema> schema() const { return filtered_schema_; }
 
   ReadStats stats() const { return stats_; }
 
-  State state() const { return state_; }
-
   int num_required_initial_dictionaries() const {
     return num_required_initial_dictionaries_;
   }
@@ -2032,6 +2036,11 @@ Status StreamDecoder::Consume(const uint8_t* data, int64_t size) {
 Status StreamDecoder::Consume(std::shared_ptr<Buffer> buffer) {
   return impl_->Consume(std::move(buffer));
 }
+Status StreamDecoder::Reset() {
+  impl_ =
+      std::make_unique<StreamDecoderImpl>(std::move(impl_->listener()), impl_->options());
+  return Status::OK();
+}
 
 std::shared_ptr<Schema> StreamDecoder::schema() const { return impl_->schema(); }
 
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 0d7ae22264052..2c61630b2b6e3 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -425,6 +425,37 @@ class ARROW_EXPORT StreamDecoder {
   /// \return Status
   Status Consume(std::shared_ptr<Buffer> buffer);
 
+  /// \brief Reset the internal status.
+  ///
+  /// You can reuse this decoder for a new stream after calling
+  /// this. For example, you can implement an endless decoder with this:
+  ///
+  /// ~~~{.cpp}
+  /// class EndlessListener : public arrow::ipc::Listener {
+  ///  public:
+  ///   explicit EndlessListener() : arrow::ipc::Listener(), decoder_() {}
+  ///
+  ///   arrow::Status OnEOS() override {
+  ///     return decoder_->Reset();
+  ///   }
+  ///
+  ///   void SetDecoder(arrow::ipc::StreamDecoder* decoder) {
+  ///     decoder_ = decoder;
+  ///   }
+  ///
+  ///  private:
+  ///   arrow::ipc::StreamDecoder* decoder_;
+  /// };
+  ///
+  /// auto listener = std::make_shared<EndlessListener>();
+  /// arrow::ipc::StreamDecoder decoder(listener);
+  /// listener->SetDecoder(&decoder);
+  /// decoder.Consume(...);
+  /// ~~~
+  ///
+  /// \return Status
+  Status Reset();
+
   /// \return the shared schema of the record batches in the stream
   std::shared_ptr<Schema> schema() const;
 