From 171e70e8a3012f01f0ca90f1c1955d3f4b7b3a07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20T=C3=BCnte?= Date: Wed, 27 Dec 2023 14:36:26 +0100 Subject: [PATCH 1/6] Add option for revision to kv delete and purge --- async-nats/examples/kv.rs | 2 +- async-nats/src/jetstream/kv/mod.rs | 22 ++++++++++++++++++---- async-nats/tests/kv_tests.rs | 12 +++++------- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/async-nats/examples/kv.rs b/async-nats/examples/kv.rs index 1172326b2..27d0bfb2e 100644 --- a/async-nats/examples/kv.rs +++ b/async-nats/examples/kv.rs @@ -138,7 +138,7 @@ async fn main() -> Result<(), async_nats::Error> { // Unsurprisingly, we get the new updated value as a message. // Since it's KV interface, we should be able to delete a key as well. // Does this result in a new message? - kv.delete("sue.color").await?; + kv.delete("sue.color", None).await?; let message = messages.next().await.unwrap()?; let metadata = message.info()?; println!( diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index b14eb13af..bd9bdabc1 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -644,11 +644,11 @@ impl Store { /// }) /// .await?; /// kv.put("key", "value".into()).await?; - /// kv.delete("key").await?; + /// kv.delete("key", None).await?; /// # Ok(()) /// # } /// ``` - pub async fn delete>(&self, key: T) -> Result<(), DeleteError> { + pub async fn delete>(&self, key: T, revison: Option) -> Result<(), DeleteError> { if !is_valid_key(key.as_ref()) { return Err(DeleteError::new(DeleteErrorKind::InvalidKey)); } @@ -669,6 +669,13 @@ impl Store { .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?, ); + if let Some(revision) = revison { + headers.insert( + header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, + HeaderValue::from(revision), + ); + } + self.stream .context .publish_with_headers(subject, headers, "".into()) @@ -696,11 +703,11 @@ impl Store { /// .await?; /// kv.put("key", "value".into()).await?; /// kv.put("key", "another".into()).await?; - /// kv.purge("key").await?; + /// kv.purge("key", None).await?; /// # Ok(()) /// # } /// ``` - pub async fn purge>(&self, key: T) -> Result<(), PurgeError> { + pub async fn purge>(&self, key: T, revison: Option) -> Result<(), PurgeError> { if !is_valid_key(key.as_ref()) { return Err(PurgeError::new(PurgeErrorKind::InvalidKey)); } @@ -717,6 +724,13 @@ impl Store { headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE)); headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT)); + if let Some(revision) = revison { + headers.insert( + header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, + HeaderValue::from(revision), + ); + } + self.stream .context .publish_with_headers(subject, headers, "".into()) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index b41906059..1d7f3702d 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -212,7 +212,6 @@ mod kv { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) - // .connect(server.client_url()) .connect(server.client_url()) .await .unwrap(); @@ -234,7 +233,7 @@ mod kv { kv.put("key", payload.clone()).await.unwrap(); let value = kv.get("key").await.unwrap(); assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload); - kv.delete("key").await.unwrap(); + kv.delete("key", None).await.unwrap(); let ss = kv.get("kv").await.unwrap(); assert!(ss.is_none()); @@ -252,7 +251,6 @@ mod kv { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) - // .connect(server.client_url()) .connect(server.client_url()) .await .unwrap(); @@ -283,7 +281,7 @@ mod kv { let history = kv.history("dz").await.unwrap().count().await; assert_eq!(history, 6); - kv.purge("dz").await.unwrap(); + kv.purge("dz", None).await.unwrap(); let history = kv.history("dz").await.unwrap().count().await; assert_eq!(history, 1); } @@ -614,7 +612,7 @@ mod kv { assert_eq!(vec!["bar", "foo"], keys); // Delete a key and make sure it doesn't show up in the keys list - kv.delete("bar").await.unwrap(); + kv.delete("bar", None).await.unwrap(); let keys = kv .keys() .await @@ -628,7 +626,7 @@ mod kv { for i in 0..10 { kv.put("bar", i.to_string().into()).await.unwrap(); } - kv.purge("foo").await.unwrap(); + kv.purge("foo", None).await.unwrap(); let keys = kv .keys() .await @@ -758,7 +756,7 @@ mod kv { let name = test.get("name").await.unwrap(); assert_eq!(from_utf8(&name.unwrap()).unwrap(), "ivan"); - test.purge("name").await.unwrap(); + test.purge("name", None).await.unwrap(); let name = test.get("name").await.unwrap(); assert!(name.is_none()); From 160f24f9dbea326612f5ffaacf20dfed48879968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20T=C3=BCnte?= Date: Wed, 27 Dec 2023 14:39:23 +0100 Subject: [PATCH 2/6] Add tests for kv revisions --- async-nats/tests/kv_tests.rs | 89 ++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 1d7f3702d..59c3488b0 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -246,6 +246,50 @@ mod kv { assert_eq!(first_op.unwrap().unwrap().operation, Operation::Delete); } + #[tokio::test] + async fn delete_revision() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|event| async move { println!("event: {event:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "delete".into(), + description: "test_description".into(), + history: 10, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + let payload: Bytes = "data".into(); + let revision = kv.put("key", payload.clone()).await.unwrap(); + let value = kv.get("key").await.unwrap(); + assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload); + + let wrong_revision = 3; + let failed = kv.delete("key", Some(wrong_revision)).await.is_err(); + assert!(failed); + + kv.delete("key", Some(revision)).await.unwrap(); + let ss = kv.get("kv").await.unwrap(); + assert!(ss.is_none()); + + let mut entries = kv.history("key").await.unwrap(); + + let first_op = entries.next().await; + assert_eq!(first_op.unwrap().unwrap().operation, Operation::Put); + + let first_op = entries.next().await; + assert_eq!(first_op.unwrap().unwrap().operation, Operation::Delete); + } + #[tokio::test] async fn purge() { let server = nats_server::run_server("tests/configs/jetstream.conf"); @@ -286,6 +330,51 @@ mod kv { assert_eq!(history, 1); } + #[tokio::test] + async fn purge_revision() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|event| async move { println!("event: {event:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "purge".into(), + description: "test_description".into(), + history: 10, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + kv.put("dz", "0".into()).await.unwrap(); + kv.put("dz", "1".into()).await.unwrap(); + kv.put("dz", "2".into()).await.unwrap(); + kv.put("dz", "3".into()).await.unwrap(); + kv.put("dz", "4".into()).await.unwrap(); + let revision = kv.put("dz", "5".into()).await.unwrap(); + + kv.put("baz", "0".into()).await.unwrap(); + kv.put("baz", "1".into()).await.unwrap(); + kv.put("baz", "2".into()).await.unwrap(); + + let history = kv.history("dz").await.unwrap().count().await; + assert_eq!(history, 6); + + let wrong_revision = 3; + let failed = kv.purge("dz", Some(wrong_revision)).await.is_err(); + assert!(failed); + + kv.purge("dz", Some(revision)).await.unwrap(); + let history = kv.history("dz").await.unwrap().count().await; + assert_eq!(history, 1); + } + #[tokio::test] async fn history() { let server = nats_server::run_server("tests/configs/jetstream.conf"); From 7cb3755b3a5b65ea63437d76457a1df3f0dae795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20T=C3=BCnte?= Date: Thu, 28 Dec 2023 00:22:17 +0100 Subject: [PATCH 3/6] Update format for longer signature --- async-nats/src/jetstream/kv/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index bd9bdabc1..c22b4bb39 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -648,7 +648,11 @@ impl Store { /// # Ok(()) /// # } /// ``` - pub async fn delete>(&self, key: T, revison: Option) -> Result<(), DeleteError> { + pub async fn delete>( + &self, + key: T, + revison: Option, + ) -> Result<(), DeleteError> { if !is_valid_key(key.as_ref()) { return Err(DeleteError::new(DeleteErrorKind::InvalidKey)); } @@ -707,7 +711,11 @@ impl Store { /// # Ok(()) /// # } /// ``` - pub async fn purge>(&self, key: T, revison: Option) -> Result<(), PurgeError> { + pub async fn purge>( + &self, + key: T, + revison: Option, + ) -> Result<(), PurgeError> { if !is_valid_key(key.as_ref()) { return Err(PurgeError::new(PurgeErrorKind::InvalidKey)); } From d52a4190855448cc15516d9bfde25816d11fc739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20T=C3=BCnte?= Date: Tue, 2 Jan 2024 12:59:08 +0100 Subject: [PATCH 4/6] Move kv expect revision to own functions --- async-nats/examples/kv.rs | 2 +- async-nats/src/jetstream/kv/mod.rs | 63 ++++++++++++++++++++++++++++-- async-nats/tests/kv_tests.rs | 32 +++++++++------ 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/async-nats/examples/kv.rs b/async-nats/examples/kv.rs index 27d0bfb2e..1172326b2 100644 --- a/async-nats/examples/kv.rs +++ b/async-nats/examples/kv.rs @@ -138,7 +138,7 @@ async fn main() -> Result<(), async_nats::Error> { // Unsurprisingly, we get the new updated value as a message. // Since it's KV interface, we should be able to delete a key as well. // Does this result in a new message? - kv.delete("sue.color", None).await?; + kv.delete("sue.color").await?; let message = messages.next().await.unwrap()?; let metadata = message.info()?; println!( diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index c22b4bb39..1a58444b6 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -644,11 +644,38 @@ impl Store { /// }) /// .await?; /// kv.put("key", "value".into()).await?; - /// kv.delete("key", None).await?; + /// kv.delete("key").await?; /// # Ok(()) /// # } /// ``` - pub async fn delete>( + pub async fn delete>(&self, key: T) -> Result<(), DeleteError> { + self.delete_expect_revision(key, None).await + } + + /// Deletes a given key if the revision matches. This is a non-destructive operation, which + /// sets a `DELETE` marker. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::StreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .create_key_value(async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// let revision = kv.put("key", "value".into()).await?; + /// kv.delete_expect_revision("key", Some(revision)).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn delete_expect_revision>( &self, key: T, revison: Option, @@ -707,11 +734,39 @@ impl Store { /// .await?; /// kv.put("key", "value".into()).await?; /// kv.put("key", "another".into()).await?; - /// kv.purge("key", None).await?; + /// kv.purge("key").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn purge>(&self, key: T) -> Result<(), PurgeError> { + self.purge_expected_revision(key, None).await + } + + /// Purges all the revisions of a entry destructively if the resision matches, leaving behind a single + /// purge entry in-place. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::StreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .create_key_value(async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// kv.put("key", "value".into()).await?; + /// let revision = kv.put("key", "another".into()).await?; + /// kv.purge_expected_revision("key", Some(revision)).await?; /// # Ok(()) /// # } /// ``` - pub async fn purge>( + pub async fn purge_expected_revision>( &self, key: T, revison: Option, diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 59c3488b0..c20d55754 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -233,7 +233,7 @@ mod kv { kv.put("key", payload.clone()).await.unwrap(); let value = kv.get("key").await.unwrap(); assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload); - kv.delete("key", None).await.unwrap(); + kv.delete("key").await.unwrap(); let ss = kv.get("kv").await.unwrap(); assert!(ss.is_none()); @@ -247,7 +247,7 @@ mod kv { } #[tokio::test] - async fn delete_revision() { + async fn delete_expect_revision() { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) @@ -274,10 +274,15 @@ mod kv { assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload); let wrong_revision = 3; - let failed = kv.delete("key", Some(wrong_revision)).await.is_err(); + let failed = kv + .delete_expect_revision("key", Some(wrong_revision)) + .await + .is_err(); assert!(failed); - kv.delete("key", Some(revision)).await.unwrap(); + kv.delete_expect_revision("key", Some(revision)) + .await + .unwrap(); let ss = kv.get("kv").await.unwrap(); assert!(ss.is_none()); @@ -325,13 +330,13 @@ mod kv { let history = kv.history("dz").await.unwrap().count().await; assert_eq!(history, 6); - kv.purge("dz", None).await.unwrap(); + kv.purge("dz").await.unwrap(); let history = kv.history("dz").await.unwrap().count().await; assert_eq!(history, 1); } #[tokio::test] - async fn purge_revision() { + async fn purge_expect_revision() { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) @@ -367,10 +372,15 @@ mod kv { assert_eq!(history, 6); let wrong_revision = 3; - let failed = kv.purge("dz", Some(wrong_revision)).await.is_err(); + let failed = kv + .purge_expected_revision("dz", Some(wrong_revision)) + .await + .is_err(); assert!(failed); - kv.purge("dz", Some(revision)).await.unwrap(); + kv.purge_expected_revision("dz", Some(revision)) + .await + .unwrap(); let history = kv.history("dz").await.unwrap().count().await; assert_eq!(history, 1); } @@ -701,7 +711,7 @@ mod kv { assert_eq!(vec!["bar", "foo"], keys); // Delete a key and make sure it doesn't show up in the keys list - kv.delete("bar", None).await.unwrap(); + kv.delete("bar").await.unwrap(); let keys = kv .keys() .await @@ -715,7 +725,7 @@ mod kv { for i in 0..10 { kv.put("bar", i.to_string().into()).await.unwrap(); } - kv.purge("foo", None).await.unwrap(); + kv.purge("foo").await.unwrap(); let keys = kv .keys() .await @@ -845,7 +855,7 @@ mod kv { let name = test.get("name").await.unwrap(); assert_eq!(from_utf8(&name.unwrap()).unwrap(), "ivan"); - test.purge("name", None).await.unwrap(); + test.purge("name").await.unwrap(); let name = test.get("name").await.unwrap(); assert!(name.is_none()); From e5d2a861f3250ebb4317ff66a8bda30aa6597e38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20T=C3=BCnte?= Date: Tue, 2 Jan 2024 13:02:51 +0100 Subject: [PATCH 5/6] Make naming consistent to expect_revision --- async-nats/src/jetstream/kv/mod.rs | 6 +++--- async-nats/tests/kv_tests.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 1a58444b6..51b5b024f 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -739,7 +739,7 @@ impl Store { /// # } /// ``` pub async fn purge>(&self, key: T) -> Result<(), PurgeError> { - self.purge_expected_revision(key, None).await + self.purge_expect_revision(key, None).await } /// Purges all the revisions of a entry destructively if the resision matches, leaving behind a single @@ -762,11 +762,11 @@ impl Store { /// .await?; /// kv.put("key", "value".into()).await?; /// let revision = kv.put("key", "another".into()).await?; - /// kv.purge_expected_revision("key", Some(revision)).await?; + /// kv.purge_expect_revision("key", Some(revision)).await?; /// # Ok(()) /// # } /// ``` - pub async fn purge_expected_revision>( + pub async fn purge_expect_revision>( &self, key: T, revison: Option, diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index c20d55754..d1e8688c2 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -373,12 +373,12 @@ mod kv { let wrong_revision = 3; let failed = kv - .purge_expected_revision("dz", Some(wrong_revision)) + .purge_expect_revision("dz", Some(wrong_revision)) .await .is_err(); assert!(failed); - kv.purge_expected_revision("dz", Some(revision)) + kv.purge_expect_revision("dz", Some(revision)) .await .unwrap(); let history = kv.history("dz").await.unwrap().count().await; From d580356da0e29760ec9336302f58f9491327e8aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20T=C3=BCnte?= Date: Wed, 3 Jan 2024 14:51:34 +0100 Subject: [PATCH 6/6] Fix typo in doc Co-authored-by: Tomasz Pietrek --- async-nats/src/jetstream/kv/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 51b5b024f..1628b3e9a 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -742,7 +742,7 @@ impl Store { self.purge_expect_revision(key, None).await } - /// Purges all the revisions of a entry destructively if the resision matches, leaving behind a single + /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single /// purge entry in-place. /// /// # Examples