Skip to content

Commit

Permalink
feat: add restore command in python binding (#1529)
Browse files Browse the repository at this point in the history
# Description
This is a implementation of the Restore Command for python binding.

# Related Issue(s)
#837

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
loleek and wjones127 authored Aug 1, 2023
1 parent c768ad4 commit 2210c3b
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 4 deletions.
30 changes: 30 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import operator
import warnings
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
from pathlib import Path
from typing import (
Expand Down Expand Up @@ -454,6 +455,35 @@ def pyarrow_schema(self) -> pyarrow.Schema:
)
return self.schema().to_pyarrow()

def restore(
self,
target: Union[int, datetime, str],
*,
ignore_missing_files: bool = False,
protocol_downgrade_allowed: bool = False,
) -> Dict[str, Any]:
"""
Run the Restore command on the Delta Table: restore table to a given version or datetime.
:param target: the expected version will restore, which represented by int, date str or datetime.
:param ignore_missing_files: whether the operation carry on when some data files missing.
:param protocol_downgrade_allowed: whether the operation when protocol version upgraded.
:return: the metrics from restore.
"""
if isinstance(target, datetime):
metrics = self._table.restore(
target.isoformat(),
ignore_missing_files=ignore_missing_files,
protocol_downgrade_allowed=protocol_downgrade_allowed,
)
else:
metrics = self._table.restore(
target,
ignore_missing_files=ignore_missing_files,
protocol_downgrade_allowed=protocol_downgrade_allowed,
)
return json.loads(metrics)

def to_pyarrow_dataset(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
Expand Down
1 change: 1 addition & 0 deletions python/docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def get_release_version() -> str:
("py:class", "pandas.DataFrame"),
("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"),
("py:class", "pathlib.Path"),
("py:class", "datetime.datetime"),
]

# Add any paths that contain templates here, relative to this directory.
Expand Down
21 changes: 21 additions & 0 deletions python/docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,24 @@ the method will raise an error.
This method could also be used to insert a new partition if one doesn't already
exist, making this operation idempotent.


Restoring tables
~~~~~~~~~~~~~~~~

.. py:currentmodule:: deltalake.table
Restoring a table will restore delta table to a specified version or datetime. This
operation compares the current state of the delta table with the state to be restored.
And add those missing files into the AddFile actions and add redundant files into
RemoveFile actions. Then commit into a new version.


Use :meth:`DeltaTable.restore` to perform the restore operation. Note that if any other
concurrent operation was performed on the table, restore will fail.

.. code-block:: python
>>> dt = DeltaTable("../rust/tests/data/simple_table")
>>> dt.restore(1)
{'numRemovedFile': 5, 'numRestoredFile': 22}
32 changes: 32 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
Expand Down Expand Up @@ -316,6 +317,37 @@ impl RawDeltaTable {
Ok(serde_json::to_string(&metrics).unwrap())
}

// Run the restore command on the Delta Table: restore table to a given version or datetime
#[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false))]
pub fn restore(
&mut self,
target: Option<&PyAny>,
ignore_missing_files: bool,
protocol_downgrade_allowed: bool,
) -> PyResult<String> {
let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone());
if let Some(val) = target {
if let Ok(version) = val.extract::<i64>() {
cmd = cmd.with_version_to_restore(version)
}
if let Ok(ds) = val.extract::<&str>() {
let datetime = DateTime::<Utc>::from(
DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(|err| {
PyValueError::new_err(format!("Failed to parse datetime string: {err}"))
})?,
);
cmd = cmd.with_datetime_to_restore(datetime)
}
}
cmd = cmd.with_ignore_missing_files(ignore_missing_files);
cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed);
let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
let history = rt()?
Expand Down
79 changes: 79 additions & 0 deletions python/tests/test_restore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import datetime
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_version(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
old_version = dt.version()
dt.restore(1)
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert dt.version() == old_version + 1


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_datetime_str(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
old_version = dt.version()
dt.restore("2020-05-01T00:47:31-07:00")
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert dt.version() == old_version + 1


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_datetime(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
old_version = dt.version()
date = datetime.datetime.strptime(
"2023-04-26T21:23:32+08:00", "%Y-%m-%dT%H:%M:%S%z"
)
dt.restore(date)
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert dt.version() == old_version + 1
8 changes: 5 additions & 3 deletions rust/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::ObjectStore;
use serde::Serialize;

use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError};
Expand Down Expand Up @@ -57,7 +58,8 @@ impl From<RestoreError> for DeltaTableError {
}

/// Metrics from Restore
#[derive(Default, Debug)]
#[derive(Default, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RestoreMetrics {
/// Number of files removed
pub num_removed_file: usize,
Expand Down Expand Up @@ -109,13 +111,13 @@ impl RestoreBuilder {

/// Set whether to ignore missing files which delete manually or by vacuum.
/// If true, continue to run when encountering missing files.
pub fn ignore_missing_files(mut self, ignore_missing_files: bool) -> Self {
pub fn with_ignore_missing_files(mut self, ignore_missing_files: bool) -> Self {
self.ignore_missing_files = ignore_missing_files;
self
}

/// Set whether allow to downgrade protocol
pub fn protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self {
pub fn with_protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self {
self.protocol_downgrade_allowed = protocol_downgrade_allowed;
self
}
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {

let result = DeltaOps(context.table)
.restore()
.ignore_missing_files(true)
.with_ignore_missing_files(true)
.with_version_to_restore(1)
.await;
assert!(result.is_ok());
Expand Down

0 comments on commit 2210c3b

Please sign in to comment.