From 18901a3ccab93b1e998b0edb47e7736240a060c3 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Mon, 13 Mar 2023 17:13:15 +0100
Subject: [PATCH] Unique delta object store url (#1212)

# Description

Make the object store url unique for stores created via `DeltaObjectStore::new`
by generating it from the location instead of the prefix (which was previously
hard-coded to `/`), in the same manner as for `try_new`.

Also, assuming I'm not mistaken about the `DeltaScan::execute` logic being
redundant (see #1188 for more details), I've removed it and added a couple of
tests.

# Related Issue(s)

Closes #1188

# Documentation

---
 rust/src/delta_datafusion.rs  | 59 +++++++++++++++++++--------------
 rust/src/storage/mod.rs       | 62 +++++++++++++++++++++++++++--------
 rust/tests/datafusion_test.rs | 60 +++++++++++++++++++++++++++------
 3 files changed, 133 insertions(+), 48 deletions(-)

diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 465347d2a6..921f171d63 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -35,7 +35,6 @@ use async_trait::async_trait;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion::datasource::datasource::TableProviderFactory;
 use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
-use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -57,8 +56,8 @@ use object_store::{path::Path, ObjectMeta};
 use url::Url;
 
 use crate::builder::ensure_table_uri;
+use crate::schema;
 use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
-use crate::{schema, DeltaTableBuilder};
 use crate::{DeltaResult, Invariant};
 use crate::{DeltaTable, DeltaTableError};
 
@@ -452,7 +451,7 @@ impl TableProvider for DeltaTable {
             .await?;
 
         Ok(Arc::new(DeltaScan {
-            url: ensure_table_uri(self.table_uri())?.as_str().into(),
+            table_uri: ensure_table_uri(self.table_uri())?.as_str().into(),
             parquet_scan,
         }))
     }
@@ -470,11 +469,11 @@ impl TableProvider for DeltaTable {
 }
 
 // TODO: this will likely also need to perform column mapping later when we support reader protocol v2
-/// A wrapper for parquet scans that installs the required ObjectStore
+/// A wrapper for parquet scans
 #[derive(Debug)]
 pub struct DeltaScan {
     /// The URL of the ObjectStore root
-    pub url: String,
+    pub table_uri: String,
     /// The parquet scan to wrap
     pub parquet_scan: Arc<dyn ExecutionPlan>,
 }
@@ -512,22 +511,6 @@ impl ExecutionPlan for DeltaScan {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
-        let df_url = ListingTableUrl::parse(self.url.as_str())?;
-        let storage = context
-            .runtime_env()
-            .object_store_registry
-            .get_by_url(df_url);
-        let mut table = DeltaTableBuilder::from_uri(&self.url);
-        if let Ok(storage) = storage {
-            // When running in ballista, the store will be deserialized and re-created
-            // When testing with a MemoryStore, it will already be present and we should re-use it
-            table = table.with_storage_backend(
-                storage,
-                Url::parse(&self.url).map_err(|err| DataFusionError::Internal(err.to_string()))?,
-            );
-        }
-        let table = table.build()?;
-        register_store(&table, context.runtime_env());
         self.parquet_scan.execute(partition, context)
     }
 
@@ -869,10 +852,10 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
         inputs: &[Arc<dyn ExecutionPlan>],
         _registry: &dyn FunctionRegistry,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        let url: String = serde_json::from_reader(buf)
+        let table_uri: String = serde_json::from_reader(buf)
             .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
         let delta_scan = DeltaScan {
-            url,
+            table_uri,
             parquet_scan: (*inputs)[0].clone(),
         };
         Ok(Arc::new(delta_scan))
     }
@@ -887,7 +870,7 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
             .as_any()
             .downcast_ref::<DeltaScan>()
             .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
-        serde_json::to_writer(buf, delta_scan.url.as_str())
+        serde_json::to_writer(buf, delta_scan.table_uri.as_str())
             .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
         Ok(())
     }
@@ -964,7 +947,11 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use chrono::{TimeZone, Utc};
     use datafusion::from_slice::FromSlice;
+    use datafusion::physical_plan::empty::EmptyExec;
+    use datafusion_proto::physical_plan::AsExecutionPlan;
+    use datafusion_proto::protobuf;
     use serde_json::json;
+    use std::ops::Deref;
 
     use super::*;
 
@@ -1210,4 +1197,28 @@ mod tests {
         assert!(result.is_err());
         assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
     }
+
+    #[test]
+    fn roundtrip_test_delta_exec_plan() {
+        let ctx = SessionContext::new();
+        let codec = DeltaPhysicalCodec {};
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+        let exec_plan = Arc::from(DeltaScan {
+            table_uri: "s3://my_bucket/this/is/some/path".to_string(),
+            parquet_scan: Arc::from(EmptyExec::new(false, schema)),
+        });
+        let proto: protobuf::PhysicalPlanNode =
+            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
+                .expect("to proto");
+
+        let runtime = ctx.runtime_env();
+        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
+            .try_into_physical_plan(&ctx, runtime.deref(), &codec)
+            .expect("from proto");
+        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
+    }
 }
diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs
index 99b1ac98a1..19e955a958 100644
--- a/rust/src/storage/mod.rs
+++ b/rust/src/storage/mod.rs
@@ -59,8 +59,6 @@ pub struct DeltaObjectStore {
     storage: Arc<DynObjectStore>,
     location: Url,
     options: StorageOptions,
-    #[allow(unused)]
-    prefix: Path,
 }
 
 impl std::fmt::Display for DeltaObjectStore {
@@ -72,15 +70,14 @@ impl std::fmt::Display for DeltaObjectStore {
 impl DeltaObjectStore {
     /// Create a new instance of [`DeltaObjectStore`]
     ///
-    /// # Arguemnts
+    /// # Arguments
     ///
     /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located).
-    /// * `location` - A url corresponding to the storagle location of `storage`.
+    /// * `location` - A url corresponding to the storage location of `storage`.
     pub fn new(storage: Arc<DynObjectStore>, location: Url) -> Self {
         Self {
             storage,
             location,
-            prefix: Path::from("/"),
             options: HashMap::new().into(),
         }
     }
@@ -96,14 +93,13 @@ impl DeltaObjectStore {
         let root_store =
             ObjectStoreKind::parse_url(&location)?.into_impl(location.as_ref(), options.clone())?;
         let storage = if prefix != Path::from("/") {
-            root_store.into_prefix(prefix.clone())
+            root_store.into_prefix(prefix)
         } else {
             root_store.into_store()
         };
         Ok(Self {
             storage,
             location,
-            prefix,
             options: options.into(),
         })
     }
@@ -124,16 +120,20 @@ impl DeltaObjectStore {
     }
 
     #[cfg(feature = "datafusion")]
-    /// generate a unique enough url to identify the store in datafusion.
-    pub(crate) fn object_store_url(&self) -> ObjectStoreUrl {
+    /// Generate a unique enough url to identify the store in datafusion.
+    /// The DF object store registry only cares about the scheme and the host of the url for
+    /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
+    /// host we convert the location from this `DeltaObjectStore` to a valid name, combining the
+    /// original scheme, host and path with invalid characters replaced.
+    pub fn object_store_url(&self) -> ObjectStoreUrl {
         // we are certain, that the URL can be parsed, since
         // we make sure when we are parsing the table uri
         ObjectStoreUrl::parse(format!(
-            "delta-rs://{}",
-            // NOTE We need to also replace colons, but its fine, since it just needs
-            // to be a unique-ish identifier for the object store in datafusion
-            self.prefix
-                .as_ref()
+            "delta-rs://{}-{}{}",
+            self.location.scheme(),
+            self.location.host_str().unwrap_or("-"),
+            self.location
+                .path()
                 .replace(DELIMITER, "-")
                 .replace(':', "-")
         ))
@@ -335,3 +335,37 @@ impl<'de> Deserialize<'de> for DeltaObjectStore {
         deserializer.deserialize_seq(DeltaObjectStoreVisitor {})
     }
 }
+
+#[cfg(feature = "datafusion")]
+#[cfg(test)]
+mod tests {
+    use crate::storage::DeltaObjectStore;
+    use object_store::memory::InMemory;
+    use std::sync::Arc;
+    use url::Url;
+
+    #[tokio::test]
+    async fn test_unique_object_store_url() {
+        // Just a dummy store to be passed for initialization
+        let inner_store = Arc::from(InMemory::new());
+
+        for (location_1, location_2) in [
+            // Same scheme, no host, different path
+            ("file:///path/to/table_1", "file:///path/to/table_2"),
+            // Different scheme/host, same path
+            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
+            // Same scheme, different host, same path
+            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
+        ] {
+            let url_1 = Url::parse(location_1).unwrap();
+            let url_2 = Url::parse(location_2).unwrap();
+            let store_1 = DeltaObjectStore::new(inner_store.clone(), url_1);
+            let store_2 = DeltaObjectStore::new(inner_store.clone(), url_2);
+
+            assert_ne!(
+                store_1.object_store_url().as_str(),
+                store_2.object_store_url().as_str(),
+            );
+        }
+    }
+}
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 274c36031e..0919c304d0 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -1,6 +1,6 @@
 #![cfg(feature = "datafusion")]
 
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
 use std::sync::Arc;
 
@@ -20,9 +20,15 @@ use datafusion_common::scalar::ScalarValue;
 use datafusion_common::ScalarValue::*;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Expr;
+use datafusion_proto::bytes::{
+    physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec,
+};
+use url::Url;
 
 use deltalake::action::SaveMode;
+use deltalake::delta_datafusion::{DeltaPhysicalCodec, DeltaScan};
 use deltalake::operations::create::CreateBuilder;
+use deltalake::storage::DeltaObjectStore;
 use deltalake::{
     operations::{write::WriteBuilder, DeltaOps},
     DeltaTable, Schema,
@@ -148,23 +154,40 @@ async fn test_datafusion_simple_query_partitioned() -> Result<()> {
 }
 
 #[tokio::test]
-async fn test_datafusion_write_from_delta_scan() -> Result<()> {
+async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
+    // Build an execution plan for scanning a DeltaTable and serialize it to bytes.
+    // We want to emulate that this occurs on another node, so that all we have access to is the
+    // plan byte serialization.
+    let source_scan_bytes = {
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+        let source_table = deltalake::open_table("./tests/data/delta-0.8.0-date").await?;
+        let source_scan = source_table.scan(&state, None, &[], None).await?;
+        physical_plan_to_bytes_with_extension_codec(source_scan, &DeltaPhysicalCodec {})?
+    };
+
+    // Build a new context from scratch and deserialize the plan
     let ctx = SessionContext::new();
     let state = ctx.state();
-
-    // Build an execution plan for scanning a DeltaTable
-    let source_table = deltalake::open_table("./tests/data/delta-0.8.0-date").await?;
-    let source_scan = source_table.scan(&state, None, &[], None).await?;
+    let source_scan = physical_plan_from_bytes_with_extension_codec(
+        &source_scan_bytes,
+        &ctx,
+        &DeltaPhysicalCodec {},
+    )?;
+    let fields = Schema::try_from(source_scan.schema())
+        .unwrap()
+        .get_fields()
+        .clone();
 
     // Create target Delta Table
     let target_table = CreateBuilder::new()
         .with_location("memory://target")
-        .with_columns(source_table.schema().unwrap().get_fields().clone())
+        .with_columns(fields)
         .with_table_name("target")
         .await?;
 
-    // Trying to execute the write by providing only the Datafusion plan and not the session state
-    // results in an error due to missing object store in the runtime registry.
+    // Trying to execute the write from the input plan without providing Datafusion with a session
+    // state containing the referenced object store in the registry results in an error.
     assert!(WriteBuilder::new()
         .with_input_execution_plan(source_scan.clone())
         .with_object_store(target_table.object_store())
@@ -173,6 +196,23 @@
         .to_string()
         .contains("No suitable object store found for delta-rs://"));
 
+    // Register the missing source table object store
+    let source_uri = source_scan
+        .as_any()
+        .downcast_ref::<DeltaScan>()
+        .unwrap()
+        .table_uri
+        .clone();
+    let source_location = Url::parse(&source_uri).unwrap();
+    let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap();
+    let object_store_url = source_store.object_store_url();
+    let source_store_url: &Url = object_store_url.as_ref();
+    state.runtime_env().register_object_store(
+        source_store_url.scheme(),
+        source_store_url.host_str().unwrap_or_default(),
+        Arc::from(source_store),
+    );
+
+    // Execute write to the target table with the proper state
     let target_table = WriteBuilder::new()
         .with_input_execution_plan(source_scan)
@@ -181,8 +221,8 @@
         .await?;
     ctx.register_table("target", Arc::new(target_table))?;
 
+    // Check results
     let batches = ctx.sql("SELECT * FROM target").await?.collect().await?;
-
    let expected = vec![
        "+------------+-----------+",
        "| date       | dayOfYear |",
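
For illustration, a minimal standalone sketch of the url scheme introduced above. This is not part of the patch: the helper name `object_store_url_for` is made up here, and it assumes the object_store path DELIMITER is "/"; the real logic lives in `DeltaObjectStore::object_store_url`. Since DataFusion's registry keys stores only by the scheme and host of their `ObjectStoreUrl`, the location's scheme, host and path are all mangled into the host component:

use url::Url;

// Hypothetical free-function mirror of the new DeltaObjectStore::object_store_url.
fn object_store_url_for(location: &Url) -> String {
    format!(
        "delta-rs://{}-{}{}",
        location.scheme(),
        location.host_str().unwrap_or("-"),
        location.path().replace('/', "-").replace(':', "-")
    )
}

fn main() {
    let s3 = Url::parse("s3://bucket_1/table_1").unwrap();
    let s3_other = Url::parse("s3://bucket_2/table_1").unwrap();
    let local = Url::parse("file:///path/to/table_1").unwrap();

    // Distinct locations now map to distinct registry keys.
    assert_eq!(object_store_url_for(&s3), "delta-rs://s3-bucket_1-table_1");
    assert_eq!(object_store_url_for(&s3_other), "delta-rs://s3-bucket_2-table_1");
    assert_eq!(object_store_url_for(&local), "delta-rs://file---path-to-table_1");
}

By contrast, stores built via `DeltaObjectStore::new` previously derived this url from the hard-coded `/` prefix, so every such store collapsed to the same registry key.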