From 8d6e4354d3626b5f43b1162666057310f3777e40 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 31 Aug 2022 00:25:17 +0200 Subject: [PATCH 1/3] chore: cleanup --- README.adoc | 2 +- TODO | 3 -- build/setup_localstack.sh | 46 ----------------- docker-compose.yml | 14 ------ docs/ADLSGen2-HOWTO.md | 47 ----------------- rust/src/delta_dataframe.rs | 70 -------------------------- rust/src/lib.rs | 2 - rust/tests/integration_object_store.rs | 32 ++++-------- 8 files changed, 11 insertions(+), 205 deletions(-) delete mode 100644 TODO delete mode 100755 build/setup_localstack.sh delete mode 100644 docs/ADLSGen2-HOWTO.md delete mode 100644 rust/src/delta_dataframe.rs diff --git a/README.adoc b/README.adoc index aa902a8303..0212984eb0 100644 --- a/README.adoc +++ b/README.adoc @@ -29,7 +29,7 @@ link:https://github.com/rajasekarv/vega[vega], etc. It also provides bindings to * Local file system * AWS S3 -* Azure Data Lake Storage Gen 2 (link:docs/ADLSGen2-HOWTO.md[HOW-TO]) +* Azure Blob Storage / Azure Data Lake Storage Gen2 * Google Cloud Storage .Support features diff --git a/TODO b/TODO deleted file mode 100644 index 3f90ef4728..0000000000 --- a/TODO +++ /dev/null @@ -1,3 +0,0 @@ -* handle commitInfo action -* use list api to find transaction logs to apply -* prefetch log content in parallel diff --git a/build/setup_localstack.sh b/build/setup_localstack.sh deleted file mode 100755 index 6a305ff04c..0000000000 --- a/build/setup_localstack.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/bin/bash - -export AWS_DEFAULT_REGION=us-east-1 -export AWS_ACCESS_KEY_ID=test -export AWS_SECRET_ACCESS_KEY=test -export ENDPOINT=http://localstack:4566 - - -function wait_for() { - retries=10 - until eval $2 > /dev/null 2>&1 - do - if [ "$retries" -lt "0" ]; then - echo "$1 is still offline after 10 retries"; - exit 1; - fi - echo "Waiting on $1 to start..." - sleep 5 - retries=$((retries - 1)) - done -} - -wait_for "S3" "aws s3api list-buckets --endpoint-url=$ENDPOINT" - -echo "Uploading S3 test delta tables..." -aws s3api create-bucket --bucket delta-checkpoint --endpoint-url=$ENDPOINT > /dev/null 2>&1 -aws s3api create-bucket --bucket deltars --endpoint-url=$ENDPOINT > /dev/null 2>&1 -aws s3 sync /data/golden s3://deltars/golden/ --delete --endpoint-url=$ENDPOINT > /dev/null -aws s3 sync /data/simple_table s3://deltars/simple/ --delete --endpoint-url=$ENDPOINT > /dev/null -aws s3 sync /data/simple_commit s3://deltars/simple_commit_rw1/ --delete --endpoint-url=$ENDPOINT > /dev/null -aws s3 sync /data/simple_commit s3://deltars/simple_commit_rw2/ --delete --endpoint-url=$ENDPOINT > /dev/null -aws s3 sync /data/concurrent_workers s3://deltars/concurrent_workers/ --delete --endpoint-url=$ENDPOINT > /dev/null - -wait_for "DynamoDB" "aws dynamodb list-tables --endpoint-url=$ENDPOINT" - -echo "Creating DynamoDB test lock table..." -aws dynamodb delete-table --table-name test_table --endpoint-url=$ENDPOINT > /dev/null 2>&1 -aws dynamodb create-table --table-name test_table --endpoint-url=$ENDPOINT \ - --attribute-definitions \ - AttributeName=key,AttributeType=S \ - --key-schema \ - AttributeName=key,KeyType=HASH \ - --provisioned-throughput \ - ReadCapacityUnits=10,WriteCapacityUnits=10 > /dev/null - -echo Localstack is configured!
diff --git a/docker-compose.yml b/docker-compose.yml index b3ba58f818..62a7170010 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,17 +24,3 @@ services: image: mcr.microsoft.com/azure-storage/azurite ports: - 10000:10000 - - # setup-localstack: - # image: localstack/localstack:0.14.4 - # depends_on: - # - localstack - # entrypoint: "/bin/bash" - # command: - # - /setup_localstack.sh - # volumes: - # - "./build/setup_localstack.sh:/setup_localstack.sh" - # - "./rust/tests/data/golden:/data/golden" - # - "./rust/tests/data/simple_table:/data/simple_table" - # - "./rust/tests/data/simple_commit:/data/simple_commit" - # - "./rust/tests/data/concurrent_workers:/data/concurrent_workers" diff --git a/docs/ADLSGen2-HOWTO.md b/docs/ADLSGen2-HOWTO.md deleted file mode 100644 index 30fcbc228b..0000000000 --- a/docs/ADLSGen2-HOWTO.md +++ /dev/null @@ -1,47 +0,0 @@ -HOW-TO: Azure Data Lake Storage Gen2 (ADLS Gen2) -================================================ - -*"Azure Data Lake Storage Gen2 is a set of capabilities dedicated to big data analytics, built on Azure Blob Storage."* - -Source: https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction - -## Usage - -Using ADLS Gen2 requires creating and Azure Storage Account with specific settings. - -This can be done via an [ARM template](https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/overview) or via the [Azure Portal](https://docs.microsoft.com/en-us/azure/azure-portal/azure-portal-overview). - -### ARM Template - -Docs: [Microsoft.Storage/storageAccounts - Bicep & ARM template reference | Microsoft Docs](https://docs.microsoft.com/en-us/azure/templates/microsoft.storage/storageaccounts?tabs=json) - -The critical part for creating an ADLS Gen2 Storage Account via ARM are the following properties: - -* `kind = StorageV2` -* `isHnsEnabled = true` - -### Azure Portal - -Docs: [Create a storage account for Azure Data Lake Storage Gen2 | Microsoft Docs](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account) - -## Resources - -* [Multi-protocol access on Azure Data Lake Storage | Microsoft Docs](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-multi-protocol-access) -* [Blob service REST API - Azure Storage | Microsoft Docs](https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api) -* [Azure Data Lake Storage Gen2 REST API reference - Azure Storage | Microsoft Docs](https://docs.microsoft.com/en-us/rest/api/storageservices/data-lake-storage-gen2) - -## Example - -When you want to connect to a deltatable stored in ADLS Gen2, you need to set two environment variables. -* AZURE_STORAGE_ACCOUNT_NAME which holds the name of the storage account -* AZURE_STORAGE_ACCOUNT_KEY which contains the root key for the storage account. - -The url for the table should follow a specific format adls2://{accountname}/{filesystem}/{path to table}. 
- -example: ```python - from deltalake import DeltaTable - - delta = DeltaTable("adls2://<accountname>/<filesystem>/<path-to-table>") - dataFrames = delta.to_pyarrow_table().to_pandas() -``` diff --git a/rust/src/delta_dataframe.rs b/rust/src/delta_dataframe.rs deleted file mode 100644 index 7f5928f58b..0000000000 --- a/rust/src/delta_dataframe.rs +++ /dev/null @@ -1,70 +0,0 @@ -extern crate rust_dataframe; - -use std::fs::File; -use std::path::Path; -use std::rc::Rc; -use std::sync::Arc; - -use arrow::record_batch::RecordBatchReader; -use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; -use parquet::file::reader::SerializedFileReader; -use rust_dataframe::dataframe::DataFrame; - -use crate as delta; -use crate::DeltaTableError; - -pub trait DeltaDataframe { - fn from_loaded_delta_table(table: delta::DeltaTable) -> Result<Self, DeltaTableError>; - fn from_delta_table(path: &str) -> Result<Self, DeltaTableError>; - fn from_delta_table_with_version( - path: &str, - version: delta::DeltaDataTypeVersion, - ) -> Result<Self, DeltaTableError>; -} - -impl DeltaDataframe for DataFrame { - fn from_loaded_delta_table( - delta_table: delta::DeltaTable, - ) -> Result<Self, DeltaTableError> { - let mut batches = vec![]; - let mut schema = None; - let table_path = Path::new(&delta_table.table_path); - - for fname in delta_table.get_files() { - let fpath = table_path.join(fname); - let file = File::open(&fpath).map_err(|e| DeltaTableError::MissingDataFile { - source: e, - path: String::from(fpath.to_str().unwrap()), - })?; - - let file_reader = SerializedFileReader::new(file)?; - let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader)); - - if schema.is_none() { - schema = Some(Arc::new(arrow_reader.get_schema()?)); - } - - let mut record_batch_reader = arrow_reader.get_record_reader(1024)?; - while let Ok(Some(batch)) = record_batch_reader.next_batch() { - batches.push(batch); - } - } - - Ok(Self::from_table( - rust_dataframe::table::Table::from_record_batches(schema.unwrap().clone(), batches), - )) - } - - fn from_delta_table(path: &str) -> Result<Self, DeltaTableError> { - let delta_table = delta::open_table(path)?; - return Self::from_loaded_delta_table(delta_table); - } - - fn from_delta_table_with_version( - path: &str, - version: delta::DeltaDataTypeVersion, - ) -> Result<Self, DeltaTableError> { - let delta_table = delta::open_table_with_version(path, version)?; - return Self::from_loaded_delta_table(delta_table); - } -} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 271bb33f90..b1cd58db16 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -97,8 +97,6 @@ pub mod vacuum; pub mod checkpoints; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod delta_arrow; -#[cfg(feature = "rust-dataframe-ext")] -mod delta_dataframe; #[cfg(feature = "datafusion-ext")] pub mod delta_datafusion; #[cfg(feature = "datafusion-ext")] diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs index bc64dd3f21..7e0d8cf7dd 100644 --- a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -62,8 +62,6 @@ async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> } async fn put_get_delete_list(storage: &DynObjectStore) -> TestResult { - let store_str = storage.to_string(); - delete_fixtures(storage).await?; let content_list = flatten_list_stream(storage, None).await?; @@ -121,26 +119,16 @@ async fn put_get_delete_list(storage: &DynObjectStore) -> TestResult { let out_of_range = 200..300; let out_of_range_result = storage.get_range(&location, out_of_range).await; - if store_str.starts_with("MicrosoftAzureEmulator") { - // Azurite doesn't support x-ms-range-get-content-crc64
set by Azure SDK - // https://github.com/Azure/Azurite/issues/444 - let err = range_result.unwrap_err().to_string(); - assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err); - - let err = out_of_range_result.unwrap_err().to_string(); - assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err); - } else { - let bytes = range_result?; - assert_eq!(bytes, expected_data.slice(range)); - - // Should be a non-fatal error - out_of_range_result.unwrap_err(); - - let ranges = vec![0..1, 2..3, 0..5]; - let bytes = storage.get_ranges(&location, &ranges).await?; - for (range, bytes) in ranges.iter().zip(bytes) { - assert_eq!(bytes, expected_data.slice(range.clone())) - } + let bytes = range_result?; + assert_eq!(bytes, expected_data.slice(range)); + + // Should be a non-fatal error + out_of_range_result.unwrap_err(); + + let ranges = vec![0..1, 2..3, 0..5]; + let bytes = storage.get_ranges(&location, &ranges).await?; + for (range, bytes) in ranges.iter().zip(bytes) { + assert_eq!(bytes, expected_data.slice(range.clone())) } let head = storage.head(&location).await?; From 3b22ed3b78791d0b19dc456a3712d8ee78e0fcbe Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 31 Aug 2022 15:21:47 +0200 Subject: [PATCH 2/3] chore: address PR comments and add some docs --- rust/src/builder.rs | 94 +++++++++++++++++++++++++++++++++++----- rust/src/lib.rs | 4 +- rust/tests/common/mod.rs | 2 - 3 files changed, 84 insertions(+), 16 deletions(-) diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 6ed0d334e6..8ccd1a8a7b 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -3,21 +3,22 @@ use crate::delta::{DeltaTable, DeltaTableError}; use crate::schema::DeltaDataTypeVersion; use crate::storage::file::FileStorageBackend; -#[cfg(any(feature = "s3", feature = "s3-rustls"))] -use crate::storage::s3::{S3StorageBackend, S3StorageOptions}; use crate::storage::DeltaObjectStore; use chrono::{DateTime, FixedOffset, Utc}; +use object_store::path::Path; +use object_store::{DynObjectStore, Error as ObjectStoreError, Result as ObjectStoreResult}; +use std::collections::HashMap; +use std::sync::Arc; +use url::Url; + +#[cfg(any(feature = "s3", feature = "s3-rustls"))] +use crate::storage::s3::{S3StorageBackend, S3StorageOptions}; #[cfg(any(feature = "s3", feature = "s3-rustls"))] use object_store::aws::AmazonS3Builder; #[cfg(feature = "azure")] use object_store::azure::MicrosoftAzureBuilder; #[cfg(feature = "gcs")] use object_store::gcp::GoogleCloudStorageBuilder; -use object_store::path::Path; -use object_store::{DynObjectStore, Error as ObjectStoreError, Result as ObjectStoreResult}; -use std::collections::HashMap; -use std::sync::Arc; -use url::Url; /// possible version specifications for loading a delta table #[derive(Debug, Clone, PartialEq, Eq)] @@ -280,9 +281,32 @@ impl StorageUrl { /// /// # Well-known formats /// + /// The lists below enumerate some well-known URIs that are understood by the + /// parse function. We parse URIs to refer to a specific storage location, which + /// is accessed using the internal storage backends.
+ /// + /// ## Azure + /// + /// URIs according to the fsspec convention: + /// + /// * az://<container>/<path> + /// * adl://<container>/<path> + /// * abfs(s)://<container>/<path> + /// + /// URIs according to the Hadoop driver convention: + /// + /// * abfs(s)://<file_system>@<account_name>.dfs.core.windows.net/<path> + /// + /// and a custom one + /// + /// * azure://<container>/<path> + /// + /// ## S3 + /// * s3://<bucket>/<path> + /// * s3a://<bucket>/<path> + /// + /// ## GCS + /// * gs://<bucket>/<path> pub fn parse(s: impl AsRef<str>) -> ObjectStoreResult<Self> { let s = s.as_ref(); @@ -329,6 +353,11 @@ impl StorageUrl { self.url.scheme() } + /// Returns the URL host + pub fn host(&self) -> Option<&str> { + self.url.host_str() + } + /// Returns this [`StorageUrl`] as a string pub fn as_str(&self) -> &str { self.as_ref() @@ -338,8 +367,7 @@ impl StorageUrl { pub fn service_type(&self) -> StorageService { match self.url.scheme() { "file" => StorageService::Local, - "az" | "abfs" | "abfss" | "adls2" | "azure" | "wasb" => StorageService::Azure, - // TODO is s3a permissible? + "az" | "abfs" | "abfss" | "adls2" | "azure" | "wasb" | "adl" => StorageService::Azure, "s3" | "s3a" => StorageService::S3, "gs" => StorageService::GCS, _ => StorageService::Unknown, @@ -395,11 +423,53 @@ fn get_storage_backend( } #[cfg(feature = "azure")] StorageService::Azure => { - let url: &Url = storage_url.as_ref(); - // TODO we have to differentiate ... - let container_name = url.host_str().ok_or(ObjectStoreError::NotImplemented)?; + let (container_name, url_account) = match storage_url.scheme() { + "az" | "adl" | "azure" => { + let container = storage_url.host().ok_or(ObjectStoreError::NotImplemented)?; + (container.to_owned(), None) + } + "adls2" => { + log::warn!("Support for the 'adls2' scheme is deprecated and will be removed in a future version. Use `az://<container>/<path>` instead."); + let account = storage_url.host().ok_or(ObjectStoreError::NotImplemented)?; + let container = storage_url + .prefix + .parts() + .next() + .ok_or(ObjectStoreError::NotImplemented)? + .to_owned(); + (container.as_ref().to_string(), Some(account)) + } + "abfs" | "abfss" => { + // abfs(s) might refer to the fsspec convention abfs://<container>/<path> + // or the convention for the hadoop driver abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path> + let url: &Url = storage_url.as_ref(); + if url.username().is_empty() { + ( + url.host_str() + .ok_or(ObjectStoreError::NotImplemented)? + .to_string(), + None, + ) + } else { + let parts: Vec<&str> = url + .host_str() + .ok_or(ObjectStoreError::NotImplemented)? + .splitn(2, '.') + .collect(); + if parts.len() != 2 { + Err(ObjectStoreError::NotImplemented) + } else { + Ok((url.username().to_owned(), Some(parts[0]))) + }? + } + } + _ => todo!(), }; let mut builder = get_azure_builder_from_options(options.unwrap_or_default()) .with_container_name(container_name); + if let Some(account) = url_account { + builder = builder.with_account(account); + } if let Some(allow) = allow_http { builder = builder.with_allow_http(allow); } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index b1cd58db16..0e31b6b61d 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -38,8 +38,8 @@ //! # Optional cargo package features //! //! - `s3`, `gcs`, `azure` - enable the storage backends for AWS S3, Google Cloud Storage (GCS), -//! or Azure Data Lake Storage Gen2 (ADLS2). Use `s3-rustls` to use Rust TLS instead of native -//! TLS implementation. +//! or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-rustls` to use Rust TLS +//! instead of native TLS implementation. //! - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue. //!
- `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation //! for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion). diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index bb95aeeada..56594fa5bf 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -7,7 +7,6 @@ use object_store::{path::Path, ObjectStore}; use serde_json::{Map, Value}; use std::any::Any; use std::collections::HashMap; -use std::process::Command; use std::sync::Arc; use tempdir::TempDir; @@ -138,7 +137,6 @@ impl TestContext { ); let backend = self.new_storage(); - let p = self.config.get("URI").unwrap().to_string(); let mut dt = DeltaTable::new(backend, DeltaTableConfig::default()); let mut commit_info = Map::<String, Value>::new(); From 63c9db1d52da21031b0d46f99506200f484739b5 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 31 Aug 2022 16:01:32 +0200 Subject: [PATCH 3/3] docs: add docs about supported urls --- python/docs/source/usage.rst | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 9d1563fe3c..eb65262ac7 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -29,13 +29,31 @@ To load the current version, use the constructor: >>> dt = DeltaTable("../rust/tests/data/delta-0.2.0") Depending on your storage backend, you could use the ``storage_options`` parameter to provide some configuration. -Currently only AWS S3 is supported. +Configuration is defined for specific backends - see `s3 options`_ and `azure options`_. .. code-block:: python >>> storage_options = {"AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY":"THE_AWS_SECRET_ACCESS_KEY"} >>> dt = DeltaTable("../rust/tests/data/delta-0.2.0", storage_options=storage_options) +The configuration can also be provided via the environment, and the basic service provider is derived from the URL +being used. We try to support many of the well-known formats to identify basic service properties. + +**S3**: + + * s3://<bucket>/<path> + * s3a://<bucket>/<path> + +**Azure**: + + * az://<container>/<path> + * adl://<container>/<path> + * abfs://<container>/<path> + +**GCS**: + + * gs://<bucket>/<path> + Alternatively, if you have a data catalog you can load it by reference to a database and table name. Currently only AWS Glue is supported. @@ -61,6 +79,8 @@ Besides local filesystems, the following backends are supported: * Google Cloud Storage, detected by the prefix ``gs://``. .. _`specific instructions`: https://github.com/delta-io/delta-rs/blob/main/docs/ADLSGen2-HOWTO.md +.. _`s3 options`: https://github.com/delta-io/delta-rs/blob/17999d24a58fb4c98c6280b9e57842c346b4603a/rust/src/builder.rs#L423-L491 +.. _`azure options`: https://github.com/delta-io/delta-rs/blob/17999d24a58fb4c98c6280b9e57842c346b4603a/rust/src/builder.rs#L524-L539 Time Travel
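To make the documented URL formats concrete, a minimal Python sketch of loading a table through the Azure backend follows. The ``AZURE_STORAGE_ACCOUNT_NAME``/``AZURE_STORAGE_ACCOUNT_KEY`` option keys are assumptions carried over from the removed ADLS Gen2 HOW-TO; the container name and table path are placeholders, and the authoritative list of supported keys lives in ``rust/src/builder.rs``.

```python
from deltalake import DeltaTable

# Assumption: the Azure backend accepts the same credential names that the
# removed HOW-TO used as environment variables; check rust/src/builder.rs
# for the authoritative set of supported option keys.
storage_options = {
    "AZURE_STORAGE_ACCOUNT_NAME": "my_account",
    "AZURE_STORAGE_ACCOUNT_KEY": "my_key",
}

# az://<container>/<path>: the scheme selects the Azure backend, the host
# names the container, and the remainder is the table path inside it.
dt = DeltaTable("az://my-container/path/to/table", storage_options=storage_options)
df = dt.to_pyarrow_table().to_pandas()
```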
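The two ``abfs(s)`` conventions that ``get_storage_backend`` distinguishes can be told apart purely by URL shape, since only the Hadoop-style form carries a user-info part. A standalone illustration with Python's standard library, using hypothetical container and account names rather than delta-rs API:

```python
from urllib.parse import urlparse

# fsspec convention: abfs://<container>/<path> - the container is the URL
# host and there is no user-info part.
fsspec_url = urlparse("abfs://my-container/delta/table")
assert fsspec_url.username is None
assert fsspec_url.hostname == "my-container"

# Hadoop driver convention:
# abfss://<file_system>@<account_name>.dfs.core.windows.net/<path>
# The file system (container) sits in the user-info part and the storage
# account is the first host label - the same values the Rust code recovers
# via url.username() and host_str().splitn(2, '.').
hadoop_url = urlparse("abfss://my-fs@myaccount.dfs.core.windows.net/delta/table")
assert hadoop_url.username == "my-fs"
assert hadoop_url.hostname.split(".", 1)[0] == "myaccount"
```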