Skip to content

Commit

Permalink
Add revision to kv operations
Browse files Browse the repository at this point in the history
  • Loading branch information
boba2fett authored Jan 3, 2024
1 parent b7ea56b commit 0424068
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 2 deletions.
77 changes: 77 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,37 @@ impl Store {
/// # }
/// ```
pub async fn delete<T: AsRef<str>>(&self, key: T) -> Result<(), DeleteError> {
self.delete_expect_revision(key, None).await
}

/// Deletes a given key if the revision matches. This is a non-destructive operation, which
/// sets a `DELETE` marker.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let revision = kv.put("key", "value".into()).await?;
/// kv.delete_expect_revision("key", Some(revision)).await?;
/// # Ok(())
/// # }
/// ```
pub async fn delete_expect_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
) -> Result<(), DeleteError> {
if !is_valid_key(key.as_ref()) {
return Err(DeleteError::new(DeleteErrorKind::InvalidKey));
}
Expand All @@ -669,6 +700,13 @@ impl Store {
.map_err(|err| DeleteError::with_source(DeleteErrorKind::Other, err))?,
);

if let Some(revision) = revison {
headers.insert(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
HeaderValue::from(revision),
);
}

self.stream
.context
.publish_with_headers(subject, headers, "".into())
Expand Down Expand Up @@ -701,6 +739,38 @@ impl Store {
/// # }
/// ```
pub async fn purge<T: AsRef<str>>(&self, key: T) -> Result<(), PurgeError> {
self.purge_expect_revision(key, None).await
}

/// Purges all the revisions of a entry destructively if the revision matches, leaving behind a single
/// purge entry in-place.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// kv.put("key", "value".into()).await?;
/// let revision = kv.put("key", "another".into()).await?;
/// kv.purge_expect_revision("key", Some(revision)).await?;
/// # Ok(())
/// # }
/// ```
pub async fn purge_expect_revision<T: AsRef<str>>(
&self,
key: T,
revison: Option<u64>,
) -> Result<(), PurgeError> {
if !is_valid_key(key.as_ref()) {
return Err(PurgeError::new(PurgeErrorKind::InvalidKey));
}
Expand All @@ -717,6 +787,13 @@ impl Store {
headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE));
headers.insert(NATS_ROLLUP, HeaderValue::from(ROLLUP_SUBJECT));

if let Some(revision) = revison {
headers.insert(
header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
HeaderValue::from(revision),
);
}

self.stream
.context
.publish_with_headers(subject, headers, "".into())
Expand Down
101 changes: 99 additions & 2 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ mod kv {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
// .connect(server.client_url())
.connect(server.client_url())
.await
.unwrap();
Expand Down Expand Up @@ -247,12 +246,60 @@ mod kv {
assert_eq!(first_op.unwrap().unwrap().operation, Operation::Delete);
}

#[tokio::test]
async fn delete_expect_revision() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "delete".into(),
description: "test_description".into(),
history: 10,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();
let payload: Bytes = "data".into();
let revision = kv.put("key", payload.clone()).await.unwrap();
let value = kv.get("key").await.unwrap();
assert_eq!(from_utf8(&value.unwrap()).unwrap(), payload);

let wrong_revision = 3;
let failed = kv
.delete_expect_revision("key", Some(wrong_revision))
.await
.is_err();
assert!(failed);

kv.delete_expect_revision("key", Some(revision))
.await
.unwrap();
let ss = kv.get("kv").await.unwrap();
assert!(ss.is_none());

let mut entries = kv.history("key").await.unwrap();

let first_op = entries.next().await;
assert_eq!(first_op.unwrap().unwrap().operation, Operation::Put);

let first_op = entries.next().await;
assert_eq!(first_op.unwrap().unwrap().operation, Operation::Delete);
}

#[tokio::test]
async fn purge() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
// .connect(server.client_url())
.connect(server.client_url())
.await
.unwrap();
Expand Down Expand Up @@ -288,6 +335,56 @@ mod kv {
assert_eq!(history, 1);
}

#[tokio::test]
async fn purge_expect_revision() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "purge".into(),
description: "test_description".into(),
history: 10,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();
kv.put("dz", "0".into()).await.unwrap();
kv.put("dz", "1".into()).await.unwrap();
kv.put("dz", "2".into()).await.unwrap();
kv.put("dz", "3".into()).await.unwrap();
kv.put("dz", "4".into()).await.unwrap();
let revision = kv.put("dz", "5".into()).await.unwrap();

kv.put("baz", "0".into()).await.unwrap();
kv.put("baz", "1".into()).await.unwrap();
kv.put("baz", "2".into()).await.unwrap();

let history = kv.history("dz").await.unwrap().count().await;
assert_eq!(history, 6);

let wrong_revision = 3;
let failed = kv
.purge_expect_revision("dz", Some(wrong_revision))
.await
.is_err();
assert!(failed);

kv.purge_expect_revision("dz", Some(revision))
.await
.unwrap();
let history = kv.history("dz").await.unwrap().count().await;
assert_eq!(history, 1);
}

#[tokio::test]
async fn history() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 0424068

Please sign in to comment.