From 293922a4559038d767dae546f37c920d767280cb Mon Sep 17 00:00:00 2001 From: Peter Ke Date: Wed, 24 Apr 2024 19:00:33 -0700 Subject: [PATCH 1/3] abort commit --- crates/aws/src/errors.rs | 31 +++++++- crates/aws/src/lib.rs | 45 ++++++++++- crates/aws/src/logstore.rs | 30 ++++++++ crates/aws/tests/integration_s3_dynamodb.rs | 76 ++++++++++++++++++- crates/core/src/logstore/default_logstore.rs | 8 ++ crates/core/src/logstore/mod.rs | 19 ++++- crates/core/src/operations/restore.rs | 2 +- crates/core/src/operations/transaction/mod.rs | 11 +-- crates/core/src/storage/retry_ext.rs | 2 +- 9 files changed, 207 insertions(+), 17 deletions(-) diff --git a/crates/aws/src/errors.rs b/crates/aws/src/errors.rs index a79542bfff..55f2a2d013 100644 --- a/crates/aws/src/errors.rs +++ b/crates/aws/src/errors.rs @@ -6,8 +6,8 @@ use aws_credential_types::provider::error::CredentialsError; use aws_sdk_dynamodb::{ error::SdkError, operation::{ - create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError, - query::QueryError, update_item::UpdateItemError, + create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError, + put_item::PutItemError, query::QueryError, update_item::UpdateItemError, }, }; use aws_smithy_runtime_api::client::result::ServiceError; @@ -89,6 +89,9 @@ pub enum LockClientError { to opt out of support for concurrent writers." )] LockClientRequired, + + #[error("Log entry for table '{table_path}' and version '{version}' is already complete")] + VersionAlreadyCompleted { table_path: String, version: i64 }, } impl From for LockClientError { @@ -164,7 +167,31 @@ impl From for LockClientError { } } +impl From for LockClientError { + fn from(err: DeleteItemError) -> Self { + match err { + DeleteItemError::ConditionalCheckFailedException(_) => { + unreachable!("error must be handled explicitly") + } + DeleteItemError::InternalServerError(_) => err.into(), + DeleteItemError::ProvisionedThroughputExceededException(_) => { + LockClientError::ProvisionedThroughputExceeded + } + DeleteItemError::RequestLimitExceeded(_) => { + LockClientError::ProvisionedThroughputExceeded + } + DeleteItemError::ResourceNotFoundException(_) => LockClientError::LockTableNotFound, + DeleteItemError::ItemCollectionSizeLimitExceededException(_) => err.into(), + DeleteItemError::TransactionConflictException(_) => err.into(), + _ => LockClientError::GenericDynamoDb { + source: Box::new(err), + }, + } + } +} + impl_from_service_error!(GetItemError); impl_from_service_error!(PutItemError); impl_from_service_error!(QueryError); impl_from_service_error!(UpdateItemError); +impl_from_service_error!(DeleteItemError); diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index a60dbb89ac..d179c37e68 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -9,8 +9,8 @@ pub mod storage; use aws_config::SdkConfig; use aws_sdk_dynamodb::{ operation::{ - create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError, - query::QueryError, update_item::UpdateItemError, + create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError, + put_item::PutItemError, query::QueryError, update_item::UpdateItemError, }, types::{ AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType, @@ -416,6 +416,43 @@ impl DynamoDbLockClient { .await } + /// Delete existing log entry if it is not already complete + pub async fn delete_commit_entry( + &self, + version: i64, + table_path: &str, + ) -> Result<(), LockClientError> { + self.retry(|| async { + match self + .dynamodb_client + .delete_item() + .table_name(self.get_lock_table_name()) + .set_key(Some(self.get_primary_key(version, table_path))) + .set_expression_attribute_values(Some(maplit::hashmap! { + ":f".into() => string_attr("false"), + })) + .condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str()) + .send() + .await + { + Ok(_) => Ok(()), + Err(err) => match err.as_service_error() { + Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err( + backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), + ), + Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err( + backoff::Error::permanent(LockClientError::VersionAlreadyCompleted { + table_path: table_path.to_owned(), + version, + }), + ), + _ => Err(backoff::Error::permanent(err.into())), + }, + } + }) + .await + } + async fn retry(&self, operation: Fn) -> Result where Fn: FnMut() -> Fut, @@ -553,6 +590,10 @@ pub mod constants { pub static ref CONDITION_EXPR_CREATE: String = format!( "attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})" ); + + pub static ref CONDITION_DELETE_INCOMPLETE: String = format!( + "(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))" + ); } pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f"; diff --git a/crates/aws/src/logstore.rs b/crates/aws/src/logstore.rs index a9a98dbe18..9eba66cb93 100644 --- a/crates/aws/src/logstore.rs +++ b/crates/aws/src/logstore.rs @@ -239,6 +239,36 @@ impl LogStore for S3DynamoDbLogStore { Ok(()) } + /// Tries to abort an entry by first deleting the commit log entry, then deleting the temp commit file + async fn abort_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + self.lock_client + .delete_commit_entry(version, &self.table_path) + .await + .map_err(|err| match err { + LockClientError::ProvisionedThroughputExceeded => todo!( + "deltalake-aws does not yet handle DynamoDB provisioned throughput errors" + ), + LockClientError::VersionAlreadyCompleted { version, .. } => { + error!("Trying to abort a completed commit"); + TransactionError::LogStoreError { + msg: format!("trying to abort a completed log entry: {}", version), + source: Box::new(err), + } + } + err => TransactionError::LogStoreError { + msg: "dynamodb client failed to delete log entry".to_owned(), + source: Box::new(err), + }, + })?; + + abort_commit_entry(&self.storage, version, tmp_commit).await?; + Ok(()) + } + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { debug!("Retrieving latest version of {self:?} at v{current_version}"); let entry = self diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index cf69818966..45aa379bd0 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -18,7 +18,7 @@ use deltalake_core::protocol::{DeltaOperation, SaveMode}; use deltalake_core::storage::commit_uri_from_version; use deltalake_core::storage::StorageOptions; use deltalake_core::table::builder::ensure_table_uri; -use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder}; +use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder, ObjectStoreError}; use deltalake_test::utils::*; use lazy_static::lazy_static; use object_store::path::Path; @@ -182,6 +182,80 @@ async fn test_repair_on_load() -> TestResult<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn test_abort_commit_entry() -> TestResult<()> { + let context = IntegrationContext::new(Box::new(S3Integration::default()))?; + let client = make_client()?; + let table = prepare_table(&context, "abort_entry").await?; + let options: StorageOptions = OPTIONS.clone().into(); + let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new( + ensure_table_uri(table.table_uri())?, + options.clone(), + &S3_OPTIONS, + std::sync::Arc::new(table.object_store()), + )?; + + let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?; + + log_store + .abort_commit_entry(entry.version, &entry.temp_path) + .await?; + + // The entry should have been aborted - the latest entry should be one version lower + if let Some(new_entry) = client.get_latest_entry(&table.table_uri()).await? { + assert_eq!(entry.version - 1, new_entry.version); + } + // Temp commit file should have been deleted + assert!(matches!( + log_store.object_store().get(&entry.temp_path).await, + Err(ObjectStoreError::NotFound { .. }) + )); + + // Test abort commit is idempotent - still works if already aborted + log_store + .abort_commit_entry(entry.version, &entry.temp_path) + .await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> { + // Test abort commit does not delete the temp commit if the DynamoDB entry is not deleted + let context = IntegrationContext::new(Box::new(S3Integration::default()))?; + let client = make_client()?; + let table = prepare_table(&context, "abort_entry_fail").await?; + let options: StorageOptions = OPTIONS.clone().into(); + let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new( + ensure_table_uri(table.table_uri())?, + options.clone(), + &S3_OPTIONS, + std::sync::Arc::new(table.object_store()), + )?; + + let entry = create_incomplete_commit_entry(&table, 1, "finished_commit").await?; + + // Mark entry as complete + client + .update_commit_entry(entry.version, &table.table_uri()) + .await?; + + // Abort will fail since we marked the entry as complete + assert!(matches!( + log_store + .abort_commit_entry(entry.version, &entry.temp_path) + .await, + Err(_), + )); + + // Check temp commit file still exists + assert!(log_store.object_store().get(&entry.temp_path).await.is_ok()); + + Ok(()) +} + const WORKERS: i64 = 3; const COMMITS: i64 = 15; diff --git a/crates/core/src/logstore/default_logstore.rs b/crates/core/src/logstore/default_logstore.rs index ed463e9947..8fd4f52beb 100644 --- a/crates/core/src/logstore/default_logstore.rs +++ b/crates/core/src/logstore/default_logstore.rs @@ -50,6 +50,14 @@ impl LogStore for DefaultLogStore { super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await } + async fn abort_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError> { + super::abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await + } + async fn get_latest_version(&self, current_version: i64) -> DeltaResult { super::get_latest_version(self, current_version).await } diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index e6b4c6e2d4..020bd2fb2f 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -18,7 +18,7 @@ use crate::{ kernel::Action, operations::transaction::TransactionError, protocol::{get_last_checkpoint, ProtocolError}, - storage::{commit_uri_from_version, ObjectStoreRef, StorageOptions}, + storage::{commit_uri_from_version, ObjectStoreRef, StorageOptions, retry_ext::ObjectStoreRetryExt}, DeltaTableError, }; use bytes::Bytes; @@ -183,6 +183,13 @@ pub trait LogStore: Sync + Send { tmp_commit: &Path, ) -> Result<(), TransactionError>; + /// Abort the commit entry for the given version. + async fn abort_commit_entry( + &self, + version: i64, + tmp_commit: &Path, + ) -> Result<(), TransactionError>; + /// Find latest version currently stored in the delta log. async fn get_latest_version(&self, start_version: i64) -> DeltaResult; @@ -449,6 +456,16 @@ pub async fn write_commit_entry( Ok(()) } +/// Default implementation for aborting a commit entry +pub async fn abort_commit_entry( + storage: &dyn ObjectStore, + _version: i64, + tmp_commit: &Path, +) -> Result<(), TransactionError> { + storage.delete_with_retries(tmp_commit, 15).await?; + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index 4f4cd0568d..c86f6c2db5 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -279,7 +279,7 @@ async fn execute( return Err(err.into()); } Err(err) => { - log_store.object_store().delete(commit).await?; + log_store.abort_commit_entry(commit_version, commit).await?; return Err(err.into()); } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6606a8c339..f5d3ba55c6 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -90,7 +90,6 @@ use crate::kernel::{ }; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::storage::ObjectStoreRetryExt; use crate::table::config::TableConfig; use crate::table::state::DeltaTableState; use crate::{crate_version, DeltaResult}; @@ -553,19 +552,13 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { attempt_number += 1; } Err(err) => { - this.log_store - .object_store() - .delete_with_retries(tmp_commit, 15) - .await?; + this.log_store.abort_commit_entry(version, tmp_commit).await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - this.log_store - .object_store() - .delete_with_retries(tmp_commit, 15) - .await?; + this.log_store.abort_commit_entry(version, tmp_commit).await?; return Err(err.into()); } } diff --git a/crates/core/src/storage/retry_ext.rs b/crates/core/src/storage/retry_ext.rs index b9f7ebf691..81a52f3ba3 100644 --- a/crates/core/src/storage/retry_ext.rs +++ b/crates/core/src/storage/retry_ext.rs @@ -79,4 +79,4 @@ pub trait ObjectStoreRetryExt: ObjectStore { } } -impl ObjectStoreRetryExt for T {} +impl ObjectStoreRetryExt for T {} From f2616c239c3e440664cf42039b376a28fa4c92bd Mon Sep 17 00:00:00 2001 From: Peter Ke Date: Thu, 25 Apr 2024 11:06:03 -0700 Subject: [PATCH 2/3] format --- crates/core/src/logstore/mod.rs | 4 +++- crates/core/src/operations/transaction/mod.rs | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 020bd2fb2f..b8646cdb65 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -18,7 +18,9 @@ use crate::{ kernel::Action, operations::transaction::TransactionError, protocol::{get_last_checkpoint, ProtocolError}, - storage::{commit_uri_from_version, ObjectStoreRef, StorageOptions, retry_ext::ObjectStoreRetryExt}, + storage::{ + commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions, + }, DeltaTableError, }; use bytes::Bytes; diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index f5d3ba55c6..4846844dfc 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -552,13 +552,17 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { attempt_number += 1; } Err(err) => { - this.log_store.abort_commit_entry(version, tmp_commit).await?; + this.log_store + .abort_commit_entry(version, tmp_commit) + .await?; return Err(TransactionError::CommitConflict(err).into()); } }; } Err(err) => { - this.log_store.abort_commit_entry(version, tmp_commit).await?; + this.log_store + .abort_commit_entry(version, tmp_commit) + .await?; return Err(err.into()); } } From 127fd556a6cd021f22fa91d485ba9ea6d026fdde Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 4 May 2024 15:49:48 +0000 Subject: [PATCH 3/3] chore: update the deltalake-aws version and clippy for release of #2452 --- crates/aws/Cargo.toml | 2 +- crates/aws/src/storage.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index aa8aaeb145..9ed7b8b6f4 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.1.1" +version = "0.1.2" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index be68ce6a7c..4deae70b06 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -85,7 +85,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { .0 .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) { - Ok((Arc::from(store), prefix)) + Ok((store, prefix)) } else { let s3_options = S3StorageOptions::from_map(&options.0)?;