From 6409e8206dffe998051e1dfaef96eaf8170826e7 Mon Sep 17 00:00:00 2001 From: Baptiste Le Morlec Date: Tue, 13 Aug 2024 00:05:13 +0200 Subject: [PATCH] Adding BrokerProperties when sending message to queue or topic (#1693) * Adding BrokerProperties when sending message to queue or topic * PR feedback: BrokerProperties to SendMessageOptions --- sdk/messaging_servicebus/Cargo.toml | 1 + sdk/messaging_servicebus/README.md | 2 +- .../examples/service_bus00.rs | 2 +- .../examples/service_bus01.rs | 2 +- sdk/messaging_servicebus/src/lib.rs | 2 +- .../src/service_bus/mod.rs | 122 ++++++++++++++++-- .../src/service_bus/queue_client.rs | 9 +- .../src/service_bus/topic_client.rs | 9 +- sdk/messaging_servicebus/tests/service_bus.rs | 42 +++++- 9 files changed, 169 insertions(+), 22 deletions(-) diff --git a/sdk/messaging_servicebus/Cargo.toml b/sdk/messaging_servicebus/Cargo.toml index 2ab1957673..aa745c4ee2 100644 --- a/sdk/messaging_servicebus/Cargo.toml +++ b/sdk/messaging_servicebus/Cargo.toml @@ -21,6 +21,7 @@ tracing = "0.1.40" url = "2.2" bytes = "1.0" serde = "1.0" +serde_json = "1.0" [dev-dependencies] futures = "0.3" diff --git a/sdk/messaging_servicebus/README.md b/sdk/messaging_servicebus/README.md index 9e8933eeee..8a5d5a7537 100644 --- a/sdk/messaging_servicebus/README.md +++ b/sdk/messaging_servicebus/README.md @@ -23,7 +23,7 @@ async fn main() -> azure_core::Result<()> { policy_key, )?; - client.send_message("hello world").await?; + client.send_message("hello world", None).await?; let received_message = client.receive_and_delete_message().await?; println!("Received Message: {}", received_message); diff --git a/sdk/messaging_servicebus/examples/service_bus00.rs b/sdk/messaging_servicebus/examples/service_bus00.rs index e631e4b8df..25d1a01746 100644 --- a/sdk/messaging_servicebus/examples/service_bus00.rs +++ b/sdk/messaging_servicebus/examples/service_bus00.rs @@ -28,7 +28,7 @@ async fn main() { let message_to_send = "hello, world!"; client - .send_message(message_to_send) + .send_message(message_to_send, None) .await .expect("Failed to send message while testing receive"); diff --git a/sdk/messaging_servicebus/examples/service_bus01.rs b/sdk/messaging_servicebus/examples/service_bus01.rs index e8b32ae8b0..5c4b80effb 100644 --- a/sdk/messaging_servicebus/examples/service_bus01.rs +++ b/sdk/messaging_servicebus/examples/service_bus01.rs @@ -34,7 +34,7 @@ async fn main() { let message_to_send = "hello, world!"; sender - .send_message(message_to_send) + .send_message(message_to_send, None) .await .expect("Failed to send message while testing receive"); diff --git a/sdk/messaging_servicebus/src/lib.rs b/sdk/messaging_servicebus/src/lib.rs index 32225378e3..c79df1c06e 100644 --- a/sdk/messaging_servicebus/src/lib.rs +++ b/sdk/messaging_servicebus/src/lib.rs @@ -22,7 +22,7 @@ async fn main() -> azure_core::Result<()> { policy_key, )?; - client.send_message("hello world").await?; + client.send_message("hello world", None).await?; let received_message = client.receive_and_delete_message().await?; println!("Received Message: {}", received_message); diff --git a/sdk/messaging_servicebus/src/service_bus/mod.rs b/sdk/messaging_servicebus/src/service_bus/mod.rs index 49d94e9487..43708ea1e3 100644 --- a/sdk/messaging_servicebus/src/service_bus/mod.rs +++ b/sdk/messaging_servicebus/src/service_bus/mod.rs @@ -7,10 +7,14 @@ pub use self::{ }; use crate::utils::{craft_peek_lock_url, get_head_url}; use azure_core::{ - auth::Secret, error::Error, from_json, headers, hmac::hmac_sha256, CollectedResponse, - HttpClient, Method, Request, StatusCode, Url, + auth::Secret, + error::Error, + from_json, + headers::{self, HeaderName, HeaderValue, CONTENT_TYPE}, + hmac::hmac_sha256, + CollectedResponse, HttpClient, Method, Request, StatusCode, Url, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{ str::FromStr, time::Duration, @@ -21,6 +25,7 @@ use url::form_urlencoded::{self, Serializer}; /// Default duration for the SAS token in days — We might want to make this configurable at some point const DEFAULT_SAS_DURATION: u64 = 3_600; // seconds = 1 hour +const BROKER_PROPERTIES: HeaderName = HeaderName::from_static("brokerproperties"); /// Prepares an HTTP request fn finalize_request( @@ -89,10 +94,11 @@ async fn send_message( policy_name: &str, signing_key: &Secret, msg: &str, + send_message_options: Option, ) -> azure_core::Result<()> { let url = format!("https://{namespace}.servicebus.windows.net/{queue_or_topic}/messages"); - let req = finalize_request( + let mut req = finalize_request( &url, Method::Post, Some(msg.to_string()), @@ -100,6 +106,8 @@ async fn send_message( signing_key, )?; + req.insert_headers(&send_message_options); + http_client .as_ref() .execute_request_check_status(&req) @@ -284,7 +292,7 @@ impl PeekLockResponse { pub struct BrokerProperties { pub delivery_count: i32, pub enqueued_sequence_number: Option, - #[serde(deserialize_with = "BrokerProperties::option_rfc2822")] + #[serde(with = "time::serde::rfc2822::option")] pub enqueued_time_utc: Option, pub lock_token: String, #[serde(with = "time::serde::rfc2822")] @@ -295,15 +303,6 @@ pub struct BrokerProperties { pub time_to_live: f64, } -impl BrokerProperties { - fn option_rfc2822<'de, D>(value: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - Ok(Some(time::serde::rfc2822::deserialize(value)?)) - } -} - impl FromStr for BrokerProperties { type Err = azure_core::Error; @@ -311,3 +310,98 @@ impl FromStr for BrokerProperties { from_json(s) } } + +#[derive(Debug, Default)] +pub struct SendMessageOptions { + pub content_type: Option, + pub broker_properties: Option, +} + +impl headers::AsHeaders for SendMessageOptions { + type Iter = std::vec::IntoIter<(HeaderName, HeaderValue)>; + + fn as_headers(&self) -> Self::Iter { + let mut headers: Vec<(HeaderName, HeaderValue)> = vec![]; + + if let Some(content_type) = &self.content_type { + headers.push((CONTENT_TYPE, content_type.into())); + } + + if let Some(broker_properties) = &self.broker_properties { + headers.push(( + BROKER_PROPERTIES, + serde_json::to_string(broker_properties).unwrap().into(), + )); + } + + headers.into_iter() + } +} + +#[derive(Clone, Debug, Serialize, Default)] +#[serde(rename_all = "PascalCase")] +pub struct SettableBrokerProperties { + #[serde(skip_serializing_if = "Option::is_none")] + pub correlation_id: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub session_id: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub message_id: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub label: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub reply_to: Option, + + #[serde( + skip_serializing_if = "Option::is_none", + serialize_with = "duration_to_seconds_f64" + )] + pub time_to_live: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub to: Option, + + #[serde( + with = "time::serde::rfc2822::option", + skip_serializing_if = "Option::is_none" + )] + pub scheduled_enqueue_time_utc: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub reply_to_session_id: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_key: Option, +} + +fn duration_to_seconds_f64(duration: &Option, serializer: S) -> Result +where + S: serde::Serializer, +{ + if let Some(duration) = duration { + serializer.serialize_f64(duration.as_secs_f64()) + } else { + serializer.serialize_none() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::service_bus::SettableBrokerProperties; + + #[test] + fn test_duration_serialize() { + let dur = SettableBrokerProperties { + time_to_live: Some(Duration::from_millis(4444)), + ..Default::default() + }; + let dur_str = serde_json::to_string(&dur).unwrap(); + assert_eq!(dur_str, r#"{"TimeToLive":4.444}"#); + } +} diff --git a/sdk/messaging_servicebus/src/service_bus/queue_client.rs b/sdk/messaging_servicebus/src/service_bus/queue_client.rs index 60af53df42..1e7835d050 100644 --- a/sdk/messaging_servicebus/src/service_bus/queue_client.rs +++ b/sdk/messaging_servicebus/src/service_bus/queue_client.rs @@ -11,6 +11,8 @@ use std::time::Duration; use azure_core::{auth::Secret, error::Error, HttpClient}; +use super::SendMessageOptions; + /// Client object that allows interaction with the `ServiceBus` API #[derive(Debug, Clone)] pub struct QueueClient { @@ -49,7 +51,11 @@ impl QueueClient { } /// Sends a message to the queue - pub async fn send_message(&self, msg: &str) -> Result<(), Error> { + pub async fn send_message( + &self, + msg: &str, + send_message_options: Option, + ) -> Result<(), Error> { send_message( &self.http_client, &self.namespace, @@ -57,6 +63,7 @@ impl QueueClient { &self.policy_name, &self.signing_key, msg, + send_message_options, ) .await } diff --git a/sdk/messaging_servicebus/src/service_bus/topic_client.rs b/sdk/messaging_servicebus/src/service_bus/topic_client.rs index e9e037ecc1..67951f9a56 100644 --- a/sdk/messaging_servicebus/src/service_bus/topic_client.rs +++ b/sdk/messaging_servicebus/src/service_bus/topic_client.rs @@ -11,6 +11,8 @@ use std::time::Duration; use azure_core::{auth::Secret, error::Error, HttpClient}; +use super::SendMessageOptions; + /// Client object that allows interaction with the `ServiceBus` API #[derive(Debug, Clone)] pub struct TopicClient { @@ -73,7 +75,11 @@ impl TopicSender { Self { topic_client } } /// Sends a message to the topic - pub async fn send_message(&self, msg: &str) -> Result<(), Error> { + pub async fn send_message( + &self, + msg: &str, + send_message_options: Option, + ) -> Result<(), Error> { send_message( &self.topic_client.http_client, &self.topic_client.namespace, @@ -81,6 +87,7 @@ impl TopicSender { &self.topic_client.policy_name, &self.topic_client.signing_key, msg, + send_message_options, ) .await } diff --git a/sdk/messaging_servicebus/tests/service_bus.rs b/sdk/messaging_servicebus/tests/service_bus.rs index 36a05379bf..c0aa36bf43 100644 --- a/sdk/messaging_servicebus/tests/service_bus.rs +++ b/sdk/messaging_servicebus/tests/service_bus.rs @@ -1,13 +1,51 @@ #![cfg(all(test, feature = "test_e2e"))] // to run this, do: `cargo test --features test_e2e` -use azure_messaging_servicebus::service_bus::QueueClient; +use azure_messaging_servicebus::service_bus::{ + QueueClient, SendMessageOptions, SettableBrokerProperties, +}; use std::time::Duration; +use time::OffsetDateTime; #[tokio::test] async fn send_message_test() { let client = create_client().unwrap(); client - .send_message("hello, world!") + .send_message("hello, world!", None) + .await + .expect("Failed to send message"); +} + +#[tokio::test] +async fn send_message_delayed_test() { + let client = create_client().unwrap(); + client + .send_message( + "hello, world!", + Some(SendMessageOptions { + broker_properties: Some(SettableBrokerProperties { + scheduled_enqueue_time_utc: Some( + OffsetDateTime::now_utc() + Duration::from_secs(60), + ), + ..Default::default() + }), + ..Default::default() + }), + ) + .await + .expect("Failed to send message"); +} + +#[tokio::test] +async fn send_message_with_content_type() { + let client = create_client().unwrap(); + client + .send_message( + r#"{"message": "content"}"#, + Some(SendMessageOptions { + content_type: Some("application/json".into()), + ..Default::default() + }), + ) .await .expect("Failed to send message"); }