diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index b14eb13af..1628b3e9a 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -649,6 +649,37 @@ impl Store { /// # } /// ``` pub async fn delete>(&self, key: T) -> Result<(), DeleteError> { + self.delete_expect_revision(key, None).await + } + + /// Deletes a given key if the revision matches. This is a non-destructive operation, which + /// sets a `DELETE` marker. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::StreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .create_key_value(async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// let revision = kv.put("key", "value".into()).await?; + /// kv.delete_expect_revision("key", Some(revision)).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn delete_expect_revision>( + &self, + key: T, + revison: Option, + ) -> Result<(), DeleteError> { if !is_valid_key(key.as_ref()) { return Err(DeleteError::new(DeleteErrorKind::InvalidKey)); } @@ -669,6 +700,13 @@ impl Store { .map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?, ); + if let Some(revision) = revison { + headers.insert( + header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, + HeaderValue::from(revision), + ); + } + self.stream .context .publish_with_headers(subject, headers, "".into()) @@ -701,6 +739,38 @@ impl Store { /// # } /// ``` pub async fn purge>(&self, key: T) -> Result<(), PurgeError> { + self.purge_expect_revision(key, None).await + } + + /// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single + /// purge entry in-place. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::StreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .create_key_value(async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// kv.put("key", "value".into()).await?; + /// let revision = kv.put("key", "another".into()).await?; + /// kv.purge_expect_revision("key", Some(revision)).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn purge_expect_revision>( + &self, + key: T, + revison: Option, + ) -> Result<(), PurgeError> { if !is_valid_key(key.as_ref()) { return Err(PurgeError::new(PurgeErrorKind::InvalidKey)); } @@ -717,6 +787,13 @@ impl Store { headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE)); headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT)); + if let Some(revision) = revison { + headers.insert( + header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, + HeaderValue::from(revision), + ); + } + self.stream .context .publish_with_headers(subject, headers, "".into()) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index b41906059..d1e8688c2 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -212,7 +212,6 @@ mod kv { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) - // .connect(server.client_url()) .connect(server.client_url()) .await .unwrap(); @@ -247,12 +246,60 @@ mod kv { assert_eq!(first_op.unwrap().unwrap().operation, Operation::Delete); } + #[tokio::test] + async fn delete_expect_revision() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|event| async move { println!("event: {event:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "delete".into(), + description: "test_description".into(), + history: 10, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + let payload: Bytes = "data".into(); + let revision = kv.put("key", payload.clone()).await.unwrap(); + let value = kv.get("key").await.unwrap(); + assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload); + + let wrong_revision = 3; + let failed = kv + .delete_expect_revision("key", Some(wrong_revision)) + .await + .is_err(); + assert!(failed); + + kv.delete_expect_revision("key", Some(revision)) + .await + .unwrap(); + let ss = kv.get("kv").await.unwrap(); + assert!(ss.is_none()); + + let mut entries = kv.history("key").await.unwrap(); + + let first_op = entries.next().await; + assert_eq!(first_op.unwrap().unwrap().operation, Operation::Put); + + let first_op = entries.next().await; + assert_eq!(first_op.unwrap().unwrap().operation, Operation::Delete); + } + #[tokio::test] async fn purge() { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|event| async move { println!("event: {event:?}") }) - // .connect(server.client_url()) .connect(server.client_url()) .await .unwrap(); @@ -288,6 +335,56 @@ mod kv { assert_eq!(history, 1); } + #[tokio::test] + async fn purge_expect_revision() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|event| async move { println!("event: {event:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "purge".into(), + description: "test_description".into(), + history: 10, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + kv.put("dz", "0".into()).await.unwrap(); + kv.put("dz", "1".into()).await.unwrap(); + kv.put("dz", "2".into()).await.unwrap(); + kv.put("dz", "3".into()).await.unwrap(); + kv.put("dz", "4".into()).await.unwrap(); + let revision = kv.put("dz", "5".into()).await.unwrap(); + + kv.put("baz", "0".into()).await.unwrap(); + kv.put("baz", "1".into()).await.unwrap(); + kv.put("baz", "2".into()).await.unwrap(); + + let history = kv.history("dz").await.unwrap().count().await; + assert_eq!(history, 6); + + let wrong_revision = 3; + let failed = kv + .purge_expect_revision("dz", Some(wrong_revision)) + .await + .is_err(); + assert!(failed); + + kv.purge_expect_revision("dz", Some(revision)) + .await + .unwrap(); + let history = kv.history("dz").await.unwrap().count().await; + assert_eq!(history, 1); + } + #[tokio::test] async fn history() { let server = nats_server::run_server("tests/configs/jetstream.conf");