
Merge branch 'main' into delete-op
Blajda committed May 8, 2023
2 parents 4276f3b + 005b874 commit 1f47ca7
Showing 17 changed files with 402 additions and 182 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/python_release.yml
@@ -46,7 +46,7 @@ jobs:
with:
target: ${{ matrix.target }}
command: publish
- args: -m python/Cargo.toml --no-sdist $FEATURES_FLAG
+ args: -m python/Cargo.toml --no-sdist ${{ env.FEATURES_FLAG }}

release-pypi-mac-universal2:
needs: validate-release-tag
@@ -62,7 +62,7 @@ jobs:
with:
target: ${{ matrix.target }}
command: publish
- args: -m python/Cargo.toml --no-sdist --universal2 $FEATURES_FLAG
+ args: --skip-existing -m python/Cargo.toml --no-sdist --universal2 ${{ env.FEATURES_FLAG }}

release-pypi-windows:
needs: validate-release-tag
@@ -78,7 +78,7 @@ jobs:
with:
target: x86_64-pc-windows-msvc
command: publish
- args: -m python/Cargo.toml --no-sdist $FEATURES_FLAG
+ args: --skip-existing -m python/Cargo.toml --no-sdist ${{ env.FEATURES_FLAG }}

release-pypi-manylinux:
needs: validate-release-tag
@@ -94,7 +94,7 @@ jobs:
with:
target: x86_64-unknown-linux-gnu
command: publish
- args: -m python/Cargo.toml $FEATURES_FLAG
+ args: --skip-existing -m python/Cargo.toml ${{ env.FEATURES_FLAG }}

- name: Publish manylinux to pypi aarch64 (without sdist)
uses: messense/maturin-action@v1
@@ -103,7 +103,7 @@ jobs:
with:
target: aarch64-unknown-linux-gnu
command: publish
- args: -m python/Cargo.toml --no-sdist $FEATURES_FLAG
+ args: --skip-existing -m python/Cargo.toml --no-sdist ${{ env.FEATURES_FLAG }}

release-docs:
needs:
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"]
[dependencies.deltalake]
path = "../rust"
version = "0"
features = ["azure", "gcs", "python", "datafusion"]
features = ["azure", "gcs", "python", "datafusion", "unity-experimental"]

[features]
default = ["rustls"]
3 changes: 2 additions & 1 deletion python/deltalake/data_catalog.py
@@ -4,4 +4,5 @@
class DataCatalog(Enum):
"""List of the Data Catalogs"""

AWS = "glue" # Only AWS Glue Data Catalog is available
AWS = "glue" # AWS Glue Data Catalog
UNITY = "unity" # Databricks Unity Catalog
1 change: 1 addition & 0 deletions python/deltalake/table.py
@@ -334,6 +334,7 @@ def optimize(
:return: the metrics from optimize
"""
metrics = self._table.optimize(partition_filters, target_size)
+ self.update_incremental()
return json.loads(metrics)

def pyarrow_schema(self) -> pyarrow.Schema:
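
The added `update_incremental()` call means the in-memory `DeltaTable` is refreshed right after an optimize run, so the new commit is visible without a manual reload. A minimal sketch of the resulting behaviour (the local table path is illustrative and assumes the table already holds several small files):

    from deltalake import DeltaTable

    dt = DeltaTable("tmp/my_table")  # hypothetical existing table
    before = dt.version()

    metrics = dt.optimize()          # compact small files; returns parsed metrics
    print(metrics)

    # The table object was refreshed internally, so the OPTIMIZE commit is already visible.
    assert dt.version() == before + 1
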
10 changes: 9 additions & 1 deletion python/deltalake/writer.py
@@ -73,7 +73,7 @@ def write_deltalake(
],
*,
schema: Optional[pa.Schema] = None,
- partition_by: Optional[List[str]] = None,
+ partition_by: Optional[Union[List[str], str]] = None,
filesystem: Optional[pa_fs.FileSystem] = None,
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
file_options: Optional[ds.ParquetFileWriteOptions] = None,
@@ -146,6 +146,10 @@ def write_deltalake(

table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)

+ # We need to write against the latest table version
+ if table:
+ table.update_incremental()

if schema is None:
if isinstance(data, RecordBatchReader):
schema = data.schema
@@ -166,6 +170,9 @@

filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

+ if isinstance(partition_by, str):
+ partition_by = [partition_by]

if table: # already exists
if schema != table.schema().to_pyarrow() and not (
mode == "overwrite" and overwrite_schema
@@ -331,6 +338,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
schema,
partition_filters,
)
+ table.update_incremental()


def __enforce_append_only(
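
Taken together, the writer changes let `partition_by` be a single column name as well as a list, and they sync a `DeltaTable` passed to `write_deltalake` to the latest version both before and after the write, which is why the tests below drop their manual `update_incremental()` calls. A rough sketch of the resulting usage (paths and column names are illustrative):

    import pyarrow as pa
    from deltalake import DeltaTable, write_deltalake

    data = pa.table({"country": ["DE", "DE", "US"], "value": [1, 2, 3]})

    # A bare string now works; previously this had to be ["country"].
    write_deltalake("tmp/partitioned_table", data, partition_by="country")

    # Writing through an existing DeltaTable no longer needs update_incremental()
    # around the call; the writer refreshes the table object itself.
    dt = DeltaTable("tmp/partitioned_table")
    write_deltalake(dt, data, mode="append")
    assert dt.version() == 1
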
15 changes: 14 additions & 1 deletion python/docs/source/usage.rst
@@ -55,7 +55,7 @@ being used. We try to support many of the well-known formats to identify basic s
* gs://<bucket>/<path>

Alternatively, if you have a data catalog you can load it by reference to a
- database and table name. Currently only AWS Glue is supported.
+ database and table name. Currently supported are AWS Glue and Databricks Unity Catalog.

For AWS Glue catalog, use AWS environment variables to authenticate.

@@ -70,6 +70,19 @@ For AWS Glue catalog, use AWS environment variables to authenticate.
>>> dt.to_pyarrow_table().to_pydict()
{'id': [5, 7, 9, 5, 6, 7, 8, 9]}
+ For Databricks Unity Catalog authentication, use environment variables:
+ * DATABRICKS_WORKSPACE_URL (e.g. https://adb-62800498333851.30.azuredatabricks.net)
+ * DATABRICKS_ACCESS_TOKEN
+
+ .. code-block:: python
+ >>> from deltalake import DataCatalog, DeltaTable
+ >>> catalog_name = 'main'
+ >>> schema_name = 'db_schema'
+ >>> table_name = 'db_table'
+ >>> data_catalog = DataCatalog.UNITY
+ >>> dt = DeltaTable.from_data_catalog(data_catalog=data_catalog, data_catalog_id=catalog_name, database_name=schema_name, table_name=table_name)
.. _`s3 options`: https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants
.. _`azure options`: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants
.. _`gcs options`: https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants
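
For reference, the documented Unity flow amounts to exporting the two Databricks variables and then resolving the table through the catalog. An end-to-end sketch (workspace URL, token, and the three-level name are placeholders):

    import os
    from deltalake import DataCatalog, DeltaTable

    # Placeholder credentials; in practice these come from your environment or a secret store.
    os.environ["DATABRICKS_WORKSPACE_URL"] = "https://adb-1234567890123456.7.azuredatabricks.net"
    os.environ["DATABRICKS_ACCESS_TOKEN"] = "dapi-placeholder-token"

    dt = DeltaTable.from_data_catalog(
        data_catalog=DataCatalog.UNITY,
        data_catalog_id="main",     # Unity catalog name
        database_name="db_schema",  # schema within the catalog
        table_name="db_table",
    )
    print(dt.to_pyarrow_table().to_pydict())
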
2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -46,7 +46,7 @@ pyspark = [

[project.urls]
documentation = "https://delta-io.github.io/delta-rs/python/"
repository = "https://github.com/delta-io/delta-rs"
repository = "https://github.com/delta-io/delta-rs/tree/main/python/"

[tool.mypy]
files = "deltalake/*.py"
2 changes: 2 additions & 0 deletions python/tests/test_optimize.py
@@ -22,6 +22,8 @@ def test_optimize_run_table(
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
+ old_version = dt.version()
dt.optimize()
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
+ assert dt.version() == old_version + 1
7 changes: 2 additions & 5 deletions python/tests/test_writer.py
@@ -118,8 +118,6 @@ def test_update_schema(existing_table: DeltaTable):

write_deltalake(existing_table, new_data, mode="overwrite", overwrite_schema=True)

- existing_table.update_incremental()

read_data = existing_table.to_pyarrow_table()
assert new_data == read_data
assert existing_table.schema().to_pyarrow() == new_data.schema
@@ -174,7 +172,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
def test_roundtrip_partitioned(
tmp_path: pathlib.Path, sample_data: pa.Table, column: str
):
- write_deltalake(tmp_path, sample_data, partition_by=[column])
+ write_deltalake(tmp_path, sample_data, partition_by=column)

delta_table = DeltaTable(tmp_path)
assert delta_table.schema().to_pyarrow() == sample_data.schema
@@ -259,14 +257,13 @@ def test_append_only_should_append_only_with_the_overwrite_mode(
write_deltalake(data_store_type, sample_data, mode=mode)

expected = pa.concat_tables([sample_data, sample_data])
- table.update_incremental()

assert table.to_pyarrow_table() == expected
assert table.version() == 1


def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table):
write_deltalake(existing_table, sample_data, mode="overwrite")
- existing_table.update_incremental()
assert existing_table.to_pyarrow_table() == sample_data


21 changes: 19 additions & 2 deletions rust/Cargo.toml
@@ -13,12 +13,18 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "37.0.0", optional = true }
arrow = { version = "37", optional = true }
arrow-array = { version = "37", optional = true }
arrow-cast = { version = "37", optional = true }
arrow-schema = { version = "37", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
cfg-if = "1"
- datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = ["hdfs3", "try_spawn_blocking"], optional = true }
+ datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
+ "hdfs3",
+ "try_spawn_blocking",
+ ], optional = true }
errno = "0.3"
futures = "0.3"
itertools = "0.10"
@@ -53,6 +59,11 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
# Glue
rusoto_glue = { version = "0.47", default-features = false, optional = true }

+ # Unity
+ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"], optional = true }
+ reqwest-middleware = { version = "0.2.1", optional = true }
+ reqwest-retry = { version = "0.2.2", optional = true }
+
# Datafusion
datafusion = { version = "23", optional = true }
datafusion-expr = { version = "23", optional = true }
@@ -91,6 +102,7 @@ glibc_version = { path = "../glibc_version", version = "0.1" }

[features]
azure = ["object_store/azure"]
arrow = ["dep:arrow", "arrow-array", "arrow-cast", "arrow-schema"]
default = ["arrow", "parquet"]
datafusion = [
"dep:datafusion",
@@ -130,6 +142,11 @@ s3 = [
"object_store/aws",
"object_store/aws_profile",
]
+ unity-experimental = [
+ "reqwest",
+ "reqwest-middleware",
+ "reqwest-retry",
+ ]

[[bench]]
name = "read_checkpoint"
27 changes: 27 additions & 0 deletions rust/src/data_catalog/mod.rs
@@ -5,6 +5,9 @@ use std::fmt::Debug;
#[cfg(feature = "glue")]
pub mod glue;

#[cfg(feature = "unity-experimental")]
pub mod unity;

/// Error enum that represents a CatalogError.
#[derive(thiserror::Error, Debug)]
pub enum DataCatalogError {
@@ -43,6 +46,28 @@ pub enum DataCatalogError {
source: rusoto_credential::CredentialsError,
},

+ /// Error caused by missing environment variable for Unity Catalog.
+ #[cfg(feature = "unity-experimental")]
+ #[error("Missing Unity Catalog environment variable: {var_name}")]
+ MissingEnvVar {
+ /// Variable name
+ var_name: String,
+ },
+
+ /// Error caused by invalid access token value
+ #[cfg(feature = "unity-experimental")]
+ #[error("Invalid Databricks personal access token")]
+ InvalidAccessToken,
+
+ /// Databricks API client error
+ #[cfg(feature = "unity-experimental")]
+ #[error("API client error: {source}")]
+ APIClientError {
+ /// The underlying unity::GetTableError
+ #[from]
+ source: unity::GetTableError,
+ },
+
/// Error representing an invalid Data Catalog.
#[error("This data catalog doesn't exist: {data_catalog}")]
InvalidDataCatalog {
@@ -74,6 +99,8 @@ pub fn get_data_catalog(data_catalog: &str) -> Result<Box<dyn DataCatalog>, Data
"hdfs" => unimplemented!("HDFS Data Catalog is not implemented"),
#[cfg(feature = "glue")]
"glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)),
#[cfg(feature = "unity-experimental")]
"unity" => Ok(Box::new(unity::UnityCatalog::new()?)),
_ => Err(DataCatalogError::InvalidDataCatalog {
data_catalog: data_catalog.to_string(),
}),
