Skip to content

Commit

Permalink
Systematic Configuration in 'Create External Table' and 'Copy To' Opt…
Browse files Browse the repository at this point in the history
…ions (#9382)

* Initial but not completely work like proto

* Delete docs.yaml

* Before proto is handled

* Update listing_table_factory.rs

* Before proto 2

* Minor adjustments

* Update headers

* Update copy.slt

* Add new test,

* Passes SLT tests

* Update csv.rs

* Before trying char

* Fix u8 handling

* Update according to review

* Passing tests

* passing tests with proto

* Cargo fix

* Testing and clippy refactors

* After merge corrections

* Parquet feature fix

* On datafusion-cli register COPY statements

* Correcting a test

* Review

* Review visited

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
4 people authored Mar 12, 2024
1 parent 92d046d commit 27b8a49
Show file tree
Hide file tree
Showing 70 changed files with 5,091 additions and 3,162 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ jobs:
git add --all
git commit -m 'Publish built docs triggered by ${{ github.sha }}'
git push || git push --force
fi
fi
10 changes: 6 additions & 4 deletions benchmarks/src/parquet_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::AccessLogOpt;
use crate::{BenchmarkRun, CommonOpt};
use std::path::PathBuf;

use crate::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::logical_expr::utils::disjunction;
Expand All @@ -25,7 +27,7 @@ use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_common::instant::Instant;
use std::path::PathBuf;

use structopt::StructOpt;

/// Test performance of parquet filter pushdown
Expand Down Expand Up @@ -179,7 +181,7 @@ async fn exec_scan(
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let exec = test_file.create_scan(Some(filter)).await?;
let exec = test_file.create_scan(ctx, Some(filter)).await?;

let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
Expand Down
13 changes: 7 additions & 6 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::AccessLogOpt;
use crate::BenchmarkRun;
use crate::CommonOpt;
use std::path::PathBuf;
use std::sync::Arc;

use crate::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalSortExpr;
Expand All @@ -26,8 +28,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use datafusion_common::instant::Instant;
use std::path::PathBuf;
use std::sync::Arc;

use structopt::StructOpt;

/// Test performance of sorting large datasets
Expand Down Expand Up @@ -174,7 +175,7 @@ async fn exec_sort(
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(None).await?;
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
Expand Down
28 changes: 15 additions & 13 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use super::get_query_sql;
use std::path::PathBuf;
use std::sync::Arc;

use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
};
use crate::{BenchmarkRun, CommonOpt};

use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
use datafusion::datasource::file_format::csv::CsvFormat;
Expand All @@ -26,21 +32,16 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use log::info;

use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use std::path::PathBuf;
use std::sync::Arc;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use datafusion::error::Result;
use datafusion::prelude::*;
use log::info;
use structopt::StructOpt;

use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES};

/// Run the tpch benchmark.
///
/// This benchmarks is derived from the [TPC-H][1] version
Expand Down Expand Up @@ -253,7 +254,7 @@ impl RunOpt {
}
"parquet" => {
let path = format!("{path}/{table}");
let format = ParquetFormat::default().with_enable_pruning(Some(true));
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
Expand Down Expand Up @@ -298,11 +299,12 @@ struct QueryResult {
// Only run with "ci" mode when we have the data
#[cfg(feature = "ci")]
mod tests {
use std::path::Path;

use super::*;

use datafusion::common::exec_err;
use datafusion::error::{DataFusionError, Result};
use std::path::Path;

use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
Expand Down
39 changes: 27 additions & 12 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::object_storage::get_object_store;
use async_trait::async_trait;
use std::any::Any;
use std::sync::{Arc, Weak};

use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::common::plan_datafusion_err;
Expand All @@ -26,12 +29,10 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

use async_trait::async_trait;
use dirs::home_dir;
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use url::Url;

/// Wraps another catalog, automatically creating table providers
/// for local files if needed
Expand Down Expand Up @@ -155,15 +156,16 @@ impl SchemaProvider for DynamicFileSchemaProvider {

// if the inner schema provider didn't have a table by
// that name, try to treat it as a listing table
let state = self
let mut state = self
.state
.upgrade()
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let optimized_name = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_name.as_str())?;
let url: &Url = table_url.as_ref();
let scheme = table_url.scheme();
let url = table_url.as_ref();

// If the store is already registered for this URL then `get_store`
// will return `Ok` which means we don't need to register it again. However,
Expand All @@ -174,10 +176,22 @@ impl SchemaProvider for DynamicFileSchemaProvider {
Err(_) => {
// Register the store for this URL. Here we don't have access
// to any command options so the only choice is to use an empty collection
let mut options = HashMap::new();
let store =
get_object_store(&state, &mut options, table_url.scheme(), url)
.await?;
match scheme {
"s3" | "oss" => {
state = state.add_table_options_extension(AwsOptions::default());
}
"gs" | "gcs" => {
state = state.add_table_options_extension(GcpOptions::default())
}
_ => {}
};
let store = get_object_store(
&state,
table_url.scheme(),
url,
state.default_table_options(),
)
.await?;
state.runtime_env().register_object_store(url, store);
}
}
Expand Down Expand Up @@ -215,6 +229,7 @@ fn substitute_tilde(cur: String) -> String {
#[cfg(test)]
mod tests {
use super::*;

use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;

Expand Down
Loading

0 comments on commit 27b8a49

Please sign in to comment.