Skip to content

Commit

Permalink
Add is_non_acquirable field to the dynamodb lock (#429)
Browse files Browse the repository at this point in the history
* Add is_non_acquirable field to the dynamodb lock

* Extends the NonAcquirableLock docs with an example
  • Loading branch information
mosyp authored Sep 17, 2021
1 parent 138a2aa commit 484c374
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 2 deletions.
20 changes: 19 additions & 1 deletion rust/src/storage/s3/dynamodb_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl LockItem {
}

/// Error returned by the [`DynamoDbLockClient`] API.
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum DynamoError {
/// Error caused by the DynamoDB table not being created.
#[error("Dynamo table not found")]
Expand All @@ -154,6 +154,19 @@ pub enum DynamoError {
#[error("Could not acquire lock for {0} sec")]
TimedOut(u64),

/// Error returned by [`DynamoDbLockClient::acquire_lock`] which indicates that the lock could
/// not be acquired because the `is_non_acquirable` is set to `true`.
/// Usually this is done intentionally outside of [`DynamoDbLockClient`].
///
/// The example could be the dropping of a table. For example external service acquires the lock
/// to drop (or drop/create etc., something that modifies the delta log completely) a table.
/// The dangerous part here is that the concurrent delta workers will still perform the write
/// whenever the lock is available, because it effectively locks the rename operation. However
/// if the `is_non_acquirable` is set, then the `NonAcquirableLock` is returned which prohibits
/// the delta-rs to continue the write.
#[error("The existing lock in dynamodb is non-acquirable")]
NonAcquirableLock,

/// Error that caused by the dynamodb request exceeded maximum allowed provisioned throughput
/// for the table.
#[error("Maximum allowed provisioned throughput for the table exceeded")]
Expand Down Expand Up @@ -208,6 +221,8 @@ pub const RECORD_VERSION_NUMBER: &str = "recordVersionNumber";
pub const IS_RELEASED: &str = "isReleased";
/// The field name of `lease_duration` in DynamoDB
pub const LEASE_DURATION: &str = "leaseDuration";
/// The field name of `is_non_acquirable` in DynamoDB
pub const IS_NON_ACQUIRABLE: &str = "isNonAcquirable";
/// The field name of `data` in DynamoDB
pub const DATA: &str = "data";
/// The field name of `data.source` in DynamoDB
Expand Down Expand Up @@ -367,6 +382,7 @@ impl DynamoDbLockClient {
data,
lookup_time: now_millis(),
acquired_expired_lock: false,
is_non_acquirable: item.contains_key(IS_NON_ACQUIRABLE),
}));
}

Expand Down Expand Up @@ -462,6 +478,7 @@ impl DynamoDbLockClient {
data: data.map(String::from),
lookup_time: now_millis(),
acquired_expired_lock,
is_non_acquirable: false,
})
}
}
Expand Down Expand Up @@ -515,6 +532,7 @@ impl<'a> AcquireLockState<'a> {
// there's no lock, we good to acquire it
Ok(self.upsert_new_lock(data).await?)
}
Some(existing) if existing.is_non_acquirable => Err(DynamoError::NonAcquirableLock),
Some(existing) if existing.is_released => {
// lock is released by a caller, we good to acquire it
Ok(self.upsert_released_lock(data).await?)
Expand Down
4 changes: 3 additions & 1 deletion rust/src/storage/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,10 @@ pub struct LockItem {
pub data: Option<String>,
/// The last time this lock was updated or retrieved.
pub lookup_time: u128,
/// Tells whether this lock was acquired by expiring existing one
/// Tells whether this lock was acquired by expiring existing one.
pub acquired_expired_lock: bool,
/// If true then this lock could not be acquired.
pub is_non_acquirable: bool,
}

/// Lock data which stores an attempt to rename `source` into `destination`
Expand Down
58 changes: 58 additions & 0 deletions rust/tests/dynamodb_lock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod s3_common;
#[cfg(feature = "s3")]
mod dynamodb {
use deltalake::storage::s3::dynamodb_lock::*;
use deltalake::storage::s3::LockItem;
use maplit::hashmap;
use rusoto_dynamodb::*;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -194,4 +195,61 @@ mod dynamodb {
assert_ne!(w1_rnv, lock_w2.record_version_number);
assert_eq!(current.record_version_number, lock_w2.record_version_number);
}

#[tokio::test]
async fn test_is_non_acquirable() {
async fn update_is_non_acquirable(action: &str) {
let value = match action {
"set" => AttributeValueUpdate {
action: Some("PUT".to_string()),
value: Some(attr_val("1".to_string())),
},
"drop" => AttributeValueUpdate {
action: Some("DELETE".to_string()),
value: None,
},
_ => unreachable!(),
};
DynamoDbClient::new(crate::s3_common::region())
.update_item(UpdateItemInput {
table_name: TABLE.to_string(),
key: hashmap! {
PARTITION_KEY_NAME.to_string() => attr_val("test_is_non_acquirable"),
},
attribute_updates: Some(hashmap! {
IS_NON_ACQUIRABLE.to_string() => value
}),
..Default::default()
})
.await
.unwrap();
}

async fn current_lock(client: &DynamoDbLockClient) -> LockItem {
client.get_lock().await.unwrap().unwrap()
}

let lock = create_dynamo_lock("test_is_non_acquirable", "worker").await;
let item = lock.acquire_lock(None).await.unwrap();

// verify that lock is acquirable
assert!(!item.is_non_acquirable);

// verify that lock is non acquirable
update_is_non_acquirable("set").await;
assert!(current_lock(&lock).await.is_non_acquirable);

// verify that the error is correct
assert_eq!(
lock.acquire_lock(None).await.err().unwrap(),
DynamoError::NonAcquirableLock
);

// verify that lock is acquirable
update_is_non_acquirable("drop").await;
assert!(!current_lock(&lock).await.is_non_acquirable);

// done
assert!(lock.release_lock(&item).await.unwrap());
}
}

0 comments on commit 484c374

Please sign in to comment.