-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-43949: [C++] io::BufferedInput: Fix invalid state after SetBufferSize #44387
Changes from 2 commits
1970634
e3c7eea
9f39c99
0a111dc
aecf05e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,7 @@ class BufferedBase { | |
return !is_open_; | ||
} | ||
|
||
// Allocate buffer_ if needed, and resize it to buffer_size_ if required. | ||
Status ResetBuffer() { | ||
if (!buffer_) { | ||
// On first invocation, or if the buffer has been released, we allocate a | ||
|
@@ -283,18 +284,32 @@ class BufferedInputStream::Impl : public BufferedBase { | |
} | ||
|
||
// Resize internal read buffer. Note that the internal buffer-size | ||
// should be not larger than the raw_read_bound_. | ||
// should not be larger than the raw_read_bound_. | ||
// It might change the buffer_size_, but will not change buffer states | ||
// buffer_pos_ and bytes_buffered_. | ||
Status SetBufferSize(int64_t new_buffer_size) { | ||
if (new_buffer_size <= 0) { | ||
return Status::Invalid("Buffer size should be positive"); | ||
} | ||
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) { | ||
return Status::Invalid("Cannot shrink read buffer if buffered data remains"); | ||
return Status::Invalid( | ||
"Cannot shrink read buffer if buffered data remains, new_buffer_size:", | ||
new_buffer_size, ", buffer_pos:", buffer_pos_, | ||
", bytes_buffered:", bytes_buffered_, ", buffer_size:", buffer_size_); | ||
mapleFU marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if (raw_read_bound_ >= 0) { | ||
// No need to reserve space for more than the total remaining number of bytes. | ||
new_buffer_size = std::min(new_buffer_size, | ||
bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); | ||
if (bytes_buffered_ == 0) { | ||
// Special case: we can not keep the current buffer because it does not | ||
// contain any required data. | ||
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); | ||
} else { | ||
// We should keep the current buffer because it contains data that | ||
// can be read. | ||
new_buffer_size = | ||
std::min(new_buffer_size, | ||
buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); | ||
} | ||
} | ||
return ResizeBuffer(new_buffer_size); | ||
} | ||
|
@@ -311,7 +326,7 @@ class BufferedInputStream::Impl : public BufferedBase { | |
} | ||
|
||
// Increase the buffer size if needed. | ||
if (nbytes > buffer_->size() - buffer_pos_) { | ||
if (nbytes > buffer_size_ - buffer_pos_) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the DCHECK below be changed as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll resume |
||
RETURN_NOT_OK(SetBufferSize(nbytes + buffer_pos_)); | ||
DCHECK(buffer_->size() - buffer_pos_ >= nbytes); | ||
} | ||
|
@@ -350,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase { | |
} | ||
|
||
Status DoBuffer() { | ||
// Fill buffer | ||
// Fill the buffer from the raw stream with at most `buffer_size_` bytes. | ||
if (!buffer_) { | ||
RETURN_NOT_OK(ResetBuffer()); | ||
} | ||
|
@@ -444,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase { | |
// The default -1 indicates that it is unbounded | ||
int64_t raw_read_bound_; | ||
|
||
// Number of remaining bytes in the buffer, to be reduced on each read from | ||
// the buffer | ||
// Number of remaining valid bytes in the buffer, to be reduced on each read | ||
// from the buffer. | ||
int64_t bytes_buffered_; | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream | |
int64_t raw_read_bound = -1); | ||
|
||
/// \brief Resize internal read buffer; calls to Read(...) will read at least | ||
/// this many bytes from the raw InputStream if possible. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't quite understand what's the intent of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. both ok to me |
||
/// \param[in] new_buffer_size the new read buffer size | ||
/// \return Status | ||
Status SetBufferSize(int64_t new_buffer_size); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this actually occur using the public APIs? If yes, can we add a test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes actually Peek test tests this. Previously,
PeekPastBufferedBytes
will raise error here.