From fcd436f56194341e1a7c95684e214b4c385a3dfe Mon Sep 17 00:00:00 2001 From: Hans Kratz Date: Wed, 18 Sep 2024 20:58:39 +0200 Subject: [PATCH] Fix issues with `AsyncBufRead::read_line` and `AsyncBufReadExt::lines` (#2884) Fixes the following issues in `AsyncBufRead::read_line`: * When the future is dropped the previous string contents are not restored so the string is empty. * If invalid UTF-8 is encountered the previous string contents are not restored. * If an IO error occurs after `read_until_internal` already read a couple of bytes a debug assertion fails. * Performance: The whole string to which read contents are appended is check for UTF-8 validity instead of just the added bytes. Fixes the following issues in `AsyncBufRead::read_line`: * If an IO error occurs after `read_until_internal` already read a couple of bytes a debug assertion fails. (#2862) Fixes #2862 --- futures-util/src/io/lines.rs | 1 + futures-util/src/io/read_line.rs | 47 +++++++++++++++++++++---------- futures-util/src/io/read_until.rs | 3 +- futures/tests/io_lines.rs | 26 ++++++++++++++++- futures/tests/io_read_line.rs | 43 ++++++++++++++++++++++++++++ 5 files changed, 102 insertions(+), 18 deletions(-) diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs index 8c4d17c584..0a1abf4bf6 100644 --- a/futures-util/src/io/lines.rs +++ b/futures-util/src/io/lines.rs @@ -35,6 +35,7 @@ impl Stream for Lines { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?; + *this.read = 0; if n == 0 && this.buf.is_empty() { return Poll::Ready(None); } diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs index b4483223dc..43942add54 100644 --- a/futures-util/src/io/read_line.rs +++ b/futures-util/src/io/read_line.rs @@ -18,13 +18,14 @@ pub struct ReadLine<'a, R: ?Sized> { buf: &'a mut String, bytes: Vec, read: usize, + finished: bool, } impl Unpin for ReadLine<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { - Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0, finished: false } } } @@ -35,26 +36,42 @@ pub(super) fn read_line_internal( bytes: &mut Vec, read: &mut usize, ) -> Poll> { - let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); - if str::from_utf8(bytes).is_err() { - bytes.clear(); - Poll::Ready(ret.and_then(|_| { - Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) - })) - } else { - debug_assert!(buf.is_empty()); - debug_assert_eq!(*read, 0); - // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. - mem::swap(unsafe { buf.as_mut_vec() }, bytes); - Poll::Ready(ret) + let mut ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + if str::from_utf8(&bytes[bytes.len() - *read..bytes.len()]).is_err() { + bytes.truncate(bytes.len() - *read); + if ret.is_ok() { + ret = Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )); + } } + *read = 0; + // Safety: `bytes` is valid UTF-8 because it was taken from a String + // and the newly read bytes are either valid UTF-8 or have been removed. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) } impl Future for ReadLine<'_, R> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, buf, bytes, read } = &mut *self; - read_line_internal(Pin::new(reader), cx, buf, bytes, read) + let Self { reader, buf, bytes, read, finished: _ } = &mut *self; + let ret = ready!(read_line_internal(Pin::new(reader), cx, buf, bytes, read)); + self.finished = true; + Poll::Ready(ret) + } +} + +impl Drop for ReadLine<'_, R> { + fn drop(&mut self) { + // restore old string contents + if !self.finished { + self.bytes.truncate(self.bytes.len() - self.read); + // Safety: `bytes` is valid UTF-8 because it was taken from a String + // and the newly read bytes have been removed. + mem::swap(unsafe { self.buf.as_mut_vec() }, &mut self.bytes); + } } } diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs index d6121d6f05..adc359db51 100644 --- a/futures-util/src/io/read_until.rs +++ b/futures-util/src/io/read_until.rs @@ -3,7 +3,6 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; use std::io; -use std::mem; use std::pin::Pin; use std::vec::Vec; @@ -46,7 +45,7 @@ pub(super) fn read_until_internal( reader.as_mut().consume(used); *read += used; if done || used == 0 { - return Poll::Ready(Ok(mem::replace(read, 0))); + return Poll::Ready(Ok(*read)); } } } diff --git a/futures/tests/io_lines.rs b/futures/tests/io_lines.rs index 5ce01a6945..62afef326a 100644 --- a/futures/tests/io_lines.rs +++ b/futures/tests/io_lines.rs @@ -1,6 +1,6 @@ use futures::executor::block_on; use futures::future::{Future, FutureExt}; -use futures::io::{AsyncBufReadExt, Cursor}; +use futures::io::{AsyncBufReadExt, AsyncRead, Cursor}; use futures::stream::{self, StreamExt, TryStreamExt}; use futures::task::Poll; use futures_test::io::AsyncReadTestExt; @@ -27,6 +27,24 @@ macro_rules! run_next { }; } +struct IOErrorRead(bool); + +impl AsyncRead for IOErrorRead { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &mut [u8], + ) -> Poll> { + if self.0 { + Poll::Ready(Err(std::io::ErrorKind::InvalidInput.into())) + } else { + self.0 = true; + b[..16].fill(b'x'); + Ok(16).into() + } + } +} + #[test] fn lines() { let buf = Cursor::new(&b"12\r"[..]); @@ -58,3 +76,9 @@ fn maybe_pending() { assert_eq!(run_next!(s), "".to_string()); assert!(run(s.next()).is_none()); } + +#[test] +fn issue2862() { + let mut lines = futures::io::BufReader::new(IOErrorRead(false)).lines(); + assert!(block_on(lines.next()).unwrap().is_err()) +} diff --git a/futures/tests/io_read_line.rs b/futures/tests/io_read_line.rs index 88a877928a..c7559c593b 100644 --- a/futures/tests/io_read_line.rs +++ b/futures/tests/io_read_line.rs @@ -3,6 +3,7 @@ use futures::future::{Future, FutureExt}; use futures::io::{AsyncBufReadExt, Cursor}; use futures::stream::{self, StreamExt, TryStreamExt}; use futures::task::Poll; +use futures::AsyncRead; use futures_test::io::AsyncReadTestExt; use futures_test::task::noop_context; @@ -15,6 +16,24 @@ fn run(mut f: F) -> F::Output { } } +struct IOErrorRead(bool); + +impl AsyncRead for IOErrorRead { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &mut [u8], + ) -> Poll> { + if self.0 { + Poll::Ready(Err(std::io::ErrorKind::InvalidInput.into())) + } else { + self.0 = true; + b[..16].fill(b'x'); + Ok(16).into() + } + } +} + #[test] fn read_line() { let mut buf = Cursor::new(b"12"); @@ -34,6 +53,30 @@ fn read_line() { assert_eq!(v, ""); } +#[test] +fn read_line_drop() { + // string contents should be preserved if the future is dropped + let mut buf = Cursor::new(b"12\n\n"); + let mut v = String::from("abc"); + drop(buf.read_line(&mut v)); + assert_eq!(v, "abc"); +} + +#[test] +fn read_line_io_error() { + let mut r = futures::io::BufReader::new(IOErrorRead(false)); + let _ = block_on(r.read_line(&mut String::new())); +} + +#[test] +fn read_line_utf8_error() { + let mut buf = Cursor::new(b"12\xFF\n\n"); + let mut v = String::from("abc"); + let res = block_on(buf.read_line(&mut v)); + assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidData); + assert_eq!(v, "abc"); +} + #[test] fn maybe_pending() { let mut buf = b"12".interleave_pending();