Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rust): implement abort commit for S3DynamoDBLogStore #2452

Merged
merged 3 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.1.1"
version = "0.1.2"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
31 changes: 29 additions & 2 deletions crates/aws/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use aws_credential_types::provider::error::CredentialsError;
use aws_sdk_dynamodb::{
error::SdkError,
operation::{
create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError,
query::QueryError, update_item::UpdateItemError,
create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
},
};
use aws_smithy_runtime_api::client::result::ServiceError;
Expand Down Expand Up @@ -89,6 +89,9 @@ pub enum LockClientError {
to opt out of support for concurrent writers."
)]
LockClientRequired,

#[error("Log entry for table '{table_path}' and version '{version}' is already complete")]
VersionAlreadyCompleted { table_path: String, version: i64 },
}

impl From<GetItemError> for LockClientError {
Expand Down Expand Up @@ -164,7 +167,31 @@ impl From<UpdateItemError> for LockClientError {
}
}

/// Converts a DynamoDB `DeleteItem` service error into a [`LockClientError`].
///
/// Throttling-style failures collapse into
/// [`LockClientError::ProvisionedThroughputExceeded`], a missing lock table becomes
/// [`LockClientError::LockTableNotFound`], and everything else is wrapped in
/// [`LockClientError::GenericDynamoDb`] with the original error as its source.
///
/// # Panics
///
/// Panics on `ConditionalCheckFailedException`: that variant must be matched by the
/// caller before falling back to this conversion.
impl From<DeleteItemError> for LockClientError {
    fn from(err: DeleteItemError) -> Self {
        match err {
            DeleteItemError::ConditionalCheckFailedException(_) => {
                unreachable!("error must be handled explicitly")
            }
            DeleteItemError::ProvisionedThroughputExceededException(_)
            | DeleteItemError::RequestLimitExceeded(_) => {
                LockClientError::ProvisionedThroughputExceeded
            }
            DeleteItemError::ResourceNotFoundException(_) => LockClientError::LockTableNotFound,
            // `InternalServerError`, `ItemCollectionSizeLimitExceededException` and
            // `TransactionConflictException` land here as well. They must NOT be mapped
            // with `err.into()`: inside this impl that call resolves to this very
            // function and recurses until the stack overflows.
            _ => LockClientError::GenericDynamoDb {
                source: Box::new(err),
            },
        }
    }
}

// One `impl_from_service_error!` expansion per DynamoDB operation error used by the
// lock client. The macro is defined outside this excerpt; judging by its name it
// presumably generates the conversion from the SDK's service-error wrapper into
// `LockClientError` -- confirm against the macro definition.
impl_from_service_error!(GetItemError);
impl_from_service_error!(PutItemError);
impl_from_service_error!(QueryError);
impl_from_service_error!(UpdateItemError);
impl_from_service_error!(DeleteItemError);
45 changes: 43 additions & 2 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub mod storage;
use aws_config::SdkConfig;
use aws_sdk_dynamodb::{
operation::{
create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError,
query::QueryError, update_item::UpdateItemError,
create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
},
types::{
AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
Expand Down Expand Up @@ -416,6 +416,43 @@ impl DynamoDbLockClient {
.await
}

/// Delete existing log entry if it is not already complete
pub async fn delete_commit_entry(
&self,
version: i64,
table_path: &str,
) -> Result<(), LockClientError> {
self.retry(|| async {
match self
.dynamodb_client
.delete_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.set_expression_attribute_values(Some(maplit::hashmap! {
":f".into() => string_attr("false"),
}))
.condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str())
.send()
.await
{
Ok(_) => Ok(()),
Err(err) => match err.as_service_error() {
Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err(
backoff::Error::permanent(LockClientError::VersionAlreadyCompleted {
table_path: table_path.to_owned(),
version,
}),
),
_ => Err(backoff::Error::permanent(err.into())),
},
}
})
.await
}

async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Expand Down Expand Up @@ -553,6 +590,10 @@ pub mod constants {
pub static ref CONDITION_EXPR_CREATE: String = format!(
"attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})"
);

/// Condition expression for deleting a commit entry: it holds when the item's
/// `complete` attribute equals the `:f` placeholder, or when the item carries
/// neither the table-path nor the file-name attribute. The value bound to `:f`
/// is supplied by the caller issuing the delete.
pub static ref CONDITION_DELETE_INCOMPLETE: String = format!(
"(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))"
);
}

pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f";
Expand Down
30 changes: 30 additions & 0 deletions crates/aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,36 @@ impl LogStore for S3DynamoDbLogStore {
Ok(())
}

/// Tries to abort an entry by first deleting the commit log entry, then deleting the
/// temp commit file.
///
/// The temp commit file is only removed once the lock-table entry has been deleted;
/// if deleting the entry fails, the function returns early and the file is left in
/// place.
///
/// # Errors
///
/// Returns [`TransactionError::LogStoreError`] when the lock client cannot delete the
/// entry (throttling, entry already completed, or any other client failure), and
/// propagates any error from deleting the temp commit file.
async fn abort_commit_entry(
    &self,
    version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    self.lock_client
        .delete_commit_entry(version, &self.table_path)
        .await
        .map_err(|err| match err {
            // Aborting is itself a cleanup step on an error path, so throttling is
            // surfaced as an ordinary error instead of panicking the process.
            LockClientError::ProvisionedThroughputExceeded => TransactionError::LogStoreError {
                msg: "dynamodb provisioned throughput exceeded while deleting log entry"
                    .to_owned(),
                source: Box::new(err),
            },
            LockClientError::VersionAlreadyCompleted { version, .. } => {
                error!("Trying to abort a completed commit");
                TransactionError::LogStoreError {
                    msg: format!("trying to abort a completed log entry: {}", version),
                    source: Box::new(err),
                }
            }
            err => TransactionError::LogStoreError {
                msg: "dynamodb client failed to delete log entry".to_owned(),
                source: Box::new(err),
            },
        })?;

    abort_commit_entry(&self.storage, version, tmp_commit).await?;
    Ok(())
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
debug!("Retrieving latest version of {self:?} at v{current_version}");
let entry = self
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok((Arc::from(store), prefix))
Ok((store, prefix))
} else {
let s3_options = S3StorageOptions::from_map(&options.0)?;

Expand Down
76 changes: 75 additions & 1 deletion crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
use deltalake_core::storage::StorageOptions;
use deltalake_core::table::builder::ensure_table_uri;
use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder};
use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder, ObjectStoreError};
use deltalake_test::utils::*;
use lazy_static::lazy_static;
use object_store::path::Path;
Expand Down Expand Up @@ -182,6 +182,80 @@ async fn test_repair_on_load() -> TestResult<()> {
Ok(())
}

/// Aborting an incomplete commit removes the temp commit file, and aborting the same
/// commit a second time still succeeds.
#[tokio::test]
#[serial]
async fn test_abort_commit_entry() -> TestResult<()> {
    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
    let client = make_client()?;
    let table = prepare_table(&context, "abort_entry").await?;
    let storage_options: StorageOptions = OPTIONS.clone().into();

    let table_url = ensure_table_uri(table.table_uri())?;
    let cloned_options = storage_options.clone();
    let store = std::sync::Arc::new(table.object_store());
    let log_store: S3DynamoDbLogStore =
        S3DynamoDbLogStore::try_new(table_url, cloned_options, &S3_OPTIONS, store)?;

    let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;

    // First abort: removes the lock-table entry and the temp commit file.
    log_store
        .abort_commit_entry(entry.version, &entry.temp_path)
        .await?;

    // If any entry is left, the newest one must be exactly one version lower.
    let latest = client.get_latest_entry(&table.table_uri()).await?;
    if let Some(new_entry) = latest {
        assert_eq!(entry.version - 1, new_entry.version);
    }

    // The temp commit file must be gone from the object store.
    let fetched = log_store.object_store().get(&entry.temp_path).await;
    assert!(matches!(fetched, Err(ObjectStoreError::NotFound { .. })));

    // Second abort: the operation is expected to be idempotent.
    log_store
        .abort_commit_entry(entry.version, &entry.temp_path)
        .await?;

    Ok(())
}

/// Aborting a commit whose DynamoDB entry is already complete must fail and must
/// leave the temp commit file untouched.
#[tokio::test]
#[serial]
async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
    let client = make_client()?;
    let table = prepare_table(&context, "abort_entry_fail").await?;
    let storage_options: StorageOptions = OPTIONS.clone().into();

    let table_url = ensure_table_uri(table.table_uri())?;
    let cloned_options = storage_options.clone();
    let store = std::sync::Arc::new(table.object_store());
    let log_store: S3DynamoDbLogStore =
        S3DynamoDbLogStore::try_new(table_url, cloned_options, &S3_OPTIONS, store)?;

    let entry = create_incomplete_commit_entry(&table, 1, "finished_commit").await?;

    // Flip the entry to "complete" so the conditional delete is rejected.
    client
        .update_commit_entry(entry.version, &table.table_uri())
        .await?;

    // The abort has to report an error for a completed entry.
    let abort_outcome = log_store
        .abort_commit_entry(entry.version, &entry.temp_path)
        .await;
    assert!(abort_outcome.is_err());

    // The temp commit file must still be readable.
    let fetched = log_store.object_store().get(&entry.temp_path).await;
    assert!(fetched.is_ok());

    Ok(())
}

const WORKERS: i64 = 3;
const COMMITS: i64 = 15;

Expand Down
8 changes: 8 additions & 0 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ impl LogStore for DefaultLogStore {
super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
}

/// Abort a commit by delegating to the module-level `abort_commit_entry` helper,
/// handing it this log store's underlying object store.
async fn abort_commit_entry(
    &self,
    version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    let object_store = self.storage.as_ref();
    super::abort_commit_entry(object_store, version, tmp_commit).await
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_latest_version(self, current_version).await
}
Expand Down
21 changes: 20 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use crate::{
kernel::Action,
operations::transaction::TransactionError,
protocol::{get_last_checkpoint, ProtocolError},
storage::{commit_uri_from_version, ObjectStoreRef, StorageOptions},
storage::{
commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions,
},
DeltaTableError,
};
use bytes::Bytes;
Expand Down Expand Up @@ -183,6 +185,13 @@ pub trait LogStore: Sync + Send {
tmp_commit: &Path,
) -> Result<(), TransactionError>;

/// Abort the commit entry for the given version.
///
/// Invoked on the failure paths of a commit attempt so the log store can undo what
/// was staged for `version`. `tmp_commit` is the path of the temporary commit file
/// that belongs to the attempt being aborted.
///
/// # Errors
///
/// Returns a [`TransactionError`] when the implementation fails to clean up.
async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError>;

/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

Expand Down Expand Up @@ -449,6 +458,16 @@ pub async fn write_commit_entry(
Ok(())
}

/// Default implementation for aborting a commit entry
pub async fn abort_commit_entry(
storage: &dyn ObjectStore,
_version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError> {
storage.delete_with_retries(tmp_commit, 15).await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ async fn execute(
return Err(err.into());
}
Err(err) => {
log_store.object_store().delete(commit).await?;
log_store.abort_commit_entry(commit_version, commit).await?;
return Err(err.into());
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ use crate::kernel::{
};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::storage::ObjectStoreRetryExt;
use crate::table::config::TableConfig;
use crate::table::state::DeltaTableState;
use crate::{crate_version, DeltaResult};
Expand Down Expand Up @@ -554,17 +553,15 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
Err(err) => {
this.log_store
.object_store()
.delete_with_retries(tmp_commit, 15)
.abort_commit_entry(version, tmp_commit)
.await?;
return Err(TransactionError::CommitConflict(err).into());
}
};
}
Err(err) => {
this.log_store
.object_store()
.delete_with_retries(tmp_commit, 15)
.abort_commit_entry(version, tmp_commit)
.await?;
return Err(err.into());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/storage/retry_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ pub trait ObjectStoreRetryExt: ObjectStore {
}
}

impl<T: ObjectStore> ObjectStoreRetryExt for T {}
impl<T: ObjectStore + ?Sized> ObjectStoreRetryExt for T {}
Loading