Skip to content

Commit

Permalink
Unique delta object store url (delta-io#1212)
Browse files Browse the repository at this point in the history
# Description
Make the object store url be unique for stores created via
`DeltaObjectStore::new`, by generating it from the location instead of
the prefix (which was previously hard-coded to `/`), in the same manner
as for `try_new`.

Also, in the (unlikely) case that I'm not mistaken about
`DeltaScan::execute` logic being redundant (see delta-io#1188 for more details),
I've removed it and added a couple of tests.

# Related Issue(s)
Closes delta-io#1188 

# Documentation
  • Loading branch information
gruuya authored and chitralverma committed Mar 17, 2023
1 parent 86b0b9f commit 18901a3
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 48 deletions.
59 changes: 35 additions & 24 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
Expand All @@ -57,8 +56,8 @@ use object_store::{path::Path, ObjectMeta};
use url::Url;

use crate::builder::ensure_table_uri;
use crate::schema;
use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
use crate::{schema, DeltaTableBuilder};
use crate::{DeltaResult, Invariant};
use crate::{DeltaTable, DeltaTableError};

Expand Down Expand Up @@ -452,7 +451,7 @@ impl TableProvider for DeltaTable {
.await?;

Ok(Arc::new(DeltaScan {
url: ensure_table_uri(self.table_uri())?.as_str().into(),
table_uri: ensure_table_uri(self.table_uri())?.as_str().into(),
parquet_scan,
}))
}
Expand All @@ -470,11 +469,11 @@ impl TableProvider for DeltaTable {
}

// TODO: this will likely also need to perform column mapping later when we support reader protocol v2
/// A wrapper for parquet scans that installs the required ObjectStore
/// A wrapper for parquet scans
#[derive(Debug)]
pub struct DeltaScan {
/// The URL of the ObjectStore root
pub url: String,
pub table_uri: String,
/// The parquet scan to wrap
pub parquet_scan: Arc<dyn ExecutionPlan>,
}
Expand Down Expand Up @@ -512,22 +511,6 @@ impl ExecutionPlan for DeltaScan {
partition: usize,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let df_url = ListingTableUrl::parse(self.url.as_str())?;
let storage = context
.runtime_env()
.object_store_registry
.get_by_url(df_url);
let mut table = DeltaTableBuilder::from_uri(&self.url);
if let Ok(storage) = storage {
// When running in ballista, the store will be deserialized and re-created
// When testing with a MemoryStore, it will already be present and we should re-use it
table = table.with_storage_backend(
storage,
Url::parse(&self.url).map_err(|err| DataFusionError::Internal(err.to_string()))?,
);
}
let table = table.build()?;
register_store(&table, context.runtime_env());
self.parquet_scan.execute(partition, context)
}

Expand Down Expand Up @@ -869,10 +852,10 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
inputs: &[Arc<dyn ExecutionPlan>],
_registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let url: String = serde_json::from_reader(buf)
let table_uri: String = serde_json::from_reader(buf)
.map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
let delta_scan = DeltaScan {
url,
table_uri,
parquet_scan: (*inputs)[0].clone(),
};
Ok(Arc::new(delta_scan))
Expand All @@ -887,7 +870,7 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
.as_any()
.downcast_ref::<DeltaScan>()
.ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
serde_json::to_writer(buf, delta_scan.url.as_str())
serde_json::to_writer(buf, delta_scan.table_uri.as_str())
.map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
Ok(())
}
Expand Down Expand Up @@ -964,7 +947,11 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{TimeZone, Utc};
use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use serde_json::json;
use std::ops::Deref;

use super::*;

Expand Down Expand Up @@ -1210,4 +1197,28 @@ mod tests {
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
}

#[test]
fn roundtrip_test_delta_exec_plan() {
let ctx = SessionContext::new();
let codec = DeltaPhysicalCodec {};

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
]));
let exec_plan = Arc::from(DeltaScan {
table_uri: "s3://my_bucket/this/is/some/path".to_string(),
parquet_scan: Arc::from(EmptyExec::new(false, schema)),
});
let proto: protobuf::PhysicalPlanNode =
protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
.expect("to proto");

let runtime = ctx.runtime_env();
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
.try_into_physical_plan(&ctx, runtime.deref(), &codec)
.expect("from proto");
assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
}
}
62 changes: 48 additions & 14 deletions rust/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ pub struct DeltaObjectStore {
storage: Arc<dyn ObjectStore>,
location: Url,
options: StorageOptions,
#[allow(unused)]
prefix: Path,
}

impl std::fmt::Display for DeltaObjectStore {
Expand All @@ -72,15 +70,14 @@ impl std::fmt::Display for DeltaObjectStore {
impl DeltaObjectStore {
/// Create a new instance of [`DeltaObjectStore`]
///
/// # Arguemnts
/// # Arguments
///
/// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storagle location of `storage`.
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn new(storage: Arc<DynObjectStore>, location: Url) -> Self {
Self {
storage,
location,
prefix: Path::from("/"),
options: HashMap::new().into(),
}
}
Expand All @@ -96,14 +93,13 @@ impl DeltaObjectStore {
let root_store =
ObjectStoreKind::parse_url(&location)?.into_impl(location.as_ref(), options.clone())?;
let storage = if prefix != Path::from("/") {
root_store.into_prefix(prefix.clone())
root_store.into_prefix(prefix)
} else {
root_store.into_store()
};
Ok(Self {
storage,
location,
prefix,
options: options.into(),
})
}
Expand All @@ -124,16 +120,20 @@ impl DeltaObjectStore {
}

#[cfg(feature = "datafusion")]
/// generate a unique enough url to identify the store in datafusion.
pub(crate) fn object_store_url(&self) -> ObjectStoreUrl {
/// Generate a unique enough url to identify the store in datafusion.
/// The DF object store registry only cares about the scheme and the host of the url for
/// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique
/// host we convert the location from this `DeltaObjectStore` to a valid name, combining the
/// original scheme, host and path with invalid characters replaced.
pub fn object_store_url(&self) -> ObjectStoreUrl {
// we are certain, that the URL can be parsed, since
// we make sure when we are parsing the table uri
ObjectStoreUrl::parse(format!(
"delta-rs://{}",
// NOTE We need to also replace colons, but its fine, since it just needs
// to be a unique-ish identifier for the object store in datafusion
self.prefix
.as_ref()
"delta-rs://{}-{}{}",
self.location.scheme(),
self.location.host_str().unwrap_or("-"),
self.location
.path()
.replace(DELIMITER, "-")
.replace(':', "-")
))
Expand Down Expand Up @@ -335,3 +335,37 @@ impl<'de> Deserialize<'de> for DeltaObjectStore {
deserializer.deserialize_seq(DeltaObjectStoreVisitor {})
}
}

#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
use crate::storage::DeltaObjectStore;
use object_store::memory::InMemory;
use std::sync::Arc;
use url::Url;

#[tokio::test]
async fn test_unique_object_store_url() {
// Just a dummy store to be passed for initialization
let inner_store = Arc::from(InMemory::new());

for (location_1, location_2) in [
// Same scheme, no host, different path
("file:///path/to/table_1", "file:///path/to/table_2"),
// Different scheme/host, same path
("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
// Same scheme, different host, same path
("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
] {
let url_1 = Url::parse(location_1).unwrap();
let url_2 = Url::parse(location_2).unwrap();
let store_1 = DeltaObjectStore::new(inner_store.clone(), url_1);
let store_2 = DeltaObjectStore::new(inner_store.clone(), url_2);

assert_ne!(
store_1.object_store_url().as_str(),
store_2.object_store_url().as_str(),
);
}
}
}
60 changes: 50 additions & 10 deletions rust/tests/datafusion_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg(feature = "datafusion")]

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -20,9 +20,15 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::ScalarValue::*;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Expr;
use datafusion_proto::bytes::{
physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec,
};
use url::Url;

use deltalake::action::SaveMode;
use deltalake::delta_datafusion::{DeltaPhysicalCodec, DeltaScan};
use deltalake::operations::create::CreateBuilder;
use deltalake::storage::DeltaObjectStore;
use deltalake::{
operations::{write::WriteBuilder, DeltaOps},
DeltaTable, Schema,
Expand Down Expand Up @@ -148,23 +154,40 @@ async fn test_datafusion_simple_query_partitioned() -> Result<()> {
}

#[tokio::test]
async fn test_datafusion_write_from_delta_scan() -> Result<()> {
async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
// Build an execution plan for scanning a DeltaTable and serialize it to bytes.
// We want to emulate that this occurs on another node, so that all we have access to is the
// plan byte serialization.
let source_scan_bytes = {
let ctx = SessionContext::new();
let state = ctx.state();
let source_table = deltalake::open_table("./tests/data/delta-0.8.0-date").await?;
let source_scan = source_table.scan(&state, None, &[], None).await?;
physical_plan_to_bytes_with_extension_codec(source_scan, &DeltaPhysicalCodec {})?
};

// Build a new context from scratch and deserialize the plan
let ctx = SessionContext::new();
let state = ctx.state();

// Build an execution plan for scanning a DeltaTable
let source_table = deltalake::open_table("./tests/data/delta-0.8.0-date").await?;
let source_scan = source_table.scan(&state, None, &[], None).await?;
let source_scan = physical_plan_from_bytes_with_extension_codec(
&source_scan_bytes,
&ctx,
&DeltaPhysicalCodec {},
)?;
let fields = Schema::try_from(source_scan.schema())
.unwrap()
.get_fields()
.clone();

// Create target Delta Table
let target_table = CreateBuilder::new()
.with_location("memory://target")
.with_columns(source_table.schema().unwrap().get_fields().clone())
.with_columns(fields)
.with_table_name("target")
.await?;

// Trying to execute the write by providing only the Datafusion plan and not the session state
// results in an error due to missing object store in the runtime registry.
// Trying to execute the write from the input plan without providing Datafusion with a session
// state containing the referenced object store in the registry results in an error.
assert!(WriteBuilder::new()
.with_input_execution_plan(source_scan.clone())
.with_object_store(target_table.object_store())
Expand All @@ -173,6 +196,23 @@ async fn test_datafusion_write_from_delta_scan() -> Result<()> {
.to_string()
.contains("No suitable object store found for delta-rs://"));

// Register the missing source table object store
let source_uri = source_scan
.as_any()
.downcast_ref::<DeltaScan>()
.unwrap()
.table_uri
.clone();
let source_location = Url::parse(&source_uri).unwrap();
let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap();
let object_store_url = source_store.object_store_url();
let source_store_url: &Url = object_store_url.as_ref();
state.runtime_env().register_object_store(
source_store_url.scheme(),
source_store_url.host_str().unwrap_or_default(),
Arc::from(source_store),
);

// Execute write to the target table with the proper state
let target_table = WriteBuilder::new()
.with_input_execution_plan(source_scan)
Expand All @@ -181,8 +221,8 @@ async fn test_datafusion_write_from_delta_scan() -> Result<()> {
.await?;
ctx.register_table("target", Arc::new(target_table))?;

// Check results
let batches = ctx.sql("SELECT * FROM target").await?.collect().await?;

let expected = vec![
"+------------+-----------+",
"| date | dayOfYear |",
Expand Down

0 comments on commit 18901a3

Please sign in to comment.