From 93262c262a3988ddc2014503050e47cafbe51468 Mon Sep 17 00:00:00 2001 From: Nikolay Ulmasov Date: Wed, 7 Feb 2024 17:55:29 +0000 Subject: [PATCH 1/2] implement querying the remote http location directly Signed-off-by: Nikolay Ulmasov --- datafusion-cli/src/catalog.rs | 78 ++++++++++++++++++++++++++++++++++- docs/source/user-guide/cli.md | 19 +++++++-- 2 files changed, 93 insertions(+), 4 deletions(-) diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index cca2b44ad983..a82064fe8b38 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -24,9 +24,12 @@ use datafusion::datasource::listing::{ use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::context::SessionState; +use object_store::http::HttpBuilder; +use object_store::ObjectStore; use parking_lot::RwLock; use std::any::Any; use std::sync::{Arc, Weak}; +use url::Url; /// Wraps another catalog, automatically creating table providers /// for local files if needed @@ -151,10 +154,35 @@ impl SchemaProvider for DynamicFileSchemaProvider { // if the inner schema provider didn't have a table by // that name, try to treat it as a listing table let state = self.state.upgrade()?.read().clone(); - let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?) + let table_url = ListingTableUrl::parse(name).ok()?; + + // Assure the `http` store for this url is registered if this + // is an `http(s)` listing + // TODO: support for other types, e.g. `s3`, may need to be added + match table_url.scheme() { + "http" | "https" => { + let url: &Url = table_url.as_ref(); + match state.runtime_env().object_store_registry.get_store(url) { + Ok(_) => {} + Err(_) => { + let store = Arc::new( + HttpBuilder::new() + .with_url(url.origin().ascii_serialization()) + .build() + .ok()?, + ) as Arc; + state.runtime_env().register_object_store(url, store); + } + } + } + _ => {} + } + + let config = ListingTableConfig::new(table_url) .infer(&state) .await .ok()?; + Some(Arc::new(ListingTable::try_new(config).ok()?)) } @@ -166,3 +194,51 @@ impl SchemaProvider for DynamicFileSchemaProvider { self.inner.table_exist(name) } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::prelude::SessionContext; + + #[tokio::test] + async fn query_http_location_test() -> Result<()> { + + // Perhaps this could be changed to use an existing file but + // that will require a permanently availalble web resource + let domain = "example.com"; + let location = format!("http://{domain}/file.parquet"); + + let mut ctx = SessionContext::new(); + ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new( + ctx.state().catalog_list(), + ctx.state_weak_ref(), + ))); + + let provider = + &DynamicFileCatalog::new(ctx.state().catalog_list(), ctx.state_weak_ref()) + as &dyn CatalogProviderList; + let catalog = provider + .catalog(provider.catalog_names().first().unwrap()) + .unwrap(); + let schema = catalog + .schema(catalog.schema_names().first().unwrap()) + .unwrap(); + let none = schema.table(&location).await; + + // That's a non-existing location so expecting None here. + assert!(none.is_none()); + + // It should still create an object store for the location + let store = ctx + .runtime_env() + .object_store(ListingTableUrl::parse(location)?)?; + + assert_eq!(format!("{store}"), "HttpStore"); + + // The store must be configured for this domain + let expected_domain = format!("Domain(\"{domain}\")"); + assert!(format!("{store:?}").contains(&expected_domain)); + + Ok(()) + } +} diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index 4b909bbb1e90..af61978a20bf 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -148,10 +148,10 @@ OPTIONS: ## Querying data from the files directly -Files can be queried directly by enclosing the file or -directory name in single `'` quotes as shown in the example. +Files can be queried directly by enclosing the file, directory name +or a remote location in single `'` quotes as shown in the examples. -## Example +## Examples Create a CSV file to query. @@ -194,6 +194,19 @@ DataFusion CLI v16.0.0 2 rows in set. Query took 0.007 seconds. ``` +You can also query directly from the remote location via HTTP(S) without +registering the location as a table + +```sql +❯ select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet' ++----------+ +| COUNT(*) | ++----------+ +| 1000000 | ++----------+ +1 row in set. Query took 0.595 seconds. +``` + ## Creating External Tables It is also possible to create a table backed by files by explicitly From bbf4f255d0eb27983082d16f2b913ed7ec280f30 Mon Sep 17 00:00:00 2001 From: Nikolay Ulmasov Date: Wed, 7 Feb 2024 18:48:27 +0000 Subject: [PATCH 2/2] fix format Signed-off-by: Nikolay Ulmasov --- datafusion-cli/src/catalog.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index a82064fe8b38..65c1feb88561 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -202,7 +202,6 @@ mod tests { #[tokio::test] async fn query_http_location_test() -> Result<()> { - // Perhaps this could be changed to use an existing file but // that will require a permanently availalble web resource let domain = "example.com"; @@ -225,7 +224,7 @@ mod tests { .unwrap(); let none = schema.table(&location).await; - // That's a non-existing location so expecting None here. + // That's a non-existing location so expecting None here assert!(none.is_none()); // It should still create an object store for the location