Skip to content

Commit

Permalink
addressing issue Azure#255
Browse files Browse the repository at this point in the history
Signed-off-by: Dan Chiarlone <[email protected]>
  • Loading branch information
danbugs committed May 27, 2022
1 parent 38b5ed1 commit 9c47c3d
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 308 deletions.
6 changes: 5 additions & 1 deletion sdk/messaging_servicebus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ categories = ["api-bindings"]
edition = "2021"

[dependencies]
azure_core = { path = "../core", version = "0.2" }
azure_core = "0.2"
base64 = "0.13"
chrono = "0.4"
log = "0.4"
url = "2.2"
hmac = "0.12"
sha2 = "0.10"
ring = "0.16"
http = "0.2"
bytes = "1.0"
thiserror = "1.0"

[dev-dependencies]
futures = "0.3"
Expand Down
104 changes: 53 additions & 51 deletions sdk/messaging_servicebus/examples/service_bus00.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,67 @@
// use azure_messaging_servicebus::prelude::*;
// use chrono::Duration;
use azure_messaging_servicebus::prelude::*;
use chrono::Duration;
use std::error::Error;

// async fn send(
// s: String,
// service_bus_namespace: String,
// event_hub_name: String,
// policy_name: String,
// policy_key: String,
// ) -> Result<(), Box<dyn Error>> {
// let mut client = Client::new(
// service_bus_namespace.to_owned(),
// event_hub_name.to_owned(),
// policy_name.to_owned(),
// policy_key.to_owned(),
// )?;
async fn send(
s: String,
service_bus_namespace: String,
event_hub_name: String,
policy_name: String,
policy_key: String,
) -> Result<(), Box<dyn Error>> {
let http_client = azure_core::new_http_client();
let mut client = Client::new(
http_client,
service_bus_namespace.to_owned(),
event_hub_name.to_owned(),
policy_name.to_owned(),
policy_key.to_owned(),
)?;

// println!("before {:?} message send!", s);
// match client.send_event(&s, Duration::days(1)).await {
// Ok(_) => println!("{:?} message sent!", s),
println!("before {:?} message send!", s);
match client.send_event(&s, Duration::days(1)).await {
Ok(_) => println!("{:?} message sent!", s),

// Err(error) => println!("{:?} failed to send message", error),
// }
Err(error) => println!("{:?} failed to send message", error),
}

// Ok(())
// }
Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// // First we retrieve the account name and master key from environment variables.
// // We expect master keys (ie, not resource constrained)
// let service_bus_namespace = std::env::var("AZURE_SERVICE_BUS_NAMESPACE")
// .expect("Set env variable AZURE_SERVICE_BUS_NAMESPACE first!");
// let event_hub_name = std::env::var("AZURE_EVENT_HUB_NAME")
// .expect("Set env variable AZURE_EVENT_HUB_NAME first!");
// let policy_name =
// std::env::var("AZURE_POLICY_NAME").expect("Set env variable AZURE_POLICY_NAME first!");
// let policy_key =
// std::env::var("AZURE_POLICY_KEY").expect("Set env variable AZURE_POLICY_KEY first!");
// First we retrieve the account name and master key from environment variables.
// We expect master keys (ie, not resource constrained)
let service_bus_namespace = std::env::var("AZURE_SERVICE_BUS_NAMESPACE")
.expect("Set env variable AZURE_SERVICE_BUS_NAMESPACE first!");
let event_hub_name = std::env::var("AZURE_EVENT_HUB_NAME")
.expect("Set env variable AZURE_EVENT_HUB_NAME first!");
let policy_name =
std::env::var("AZURE_POLICY_NAME").expect("Set env variable AZURE_POLICY_NAME first!");
let policy_key =
std::env::var("AZURE_POLICY_KEY").expect("Set env variable AZURE_POLICY_KEY first!");

// let messages = vec![
// "These", "are", "useless", "messages", "provided", "for", "free", "with", "love",
// ];
// println!(
// "Sending the following messages: {:?}. \
// Please note they will be sent out of order!",
// messages
// );
let messages = vec![
"These", "are", "useless", "messages", "provided", "for", "free", "with", "love",
];
println!(
"Sending the following messages: {:?}. \
Please note they will be sent out of order!",
messages
);

// let mut v = Vec::new();
// for s in messages.into_iter() {
// v.push(send(
// s.to_owned(),
// service_bus_namespace.to_owned(),
// event_hub_name.to_owned(),
// policy_name.to_owned(),
// policy_key.to_owned(),
// ))
// }
let mut v = Vec::new();
for s in messages.into_iter() {
v.push(send(
s.to_owned(),
service_bus_namespace.to_owned(),
event_hub_name.to_owned(),
policy_name.to_owned(),
policy_key.to_owned(),
))
}

// futures::future::join_all(v).await;
futures::future::join_all(v).await;

Ok(())
}
78 changes: 78 additions & 0 deletions sdk/messaging_servicebus/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/// A specialized Result type.
pub type Result<T> = std::result::Result<T, Error>;

#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
CoreError(#[from] azure_core::Error),
#[error("Parse error: {0}")]
ParseError(#[from] azure_core::ParseError),
#[error("Permission error: {0}")]
PermissionError(#[from] azure_core::PermissionError),
#[error("Parse bool error: {0}")]
ParseBoolError(#[from] std::str::ParseBoolError),
#[error("To str error: {0}")]
ToStrError(#[from] http::header::ToStrError),
#[error("URL parse error: {0}")]
UrlParseError(#[from] url::ParseError),
#[error("Date time parse error: {0}")]
DateTimeParseError(#[from] chrono::format::ParseError),
#[error("HTTP error: {0}")]
HttpError(#[from] http::Error),
#[error("Traversing error: {0}")]
TraversingError(#[from] azure_core::TraversingError),
#[error("From UTF-8 error: {0}")]
FromUtf8Error(#[from] std::string::FromUtf8Error),
#[error("Invalid status code: {0:?}")]
InvalidStatusCode(#[from] http::status::InvalidStatusCode),
#[error("UTF-8 conversion error: {0}")]
Utf8Error(#[from] std::str::Utf8Error),
#[error("base64 decode error: {0}")]
Base64DecodeError(#[from] base64::DecodeError),
#[error("A required header is missing: {0}")]
MissingHeaderError(String),
#[error(
"An expected JSON node is missing: {} of expected type {}",
value,
expected_type
)]
MissingValueError {
value: String,
expected_type: String,
},
#[error("Parse int error: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
#[error("Header not found: {0}")]
HeaderNotFound(String),
#[error("Error parsing the transaction response: {0:?}")]
TransactionResponseParseError(String),
#[error("Generic error: {0}")]
GenericErrorWithText(String),
#[error("Operation not supported. Operation == {0}, reason == {1}")]
OperationNotSupported(String, String),
#[error("UnexpectedXMLError: {0}")]
UnexpectedXMLError(String),
#[error("digest length {0} bytes instead of 16")]
DigestNot16BytesLong(usize),
#[error("CRC64 length {0} bytes instead of 8")]
CRC64Not8BytesLong(usize),
#[error("At least one of these headers must be present: {0:?}")]
HeadersNotFound(Vec<String>),
#[error("error writing the header value: {0}")]
InvalidHeaderValue(#[from] azure_core::HttpHeaderError),
#[error("error generating hmac: {0}")]
Hmac(#[from] hmac::digest::InvalidLength),
}

impl From<azure_core::error::Error> for Error {
fn from(err: azure_core::error::Error) -> Self {
Self::CoreError(err.into())
}
}

impl From<azure_core::HttpError> for Error {
fn from(error: azure_core::HttpError) -> Self {
Self::CoreError(azure_core::Error::Http(error))
}
}
77 changes: 42 additions & 35 deletions sdk/messaging_servicebus/src/event_hub/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::sync::Arc;

use crate::event_hub::{
delete_message, peek_lock, peek_lock_full, receive_and_delete, renew_lock, send_event,
unlock_message, PeekLockResponse,
};
use chrono::Duration;
use hyper_rustls::HttpsConnector;
use ring::hmac::Key;

type HttpClient = hyper::Client<HttpsConnector<hyper::client::HttpConnector>>;
use azure_core::HttpClient;

pub struct Client {
namespace: String,
event_hub: String,
policy_name: String,
signing_key: Key,
http_client: HttpClient,
http_client: Arc<dyn HttpClient>,
}

impl Client {
pub fn new<N, E, P, K>(
http_client: Arc<dyn HttpClient>,
namespace: N,
event_hub: E,
policy_name: P,
Expand All @@ -30,7 +32,6 @@ impl Client {
K: AsRef<str>,
{
let signing_key = Key::new(ring::hmac::HMAC_SHA256, key.as_ref().as_bytes());
let http_client = hyper::Client::builder().build(HttpsConnector::with_native_roots());

Ok(Client {
namespace: namespace.into(),
Expand All @@ -41,11 +42,7 @@ impl Client {
})
}

pub async fn send_event(
&mut self,
event_body: &str,
duration: Duration,
) -> Result<(), azure_core::Error> {
pub async fn send_event(&mut self, event_body: &str, duration: Duration) -> crate::Result<()> {
send_event(
&self.http_client,
&self.namespace,
Expand All @@ -62,24 +59,28 @@ impl Client {
&mut self,
duration: Duration,
timeout: Option<Duration>,
) -> Result<String, azure_core::Error> {
peek_lock(
&self.http_client,
&self.namespace,
&self.event_hub,
&self.policy_name,
&self.signing_key,
duration,
timeout,
)
.await
) -> crate::Result<String> {
Ok(std::str::from_utf8(
&peek_lock(
&self.http_client,
&self.namespace,
&self.event_hub,
&self.policy_name,
&self.signing_key,
duration,
timeout,
)
.await?
.into_body(),
)?
.to_string())
}

pub async fn peek_lock_full(
&mut self,
duration: Duration,
timeout: Option<Duration>,
) -> Result<PeekLockResponse, azure_core::Error> {
) -> crate::Result<PeekLockResponse> {
peek_lock_full(
&self.http_client,
&self.namespace,
Expand All @@ -92,24 +93,28 @@ impl Client {
.await
}

pub async fn receive_and_delete(&mut self, duration: Duration) -> Result<String, azure_core::Error> {
receive_and_delete(
&self.http_client,
&self.namespace,
&self.event_hub,
&self.policy_name,
&self.signing_key,
duration,
)
.await
pub async fn receive_and_delete(&mut self, duration: Duration) -> crate::Result<String> {
Ok(std::str::from_utf8(
&receive_and_delete(
&self.http_client,
&self.namespace,
&self.event_hub,
&self.policy_name,
&self.signing_key,
duration,
)
.await?
.into_body(),
)?
.to_string())
}

pub async fn unlock_message(
&mut self,
message_id: &str,
lock_token: &str,
duration: Duration,
) -> Result<(), azure_core::Error> {
) -> crate::Result<()> {
unlock_message(
&self.http_client,
&self.namespace,
Expand All @@ -128,7 +133,7 @@ impl Client {
message_id: &str,
lock_token: &str,
duration: Duration,
) -> Result<(), azure_core::Error> {
) -> crate::Result<()> {
delete_message(
&self.http_client,
&self.namespace,
Expand All @@ -147,7 +152,7 @@ impl Client {
message_id: &str,
lock_token: &str,
duration: Duration,
) -> Result<(), azure_core::Error> {
) -> crate::Result<()> {
renew_lock(
&self.http_client,
&self.namespace,
Expand All @@ -172,7 +177,9 @@ mod test {
pub fn client_enc() {
let str_to_sign = "This must be secret!";

let c = Client::new("namespace", "event_hub", "policy", "key").unwrap();
let http_client = azure_core::new_http_client();

let c = Client::new(http_client, "namespace", "event_hub", "policy", "key").unwrap();

let sig = hmac::sign(&c.signing_key, str_to_sign.as_bytes());
let sig = ::base64::encode(sig.as_ref());
Expand Down
Loading

0 comments on commit 9c47c3d

Please sign in to comment.