From 497582906e826d3045e2c7b9d4ddb44ce6d42adf Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 30 Oct 2024 22:40:51 +0100 Subject: [PATCH] Report file location and offset when CSV schema mismatch (#13185) --- .../core/src/datasource/file_format/csv.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 3cb5ae4f85ca..2aaef2cda1c8 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -325,7 +325,13 @@ impl FileFormat for CsvFormat { let stream = self.read_to_delimited_chunks(store, object).await; let (schema, records_read) = self .infer_schema_from_stream(state, records_to_read, stream) - .await?; + .await + .map_err(|err| { + DataFusionError::Context( + format!("Error when processing CSV file {}", &object.location), + Box::new(err), + ) + })?; records_to_read -= records_read; schemas.push(schema); if records_to_read == 0 { @@ -433,11 +439,13 @@ impl CsvFormat { let mut total_records_read = 0; let mut column_names = vec![]; let mut column_type_possibilities = vec![]; - let mut first_chunk = true; + let mut record_number = -1; pin_mut!(stream); while let Some(chunk) = stream.next().await.transpose()? { + record_number += 1; + let first_chunk = record_number == 0; let mut format = arrow::csv::reader::Format::default() .with_header( first_chunk @@ -471,14 +479,14 @@ impl CsvFormat { (field.name().clone(), possibilities) }) .unzip(); - first_chunk = false; } else { if fields.len() != column_type_possibilities.len() { return exec_err!( "Encountered unequal lengths between records on CSV file whilst inferring schema. \ - Expected {} records, found {} records", + Expected {} fields, found {} fields at record {}", column_type_possibilities.len(), - fields.len() + fields.len(), + record_number + 1 ); }