From 7d5a8186a514c57c3db2118677c01f61956396fe Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 24 Oct 2024 20:15:29 +0800 Subject: [PATCH] GH-43949: [C++] io::BufferedInput: Fix invalid state after SetBufferSize (#44387) ### Rationale for this change See https://github.com/apache/arrow/issues/43949 The problem is that `Peek` and `Read` both call `SetBufferSize`; however: 1. `Read` implicitly assumes that, when `SetBufferSize` is called or data is read, the previous buffer is no longer required. In this scenario, `bytes_buffered_` is always 0, since `Read` will consume all data, and `new_buffer_size == std::min(new_buffer_size, (raw_read_bound_ - raw_read_total_))` 2. `Peek` still requires the current buffer here, so `bytes_buffered_` might not be 0. When `Peek` is called, `new_buffer_size` would be less than expected, which shrinks the buffer ### What changes are included in this PR? Update the logic of `SetBufferSize`: 1. If `bytes_buffered_ == 0`, `SetBufferSize` can discard the current buffer 2. Otherwise, `SetBufferSize` should cap the new size at `buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)`, since it must take the data in the current buffer into account ### Are these changes tested? Yes ### Are there any user-facing changes? Bugfix * GitHub Issue: #43949 Lead-authored-by: mwish Co-authored-by: mwish <1506118561@qq.com> Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- cpp/src/arrow/io/buffered.cc | 29 ++++++++++++++++++++++------- cpp/src/arrow/io/buffered.h | 1 + cpp/src/arrow/io/buffered_test.cc | 23 +++++++++++++++++++++++ 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc index b41e4257af215..25cf79c78d916 100644 --- a/cpp/src/arrow/io/buffered.cc +++ b/cpp/src/arrow/io/buffered.cc @@ -51,6 +51,7 @@ class BufferedBase { return !is_open_; } + // Allocate buffer_ if needed, and resize it to buffer_size_ if required. 
Status ResetBuffer() { if (!buffer_) { // On first invocation, or if the buffer has been released, we allocate a @@ -283,18 +284,32 @@ class BufferedInputStream::Impl : public BufferedBase { } // Resize internal read buffer. Note that the internal buffer-size - // should be not larger than the raw_read_bound_. + // should not be larger than the raw_read_bound_. + // It might change the buffer_size_, but will not change buffer states + // buffer_pos_ and bytes_buffered_. Status SetBufferSize(int64_t new_buffer_size) { if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); } if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) { - return Status::Invalid("Cannot shrink read buffer if buffered data remains"); + return Status::Invalid( + "Cannot shrink read buffer if buffered data remains, new_buffer_size: ", + new_buffer_size, ", buffer_pos: ", buffer_pos_, + ", bytes_buffered: ", bytes_buffered_, ", buffer_size: ", buffer_size_); } if (raw_read_bound_ >= 0) { // No need to reserve space for more than the total remaining number of bytes. - new_buffer_size = std::min(new_buffer_size, - bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); + if (bytes_buffered_ == 0) { + // Special case: we can not keep the current buffer because it does not + // contain any required data. + new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); + } else { + // We should keep the current buffer because it contains data that + // can be read. + new_buffer_size = + std::min(new_buffer_size, + buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); + } } return ResizeBuffer(new_buffer_size); } @@ -350,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase { } Status DoBuffer() { - // Fill buffer + // Fill the buffer from the raw stream with at most `buffer_size_` bytes. 
if (!buffer_) { RETURN_NOT_OK(ResetBuffer()); } @@ -444,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase { // The default -1 indicates that it is unbounded int64_t raw_read_bound_; - // Number of remaining bytes in the buffer, to be reduced on each read from - // the buffer + // Number of remaining valid bytes in the buffer, to be reduced on each read + // from the buffer. int64_t bytes_buffered_; }; diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h index 01c0a016daba0..22ea7520a5050 100644 --- a/cpp/src/arrow/io/buffered.h +++ b/cpp/src/arrow/io/buffered.h @@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream int64_t raw_read_bound = -1); /// \brief Resize internal read buffer; calls to Read(...) will read at least + /// this many bytes from the raw InputStream if possible. /// \param[in] new_buffer_size the new read buffer size /// \return Status Status SetBufferSize(int64_t new_buffer_size); diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc index 89fe4b159f341..1d4805f580c39 100644 --- a/cpp/src/arrow/io/buffered_test.cc +++ b/cpp/src/arrow/io/buffered_test.cc @@ -491,6 +491,29 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) { } } +TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) { + // GH-43949: Peek and SetBufferSize should not affect the + // buffered bytes. 
+ MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15); + ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(9)); + EXPECT_EQ(std::string_view(*bytes), kExample1.substr(0, 9)); + ASSERT_EQ(1, buffered_->bytes_buffered()); + ASSERT_EQ(10, buffered_->buffer_size()); + ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(3)); + EXPECT_EQ(view, kExample1.substr(9, 3)); + ASSERT_EQ(3, buffered_->bytes_buffered()); + ASSERT_EQ(12, buffered_->buffer_size()); + ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(10)); + // Peek() cannot go past the `raw_read_bound` + EXPECT_EQ(view, kExample1.substr(9, 6)); + ASSERT_EQ(6, buffered_->bytes_buffered()); + ASSERT_EQ(15, buffered_->buffer_size()); + // Do read + ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(6)); + EXPECT_EQ(std::string_view(*bytes), kExample1.substr(9, 6)); + ASSERT_EQ(0, buffered_->bytes_buffered()); +} + class TestBufferedInputStreamBound : public ::testing::Test { public: void SetUp() { CreateExample(/*bounded=*/true); }