From b39a20a6171d42e41d0b75c3a16989087880b3e4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Sat, 7 Jan 2023 09:07:06 +1300 Subject: [PATCH] Fix CSV infinite loop and improve error messages (#3470) * Fix CSV infinite loop and improve error messages * Doc --- arrow-csv/src/reader/records.rs | 85 +++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 21 deletions(-) diff --git a/arrow-csv/src/reader/records.rs b/arrow-csv/src/reader/records.rs index 711baa15278f..501da408815c 100644 --- a/arrow-csv/src/reader/records.rs +++ b/arrow-csv/src/reader/records.rs @@ -31,7 +31,7 @@ pub struct RecordReader { num_columns: usize, - num_rows: usize, + line_number: usize, offsets: Vec, data: Vec, } @@ -42,19 +42,21 @@ impl RecordReader { reader, delimiter, num_columns, - num_rows: 0, + line_number: 1, offsets: vec![], data: vec![], } } - fn fill_buf(&mut self, to_read: usize) -> Result<(), ArrowError> { + /// Clears and then fills the buffers on this [`RecordReader`] + /// returning the number of records read + fn fill_buf(&mut self, to_read: usize) -> Result { // Reserve sufficient capacity in offsets self.offsets.resize(to_read * self.num_columns + 1, 0); - self.num_rows = 0; + let mut read = 0; if to_read == 0 { - return Ok(()); + return Ok(0); } // The current offset into `self.data` @@ -71,7 +73,7 @@ impl RecordReader { 'input: loop { // Reserve necessary space in output data based on best estimate - let remaining_rows = to_read - self.num_rows; + let remaining_rows = to_read - read; let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE; let estimated_data = capacity.max(MIN_CAPACITY); self.data.resize(output_offset + estimated_data, 0); @@ -94,24 +96,26 @@ impl RecordReader { ReadRecordResult::InputEmpty => break 'input, // Input exhausted, need to read more ReadRecordResult::OutputFull => break, // Need to allocate more capacity ReadRecordResult::OutputEndsFull => { - return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got more than {}", self.num_columns, field_count))) + let line_number = self.line_number + read; + return Err(ArrowError::CsvError(format!("incorrect number of fields for line {}, expected {} got more than {}", line_number, self.num_columns, field_count))); } ReadRecordResult::Record => { if field_count != self.num_columns { - return Err(ArrowError::CsvError(format!("incorrect number of fields, expected {} got {}", self.num_columns, field_count))) + let line_number = self.line_number + read; + return Err(ArrowError::CsvError(format!("incorrect number of fields for line {}, expected {} got {}", line_number, self.num_columns, field_count))); } - self.num_rows += 1; + read += 1; field_count = 0; - if self.num_rows == to_read { - break 'outer // Read sufficient rows + if read == to_read { + break 'outer; // Read sufficient rows } if input.len() == input_offset { // Input exhausted, need to read more // Without this read_record will interpret the empty input // byte array as indicating the end of the file - break 'input + break 'input; } } } @@ -135,28 +139,38 @@ impl RecordReader { }); }); - Ok(()) + self.line_number += read; + + Ok(read) } - /// Skips forward `to_skip` rows - pub fn skip(&mut self, mut to_skip: usize) -> Result<(), ArrowError> { + /// Skips forward `to_skip` rows, returning an error if insufficient lines in source + pub fn skip(&mut self, to_skip: usize) -> Result<(), ArrowError> { // TODO: This could be done by scanning for unquoted newline delimiters - while to_skip != 0 { - self.fill_buf(to_skip.min(1024))?; - to_skip -= self.num_rows; + let mut skipped = 0; + while to_skip > skipped { + let read = self.fill_buf(to_skip.min(1024))?; + if read == 0 { + return Err(ArrowError::CsvError(format!( + "Failed to skip {} rows only found {}", + to_skip, skipped + ))); + } + + skipped += read; } Ok(()) } /// Reads up to `to_read` rows from the reader pub fn read(&mut self, to_read: usize) -> Result, ArrowError> { - self.fill_buf(to_read)?; + let num_rows = self.fill_buf(to_read)?; // Need to slice fields to the actual number of rows read // // We intentionally avoid using `Vec::truncate` to avoid having // to re-initialize the data again - let num_fields = self.num_rows * self.num_columns; + let num_fields = num_rows * self.num_columns; let last_offset = self.offsets[num_fields]; // Need to truncate data to the actual amount of data read @@ -165,8 +179,8 @@ impl RecordReader { })?; Ok(StringRecords { + num_rows, num_columns: self.num_columns, - num_rows: self.num_rows, offsets: &self.offsets[..num_fields + 1], data, }) @@ -263,4 +277,33 @@ mod tests { }) } } + + #[test] + fn test_invalid_fields() { + let csv = "a,b\nb,c\na\n"; + let cursor = Cursor::new(csv.as_bytes()); + let mut reader = RecordReader::new(cursor, Reader::new(), 2); + let err = reader.read(4).unwrap_err().to_string(); + + let expected = + "Csv error: incorrect number of fields for line 3, expected 2 got 1"; + + assert_eq!(err, expected); + + // Test with initial skip + let cursor = Cursor::new(csv.as_bytes()); + let mut reader = RecordReader::new(cursor, Reader::new(), 2); + reader.skip(1).unwrap(); + let err = reader.read(4).unwrap_err().to_string(); + assert_eq!(err, expected); + } + + #[test] + fn test_skip_insufficient_rows() { + let csv = "a\nv\n"; + let cursor = Cursor::new(csv.as_bytes()); + let mut reader = RecordReader::new(cursor, Reader::new(), 1); + let err = reader.skip(3).unwrap_err().to_string(); + assert_eq!(err, "Csv error: Failed to skip 3 rows only found 2"); + } }