From e3c7eeac76b8c6b839f25601a31ba11cb6959225 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 15 Oct 2024 17:41:22 +0800 Subject: [PATCH] Fix a bug: buffer_size_ should not regard as bytes_buffered_ + buffer_pos_ --- cpp/src/arrow/io/buffered.cc | 23 ++++++++++++----------- cpp/src/arrow/io/buffered_test.cc | 23 +++++++++++++++++------ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc index 68d337905feae..714493ae68cc9 100644 --- a/cpp/src/arrow/io/buffered.cc +++ b/cpp/src/arrow/io/buffered.cc @@ -51,7 +51,7 @@ class BufferedBase { return !is_open_; } - // Reset the `buffer_` to a new buffer of size `buffer_size_` + // Allocate buffer_ if needed, and resize it to buffer_size_ if required. Status ResetBuffer() { if (!buffer_) { // On first invocation, or if the buffer has been released, we allocate a @@ -284,9 +284,9 @@ class BufferedInputStream::Impl : public BufferedBase { } // Resize internal read buffer. Note that the internal buffer-size - // should be not larger than the raw_read_bound_. - // - // SetBufferSize will not change the buffer_pos_ and bytes_buffered_. + // should not be larger than the raw_read_bound_. + // It might change the buffer_size_, but will not change buffer states + // buffer_pos_ and bytes_buffered_. Status SetBufferSize(int64_t new_buffer_size) { if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); @@ -301,13 +301,14 @@ class BufferedInputStream::Impl : public BufferedBase { // No need to reserve space for more than the total remaining number of bytes. if (bytes_buffered_ == 0) { // Special case: we can not keep the current buffer because it does not - // contains any required data. + // contain any required data. new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); } else { - // We should keeping the current buffer because it contains data that + // We should keep the current buffer because it contains data that // can be read. new_buffer_size = - std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_)); + std::min(new_buffer_size, + buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); } } return ResizeBuffer(new_buffer_size); @@ -325,7 +326,7 @@ class BufferedInputStream::Impl : public BufferedBase { } // Increase the buffer size if needed. - if (nbytes > buffer_->size() - buffer_pos_) { + if (nbytes > buffer_size_ - buffer_pos_) { RETURN_NOT_OK(SetBufferSize(nbytes + buffer_pos_)); DCHECK(buffer_->size() - buffer_pos_ >= nbytes); } @@ -364,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase { } Status DoBuffer() { - // Refill the buffer from the raw stream with `buffer_size_` bytes. + // Fill the buffer from the raw stream with at most `buffer_size_` bytes. if (!buffer_) { RETURN_NOT_OK(ResetBuffer()); } @@ -458,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase { // The default -1 indicates that it is unbounded int64_t raw_read_bound_; - // Number of remaining bytes in the buffer, to be reduced on each read from - // the buffer + // Number of remaining valid bytes in the buffer, to be reduced on each read + // from the buffer. int64_t bytes_buffered_; }; diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc index c77684f37fbce..d5cbb5455464c 100644 --- a/cpp/src/arrow/io/buffered_test.cc +++ b/cpp/src/arrow/io/buffered_test.cc @@ -491,17 +491,28 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) { } } -TEST_F(TestBufferedInputStream, PeekPastBufferedBytesTwice) { +TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) { + // GH-43949: Peek and SetBufferSize should not affect the + // buffered bytes. MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15); - ASSERT_OK(buffered_->Read(9)); + ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(9)); + EXPECT_EQ(std::string_view(bytes->data_as(), bytes->size()), + kExample1.substr(0, 9)); ASSERT_EQ(1, buffered_->bytes_buffered()); ASSERT_EQ(10, buffered_->buffer_size()); - ASSERT_OK(buffered_->Peek(6)); - ASSERT_EQ(6, buffered_->bytes_buffered()); - ASSERT_EQ(15, buffered_->buffer_size()); - ASSERT_OK(buffered_->Peek(6)); + ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(3)); + EXPECT_EQ(view, kExample1.substr(9, 3)); + ASSERT_EQ(3, buffered_->bytes_buffered()); + ASSERT_EQ(12, buffered_->buffer_size()); + ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(10)); + EXPECT_EQ(view, kExample1.substr(9, 6)); ASSERT_EQ(6, buffered_->bytes_buffered()); ASSERT_EQ(15, buffered_->buffer_size()); + // Do read + ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(6)); + EXPECT_EQ(std::string_view(bytes->data_as(), bytes->size()), + kExample1.substr(9, 6)); + ASSERT_EQ(0, buffered_->bytes_buffered()); } class TestBufferedInputStreamBound : public ::testing::Test {