Skip to content

Commit

Permalink
extended CSV options in SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Nov 27, 2023
1 parent 234217e commit 2422275
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 3 deletions.
2 changes: 2 additions & 0 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ impl ListingSchemaProvider {
file_type: self.format.clone(),
has_header: self.has_header,
delimiter: ',',
quote: '"',
escape: None,
table_partition_cols: vec![],
if_not_exists: false,
definition: None,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ impl TableProviderFactory for ListingTableFactory {
CsvFormat::default()
.with_has_header(cmd.has_header)
.with_delimiter(cmd.delimiter as u8)
.with_quote(cmd.quote as u8)
.with_escape(cmd.escape.map(|c| c as u8))
.with_file_compression_type(file_compression_type),
),
#[cfg(feature = "parquet")]
Expand Down Expand Up @@ -262,6 +264,8 @@ mod tests {
file_type: "csv".to_string(),
has_header: true,
delimiter: ',',
quote: '"',
escape: None,
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub struct CreateExternalTable {
pub has_header: bool,
/// Delimiter for CSV
pub delimiter: char,
/// Quote character for CSV
pub quote: char,
/// Escape character for CSV
pub escape: Option<char>,
/// Partition Columns
pub table_partition_cols: Vec<String>,
/// Option to not error if table already exists
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ message CreateExternalTableNode {
bool unbounded = 14;
map<string, string> options = 11;
Constraints constraints = 15;
string quote = 16;
string escape = 17;
}

message PrepareNode {
Expand Down
34 changes: 34 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,13 @@ impl AsLogicalPlan for LogicalPlanNode {
} else {
None
};
let escape = if !create_extern_table.escape.is_empty() {
Some(create_extern_table.delimiter.chars().next().ok_or_else(|| {
DataFusionError::Internal(String::from("Protobuf deserialization error, unable to parse CSV escape"))
})?)
} else {
None
};

let file_type = create_extern_table.file_type.as_str();
if ctx.table_factory(file_type).is_none() {
Expand All @@ -530,6 +537,10 @@ impl AsLogicalPlan for LogicalPlanNode {
delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
DataFusionError::Internal(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
})?,
quote: create_extern_table.quote.chars().next().ok_or_else(|| {
DataFusionError::Internal(String::from("Protobuf deserialization error, unable to parse CSV quote"))
})?,
escape,
table_partition_cols: create_extern_table
.table_partition_cols
.clone(),
Expand Down Expand Up @@ -1289,6 +1300,8 @@ impl AsLogicalPlan for LogicalPlanNode {
file_type,
has_header,
delimiter,
quote,
escape,
schema: df_schema,
table_partition_cols,
if_not_exists,
Expand Down Expand Up @@ -1323,6 +1336,8 @@ impl AsLogicalPlan for LogicalPlanNode {
table_partition_cols: table_partition_cols.clone(),
if_not_exists: *if_not_exists,
delimiter: String::from(*delimiter),
quote: String::from(*quote),
escape: escape.clone().unwrap_or_default().to_string(),
order_exprs: converted_order_exprs,
definition: definition.clone().unwrap_or_default(),
file_compression_type: file_compression_type.to_string(),
Expand Down
Loading

0 comments on commit 2422275

Please sign in to comment.