Skip to content

Commit

Permalink
Move kv expect revision to own functions
Browse files Browse the repository at this point in the history
  • Loading branch information
boba2fett authored and Jarema committed Jan 3, 2024
1 parent 7cb3755 commit d52a419
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
2 changes: 1 addition & 1 deletion async-nats/examples/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn main() -> Result<(), async_nats::Error> {
// Unsurprisingly, we get the new updated value as a message.
// Since it's KV interface, we should be able to delete a key as well.
// Does this result in a new message?
kv.delete("sue.color", None).await?;
kv.delete("sue.color").await?;
let message = messages.next().await.unwrap()?;
let metadata = message.info()?;
println!(
Expand Down
63 changes: 59 additions & 4 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,11 +644,38 @@ impl Store {
/// })
/// .await?;
/// kv.put("key", "value".into()).await?;
/// kv.delete("key", None).await?;
/// kv.delete("key").await?;
/// # Ok(())
/// # }
/// ```
pub async fn delete<T: AsRef<str>>(
pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
self.delete_expect_revision(key, None).await
}

/// Deletes a given key if the revision matches. This is a non-destructive operation, which
/// sets a `DELETE` marker.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let revision = kv.put("key", "value".into()).await?;
/// kv.delete_expect_revision("key", Some(revision)).await?;
/// # Ok(())
/// # }
/// ```
pub async fn delete_expect_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
Expand Down Expand Up @@ -707,11 +734,39 @@ impl Store {
/// .await?;
/// kv.put("key", "value".into()).await?;
/// kv.put("key", "another".into()).await?;
/// kv.purge("key", None).await?;
/// kv.purge("key").await?;
/// # Ok(())
/// # }
/// ```
pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
self.purge_expected_revision(key, None).await
}

/// Purges all the revisions of a entry destructively if the resision matches, leaving behind a single
/// purge entry in-place.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// kv.put("key", "value".into()).await?;
/// let revision = kv.put("key", "another".into()).await?;
/// kv.purge_expected_revision("key", Some(revision)).await?;
/// # Ok(())
/// # }
/// ```
pub async fn purge<T: AsRef<str>>(
pub async fn purge_expected_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
Expand Down
32 changes: 21 additions & 11 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ mod kv {
kv.put("key", payload.clone()).await.unwrap();
let value = kv.get("key").await.unwrap();
assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload);
kv.delete("key", None).await.unwrap();
kv.delete("key").await.unwrap();
let ss = kv.get("kv").await.unwrap();
assert!(ss.is_none());

Expand All @@ -247,7 +247,7 @@ mod kv {
}

#[tokio::test]
async fn delete_revision() {
async fn delete_expect_revision() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
Expand All @@ -274,10 +274,15 @@ mod kv {
assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload);

let wrong_revision = 3;
let failed = kv.delete("key", Some(wrong_revision)).await.is_err();
let failed = kv
.delete_expect_revision("key", Some(wrong_revision))
.await
.is_err();
assert!(failed);

kv.delete("key", Some(revision)).await.unwrap();
kv.delete_expect_revision("key", Some(revision))
.await
.unwrap();
let ss = kv.get("kv").await.unwrap();
assert!(ss.is_none());

Expand Down Expand Up @@ -325,13 +330,13 @@ mod kv {

let history = kv.history("dz").await.unwrap().count().await;
assert_eq!(history, 6);
kv.purge("dz", None).await.unwrap();
kv.purge("dz").await.unwrap();
let history = kv.history("dz").await.unwrap().count().await;
assert_eq!(history, 1);
}

#[tokio::test]
async fn purge_revision() {
async fn purge_expect_revision() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
Expand Down Expand Up @@ -367,10 +372,15 @@ mod kv {
assert_eq!(history, 6);

let wrong_revision = 3;
let failed = kv.purge("dz", Some(wrong_revision)).await.is_err();
let failed = kv
.purge_expected_revision("dz", Some(wrong_revision))
.await
.is_err();
assert!(failed);

kv.purge("dz", Some(revision)).await.unwrap();
kv.purge_expected_revision("dz", Some(revision))
.await
.unwrap();
let history = kv.history("dz").await.unwrap().count().await;
assert_eq!(history, 1);
}
Expand Down Expand Up @@ -701,7 +711,7 @@ mod kv {
assert_eq!(vec!["bar", "foo"], keys);

// Delete a key and make sure it doesn't show up in the keys list
kv.delete("bar", None).await.unwrap();
kv.delete("bar").await.unwrap();
let keys = kv
.keys()
.await
Expand All @@ -715,7 +725,7 @@ mod kv {
for i in 0..10 {
kv.put("bar", i.to_string().into()).await.unwrap();
}
kv.purge("foo", None).await.unwrap();
kv.purge("foo").await.unwrap();
let keys = kv
.keys()
.await
Expand Down Expand Up @@ -845,7 +855,7 @@ mod kv {
let name = test.get("name").await.unwrap();
assert_eq!(from_utf8(&name.unwrap()).unwrap(), "ivan");

test.purge("name", None).await.unwrap();
test.purge("name").await.unwrap();
let name = test.get("name").await.unwrap();
assert!(name.is_none());

Expand Down

0 comments on commit d52a419

Please sign in to comment.