Skip to content

Commit

Permalink
GH-43949: [C++] io::BufferedInput: Fix invalid state after SetBufferS…
Browse files Browse the repository at this point in the history
…ize (#44387)

### Rationale for this change

See #43949

The problem is `Peek` and `Read` both calls `SetBufferSize`, however:
1. `Read` implicit says that, when `SetBufferSize` or read, the previous buffer is not being required. In this scenerio, `bytes_buffered_` is always 0, since `Read` will consume all data. And `new_buffer_size == std::min(new_buffer_size, (raw_read_bound_ - raw_read_total_))`
2. `Peek` still requires the buffer-size here. So, `bytes_buffered_` might not 0. When `Peek`, the `new_buffer_size` would less than expected, which shrinks the buffer

### What changes are included in this PR?

Update the Logic of `SetBufferSize`

1. If `bytes_buffered_ == 0`, `SetBufferSize` can discard the current buffer
2. Otherwise, `SetBufferSize` should resize minimal to `buffer_size_ + (raw_read_bound_ - raw_read_total_)`, since it should considering the current buffer

### Are these changes tested?

Yes

### Are there any user-facing changes?

Bugfix

* GitHub Issue: #43949

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
3 people authored Oct 24, 2024
1 parent 7ef5437 commit 7d5a818
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
29 changes: 22 additions & 7 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BufferedBase {
return !is_open_;
}

// Allocate buffer_ if needed, and resize it to buffer_size_ if required.
Status ResetBuffer() {
if (!buffer_) {
// On first invocation, or if the buffer has been released, we allocate a
Expand Down Expand Up @@ -283,18 +284,32 @@ class BufferedInputStream::Impl : public BufferedBase {
}

// Resize internal read buffer. Note that the internal buffer-size
// should be not larger than the raw_read_bound_.
// should not be larger than the raw_read_bound_.
// It might change the buffer_size_, but will not change buffer states
// buffer_pos_ and bytes_buffered_.
Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
}
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
return Status::Invalid("Cannot shrink read buffer if buffered data remains");
return Status::Invalid(
"Cannot shrink read buffer if buffered data remains, new_buffer_size: ",
new_buffer_size, ", buffer_pos: ", buffer_pos_,
", bytes_buffered: ", bytes_buffered_, ", buffer_size: ", buffer_size_);
}
if (raw_read_bound_ >= 0) {
// No need to reserve space for more than the total remaining number of bytes.
new_buffer_size = std::min(new_buffer_size,
bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
if (bytes_buffered_ == 0) {
// Special case: we can not keep the current buffer because it does not
// contain any required data.
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_);
} else {
// We should keep the current buffer because it contains data that
// can be read.
new_buffer_size =
std::min(new_buffer_size,
buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
}
}
return ResizeBuffer(new_buffer_size);
}
Expand Down Expand Up @@ -350,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase {
}

Status DoBuffer() {
// Fill buffer
// Fill the buffer from the raw stream with at most `buffer_size_` bytes.
if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}
Expand Down Expand Up @@ -444,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase {
// The default -1 indicates that it is unbounded
int64_t raw_read_bound_;

// Number of remaining bytes in the buffer, to be reduced on each read from
// the buffer
// Number of remaining valid bytes in the buffer, to be reduced on each read
// from the buffer.
int64_t bytes_buffered_;
};

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream
int64_t raw_read_bound = -1);

/// \brief Resize internal read buffer; calls to Read(...) will read at least
/// this many bytes from the raw InputStream if possible.
/// \param[in] new_buffer_size the new read buffer size
/// \return Status
Status SetBufferSize(int64_t new_buffer_size);
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,29 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) {
}
}

TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) {
// GH-43949: Peek and SetBufferSize should not affect the
// buffered bytes.
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(9));
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(0, 9));
ASSERT_EQ(1, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());
ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(3));
EXPECT_EQ(view, kExample1.substr(9, 3));
ASSERT_EQ(3, buffered_->bytes_buffered());
ASSERT_EQ(12, buffered_->buffer_size());
ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(10));
// Peek() cannot go past the `raw_read_bound`
EXPECT_EQ(view, kExample1.substr(9, 6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
// Do read
ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(6));
EXPECT_EQ(std::string_view(*bytes), kExample1.substr(9, 6));
ASSERT_EQ(0, buffered_->bytes_buffered());
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down

0 comments on commit 7d5a818

Please sign in to comment.