feat: change default 'json' extension handler (#2797)
tychoish authored Mar 19, 2024
1 parent 90f0be5 commit 29fcd78
Showing 10 changed files with 1,046 additions and 26 deletions.
11 changes: 8 additions & 3 deletions crates/datafusion_ext/src/planner/relation/mod.rs
@@ -243,8 +243,6 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
         .ok_or_else(|| DataFusionError::Plan(format!("strange file extension: {path}")))?
         .to_lowercase();
 
-    // TODO: We can be a bit more sophisticated here and handle compression
-    // schemes as well.
     Ok(match ext.as_str() {
         "parquet" => OwnedTableReference::Partial {
             schema: "public".into(),
@@ -254,7 +252,11 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
             schema: "public".into(),
             table: "read_csv".into(),
         },
-        "json" | "jsonl" | "ndjson" => OwnedTableReference::Partial {
+        "json" => OwnedTableReference::Partial {
+            schema: "public".into(),
+            table: "read_json".into(),
+        },
+        "ndjson" | "jsonl" => OwnedTableReference::Partial {
             schema: "public".into(),
             table: "read_ndjson".into(),
         },
@@ -270,6 +272,9 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
             if let Ok(compression_type) = ext.parse::<FileCompressionType>() {
                 let ext = compression_type.get_ext();
                 let path = path.trim_end_matches(ext.as_str());
+                // TODO: only parquet/ndjson/csv actually support
+                // compression, so we'll end up attempting to handle
+                // compression for some types and not others.
                 infer_func_for_file(path)?
             } else {
                 return Err(DataFusionError::Plan(format!(
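
For orientation, a minimal, self-contained sketch of the inference behavior after this change. `infer_table_func` is a hypothetical stand-in for `infer_func_for_file`: it returns a bare table-function name instead of an `OwnedTableReference`, hard-codes the compression suffixes that `FileCompressionType` would parse, and assumes `read_parquet` for the parquet arm whose `table:` line falls between the hunks above.

use std::path::Path;

// Hypothetical stand-in for infer_func_for_file (see diff above); the
// extension-to-function mapping mirrors the hunks, the plumbing is
// simplified.
fn infer_table_func(path: &str) -> Result<&'static str, String> {
    let ext = Path::new(path)
        .extension()
        .and_then(|e| e.to_str())
        .ok_or_else(|| format!("strange file extension: {path}"))?
        .to_lowercase();

    Ok(match ext.as_str() {
        "parquet" => "read_parquet",
        "csv" => "read_csv",
        // The change in this commit: bare `.json` now resolves to
        // read_json instead of read_ndjson...
        "json" => "read_json",
        // ...while line-delimited extensions keep resolving to read_ndjson.
        "ndjson" | "jsonl" => "read_ndjson",
        // Compression suffixes strip one extension and recurse, so
        // `data.ndjson.gz` resolves like `data.ndjson`. As the new TODO
        // notes, suffixes get stripped even for formats that can't read
        // compressed input downstream.
        "gz" | "bz2" | "xz" | "zst" => {
            let stem = &path[..path.len() - ext.len() - 1];
            return infer_table_func(stem);
        }
        other => return Err(format!("unable to infer function for extension: {other}")),
    })
}

fn main() {
    assert_eq!(infer_table_func("data.json"), Ok("read_json"));
    assert_eq!(infer_table_func("data.ndjson.gz"), Ok("read_ndjson"));
}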
2 changes: 1 addition & 1 deletion crates/datasources/src/json/mod.rs
@@ -1,3 +1,3 @@
-mod errors;
+pub mod errors;
 mod stream;
 pub mod table;
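
This one-word change is load-bearing: `errors` must be public so that `sqlexec` can name `datasources::json::errors::JsonError` in the new `DispatchError` variant added below.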
23 changes: 15 additions & 8 deletions crates/sqlexec/src/dispatch/external.rs
@@ -18,6 +18,7 @@ use datasources::common::url::DatasourceUrl;
 use datasources::debug::DebugTableType;
 use datasources::excel::table::ExcelTableProvider;
 use datasources::excel::ExcelTable;
+use datasources::json::table::json_streaming_table;
 use datasources::lake::delta::access::{load_table_direct, DeltaLakeAccessor};
 use datasources::lake::iceberg::table::IcebergTable;
 use datasources::lake::{storage_options_into_object_store, storage_options_into_store_access};
@@ -641,6 +642,9 @@ impl<'a> ExternalDispatcher<'a> {
         compression: Option<&String>,
     ) -> Result<Arc<dyn TableProvider>> {
         let path = path.as_ref();
+        // TODO: only parquet/ndjson/csv actually support compression,
+        // so we'll end up attempting to handle compression for some
+        // types and not others.
         let compression = compression
             .map(|c| c.parse::<FileCompressionType>())
             .transpose()?
@@ -669,21 +673,24 @@ impl<'a> ExternalDispatcher<'a> {
                     accessor.clone().list_globbed(path).await?,
                 )
                 .await?),
-            "ndjson" | "json" => Ok(accessor
-                .clone()
-                .into_table_provider(
-                    &self.df_ctx.state(),
-                    Arc::new(JsonFormat::default().with_file_compression_type(compression)),
-                    accessor.clone().list_globbed(path).await?,
-                )
-                .await?),
             "bson" => Ok(bson_streaming_table(
                 access.clone(),
                 DatasourceUrl::try_new(path)?,
                 None,
                 Some(128),
             )
             .await?),
+            "json" => Ok(
+                json_streaming_table(access.clone(), DatasourceUrl::try_new(path)?, None).await?,
+            ),
+            "ndjson" | "jsonl" => Ok(accessor
+                .clone()
+                .into_table_provider(
+                    &self.df_ctx.state(),
+                    Arc::new(JsonFormat::default().with_file_compression_type(compression)),
+                    accessor.clone().list_globbed(path).await?,
+                )
+                .await?),
             _ => Err(DispatchError::String(
                 format!("Unsupported file type: '{}', for '{}'", file_type, path,).to_string(),
             )),
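
Reduced to the routing decision, the new arms split as follows. A hedged sketch: `JsonPath` and `route` are hypothetical names; only the extension strings and the split between `json_streaming_table` and the `JsonFormat` listing path come from the diff.

// Hypothetical summary of the match arms above.
#[derive(Debug, PartialEq)]
enum JsonPath {
    // `.json`: whole-document JSON via `json_streaming_table`. Note this
    // arm does not pass the parsed compression type along.
    StreamingJson,
    // `.ndjson` / `.jsonl`: DataFusion's `JsonFormat` listing table, which
    // does honor `FileCompressionType`.
    ListingNdjson,
}

fn route(file_type: &str) -> Option<JsonPath> {
    match file_type {
        "json" => Some(JsonPath::StreamingJson),
        "ndjson" | "jsonl" => Some(JsonPath::ListingNdjson),
        _ => None,
    }
}

fn main() {
    assert_eq!(route("json"), Some(JsonPath::StreamingJson));
    assert_eq!(route("jsonl"), Some(JsonPath::ListingNdjson));
}

The asymmetry is what the new TODO calls out: the "json" arm never consults the parsed `compression` value, while the "ndjson" | "jsonl" arm passes it into `JsonFormat`.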
2 changes: 2 additions & 0 deletions crates/sqlexec/src/dispatch/mod.rs
@@ -78,6 +78,8 @@ pub enum DispatchError {
     #[error(transparent)]
     BsonDatasource(#[from] datasources::bson::errors::BsonError),
+    #[error(transparent)]
+    JsonDatasource(#[from] datasources::json::errors::JsonError),
     #[error(transparent)]
     ClickhouseDatasource(#[from] datasources::clickhouse::errors::ClickhouseError),
     #[error(transparent)]
     NativeDatasource(#[from] datasources::native::errors::NativeError),
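
What the new variant buys, as a compilable sketch: thiserror's `#[from]` generates the `From` impl that `?` uses, so dispatch code can call into the json datasource and propagate its errors directly. `JsonError` here is a trimmed stand-in for `datasources::json::errors::JsonError`, and the enum is cut down to the one variant.

use thiserror::Error; // thiserror = "1"

#[derive(Debug, Error)]
#[error("json error: {0}")]
struct JsonError(String); // stand-in for the real JsonError

#[derive(Debug, Error)]
enum DispatchError {
    #[error(transparent)]
    JsonDatasource(#[from] JsonError),
}

fn open_json_table() -> Result<(), JsonError> {
    Err(JsonError("no such file".into()))
}

fn dispatch() -> Result<(), DispatchError> {
    open_json_table()?; // JsonError -> DispatchError via the generated From impl
    Ok(())
}

fn main() {
    assert!(matches!(dispatch(), Err(DispatchError::JsonDatasource(_))));
}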
1,000 changes: 1,000 additions & 0 deletions testdata/json/userdata1.ndjson

Large diffs are not rendered by default.

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
34 changes: 20 additions & 14 deletions testdata/sqllogictests/infer.slt
@@ -14,11 +14,17 @@ select id, "./testdata/parquet/userdata1.parquet".first_name
 1 Amanda
 
 
+query I
+select count(*) from './testdata/json/userdata1.ndjson'
+----
+1000
+
 query I
 select count(*) from './testdata/json/userdata1.json'
 ----
 1000
 
+
 query IT
 select id, "./testdata/json/userdata1.json".first_name
 from './testdata/json/userdata1.json'
@@ -56,7 +62,7 @@ select count(*) from './testdata/parquet/*'
 
 #Tests for inferring table functions from compressed file formats
 
-#Tests for CSV with .gz, .bz2, .xz, .zst
+#Tests for CSV with .gz, .bz2, .xz, .zst
 #csv.gz
 query
 select count(*) from './testdata/csv/userdata1.csv.gz'
@@ -122,67 +128,67 @@ select id, "./testdata/csv/userdata1.csv.zst".first_name
 #Tests for json with .gz, .bz2, .xz, .zst
 #json.gz
 query
-select count(*) from './testdata/json/userdata1.json.gz'
+select count(*) from './testdata/json/userdata1.ndjson.gz'
 ----
 1000
 
 #json.gz
 query IT
-select id, "./testdata/json/userdata1.json.gz".first_name
-from './testdata/json/userdata1.json.gz'
+select id, "./testdata/json/userdata1.ndjson.gz".first_name
+from './testdata/json/userdata1.ndjson.gz'
 order by id
 limit 1
 ----
 1 Amanda
 
 #json.bz2
 query
-select count(*) from './testdata/json/userdata1.json.bz2'
+select count(*) from './testdata/json/userdata1.ndjson.bz2'
 ----
 1000
 
 #json.bz2
 query IT
-select id, "./testdata/json/userdata1.json.bz2".first_name
-from './testdata/json/userdata1.json.bz2'
+select id, "./testdata/json/userdata1.ndjson.bz2".first_name
+from './testdata/json/userdata1.ndjson.bz2'
 order by id
 limit 1
 ----
 1 Amanda
 
 #json.xz
 query
-select count(*) from './testdata/json/userdata1.json.xz'
+select count(*) from './testdata/json/userdata1.ndjson.xz'
 ----
 1000
 
 #json.xz
 query IT
-select id, "./testdata/json/userdata1.json.xz".first_name
-from './testdata/json/userdata1.json.xz'
+select id, "./testdata/json/userdata1.ndjson.xz".first_name
+from './testdata/json/userdata1.ndjson.xz'
 order by id
 limit 1
 ----
 1 Amanda
 
 #json.zst
 query
-select count(*) from './testdata/json/userdata1.json.zst'
+select count(*) from './testdata/json/userdata1.ndjson.zst'
 ----
 1000
 
 #json.zst
 query IT
-select id, "./testdata/json/userdata1.json.zst".first_name
-from './testdata/json/userdata1.json.zst'
+select id, "./testdata/json/userdata1.ndjson.zst".first_name
+from './testdata/json/userdata1.ndjson.zst'
 order by id
 limit 1
 ----
 1 Amanda
 
 
 
-#For infering function from parquet compressed formats .bz2, .xz, .zst, .gz
+#For infering function from parquet compressed formats .bz2, .xz, .zst, .gz
 #parquet.bz2
 statement error compression not supported for parquet
 select count(*) from './testdata/parquet/userdata1.parquet.bz2'
