Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_non_acquirable field to the dynamodb lock #429

Merged
merged 2 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion rust/src/storage/s3/dynamodb_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl LockItem {
}

/// Error returned by the [`DynamoDbLockClient`] API.
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum DynamoError {
/// Error caused by the DynamoDB table not being created.
#[error("Dynamo table not found")]
Expand All @@ -154,6 +154,19 @@ pub enum DynamoError {
#[error("Could not acquire lock for {0} sec")]
TimedOut(u64),

/// Error returned by [`DynamoDbLockClient::acquire_lock`] which indicates that the lock could
/// not be acquired because the `is_non_acquirable` is set to `true`.
/// Usually this is done intentionally outside of [`DynamoDbLockClient`].
mosyp marked this conversation as resolved.
Show resolved Hide resolved
///
/// The example could be the dropping of a table. For example external service acquires the lock
/// to drop (or drop/create etc., something that modifies the delta log completely) a table.
/// The dangerous part here is that the concurrent delta workers will still perform the write
/// whenever the lock is available, because it effectively locks the rename operation. However
/// if the `is_non_acquirable` is set, then the `NonAcquirableLock` is returned which prohibits
/// the delta-rs to continue the write.
#[error("The existing lock in dynamodb is non-acquirable")]
NonAcquirableLock,

/// Error that caused by the dynamodb request exceeded maximum allowed provisioned throughput
/// for the table.
#[error("Maximum allowed provisioned throughput for the table exceeded")]
Expand Down Expand Up @@ -208,6 +221,8 @@ pub const RECORD_VERSION_NUMBER: &str = "recordVersionNumber";
pub const IS_RELEASED: &str = "isReleased";
/// The field name of `lease_duration` in DynamoDB
pub const LEASE_DURATION: &str = "leaseDuration";
/// The field name of `is_non_acquirable` in DynamoDB
pub const IS_NON_ACQUIRABLE: &str = "isNonAcquirable";
/// The field name of `data` in DynamoDB
pub const DATA: &str = "data";
/// The field name of `data.source` in DynamoDB
Expand Down Expand Up @@ -367,6 +382,7 @@ impl DynamoDbLockClient {
data,
lookup_time: now_millis(),
acquired_expired_lock: false,
is_non_acquirable: item.contains_key(IS_NON_ACQUIRABLE),
}));
}

Expand Down Expand Up @@ -462,6 +478,7 @@ impl DynamoDbLockClient {
data: data.map(String::from),
lookup_time: now_millis(),
acquired_expired_lock,
is_non_acquirable: false,
})
}
}
Expand Down Expand Up @@ -515,6 +532,7 @@ impl<'a> AcquireLockState<'a> {
// there's no lock, we good to acquire it
Ok(self.upsert_new_lock(data).await?)
}
Some(existing) if existing.is_non_acquirable => Err(DynamoError::NonAcquirableLock),
Some(existing) if existing.is_released => {
// lock is released by a caller, we good to acquire it
Ok(self.upsert_released_lock(data).await?)
Expand Down
4 changes: 3 additions & 1 deletion rust/src/storage/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,10 @@ pub struct LockItem {
pub data: Option<String>,
/// The last time this lock was updated or retrieved.
pub lookup_time: u128,
/// Tells whether this lock was acquired by expiring existing one
/// Tells whether this lock was acquired by expiring existing one.
pub acquired_expired_lock: bool,
/// If true then this lock could not be acquired.
pub is_non_acquirable: bool,
}

/// Lock data which stores an attempt to rename `source` into `destination`
Expand Down
58 changes: 58 additions & 0 deletions rust/tests/dynamodb_lock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod s3_common;
#[cfg(feature = "s3")]
mod dynamodb {
use deltalake::storage::s3::dynamodb_lock::*;
use deltalake::storage::s3::LockItem;
use maplit::hashmap;
use rusoto_dynamodb::*;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -194,4 +195,61 @@ mod dynamodb {
assert_ne!(w1_rnv, lock_w2.record_version_number);
assert_eq!(current.record_version_number, lock_w2.record_version_number);
}

#[tokio::test]
async fn test_is_non_acquirable() {
async fn update_is_non_acquirable(action: &str) {
let value = match action {
"set" => AttributeValueUpdate {
action: Some("PUT".to_string()),
value: Some(attr_val("1".to_string())),
},
"drop" => AttributeValueUpdate {
action: Some("DELETE".to_string()),
value: None,
},
_ => unreachable!(),
};
DynamoDbClient::new(crate::s3_common::region())
.update_item(UpdateItemInput {
table_name: TABLE.to_string(),
key: hashmap! {
PARTITION_KEY_NAME.to_string() => attr_val("test_is_non_acquirable"),
},
attribute_updates: Some(hashmap! {
IS_NON_ACQUIRABLE.to_string() => value
}),
..Default::default()
})
.await
.unwrap();
}

async fn current_lock(client: &DynamoDbLockClient) -> LockItem {
client.get_lock().await.unwrap().unwrap()
}

let lock = create_dynamo_lock("test_is_non_acquirable", "worker").await;
let item = lock.acquire_lock(None).await.unwrap();

// verify that lock is acquirable
assert!(!item.is_non_acquirable);

// verify that lock is non acquirable
update_is_non_acquirable("set").await;
assert!(current_lock(&lock).await.is_non_acquirable);

// verify that the error is correct
assert_eq!(
lock.acquire_lock(None).await.err().unwrap(),
DynamoError::NonAcquirableLock
);

// verify that lock is acquirable
update_is_non_acquirable("drop").await;
assert!(!current_lock(&lock).await.is_non_acquirable);

// done
assert!(lock.release_lock(&item).await.unwrap());
}
}