Skip to content

Commit

Permalink
Fix a bug: buffer_size_ should not regard as bytes_buffered_ + buffer…
Browse files Browse the repository at this point in the history
…_pos_
  • Loading branch information
mapleFU committed Oct 15, 2024
1 parent 1970634 commit e3c7eea
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
23 changes: 12 additions & 11 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class BufferedBase {
return !is_open_;
}

// Reset the `buffer_` to a new buffer of size `buffer_size_`
// Allocate buffer_ if needed, and resize it to buffer_size_ if required.
Status ResetBuffer() {
if (!buffer_) {
// On first invocation, or if the buffer has been released, we allocate a
Expand Down Expand Up @@ -284,9 +284,9 @@ class BufferedInputStream::Impl : public BufferedBase {
}

// Resize internal read buffer. Note that the internal buffer-size
// should be not larger than the raw_read_bound_.
//
// SetBufferSize will not change the buffer_pos_ and bytes_buffered_.
// should not be larger than the raw_read_bound_.
// It might change the buffer_size_, but will not change buffer states
// buffer_pos_ and bytes_buffered_.
Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
Expand All @@ -301,13 +301,14 @@ class BufferedInputStream::Impl : public BufferedBase {
// No need to reserve space for more than the total remaining number of bytes.
if (bytes_buffered_ == 0) {
// Special case: we can not keep the current buffer because it does not
// contains any required data.
// contain any required data.
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_);
} else {
// We should keeping the current buffer because it contains data that
// We should keep the current buffer because it contains data that
// can be read.
new_buffer_size =
std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_));
std::min(new_buffer_size,
buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
}
}
return ResizeBuffer(new_buffer_size);
Expand All @@ -325,7 +326,7 @@ class BufferedInputStream::Impl : public BufferedBase {
}

// Increase the buffer size if needed.
if (nbytes > buffer_->size() - buffer_pos_) {
if (nbytes > buffer_size_ - buffer_pos_) {
RETURN_NOT_OK(SetBufferSize(nbytes + buffer_pos_));
DCHECK(buffer_->size() - buffer_pos_ >= nbytes);
}
Expand Down Expand Up @@ -364,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase {
}

Status DoBuffer() {
// Refill the buffer from the raw stream with `buffer_size_` bytes.
// Fill the buffer from the raw stream with at most `buffer_size_` bytes.
if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}
Expand Down Expand Up @@ -458,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase {
// The default -1 indicates that it is unbounded
int64_t raw_read_bound_;

// Number of remaining bytes in the buffer, to be reduced on each read from
// the buffer
// Number of remaining valid bytes in the buffer, to be reduced on each read
// from the buffer.
int64_t bytes_buffered_;
};

Expand Down
23 changes: 17 additions & 6 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,17 +491,28 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) {
}
}

TEST_F(TestBufferedInputStream, PeekPastBufferedBytesTwice) {
TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) {
// GH-43949: Peek and SetBufferSize should not affect the
// buffered bytes.
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
ASSERT_OK(buffered_->Read(9));
ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(9));
EXPECT_EQ(std::string_view(bytes->data_as<char>(), bytes->size()),
kExample1.substr(0, 9));
ASSERT_EQ(1, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());
ASSERT_OK(buffered_->Peek(6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
ASSERT_OK(buffered_->Peek(6));
ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(3));
EXPECT_EQ(view, kExample1.substr(9, 3));
ASSERT_EQ(3, buffered_->bytes_buffered());
ASSERT_EQ(12, buffered_->buffer_size());
ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(10));
EXPECT_EQ(view, kExample1.substr(9, 6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
// Do read
ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(6));
EXPECT_EQ(std::string_view(bytes->data_as<char>(), bytes->size()),
kExample1.substr(9, 6));
ASSERT_EQ(0, buffered_->bytes_buffered());
}

class TestBufferedInputStreamBound : public ::testing::Test {
Expand Down

0 comments on commit e3c7eea

Please sign in to comment.