Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for queries #240

Merged
merged 5 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/core/src/headers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,5 @@ pub const REQUIRES_SYNC: &str = "x-ms-requires-sync";
pub const VERSION: &str = "x-ms-version";
pub const PROPERTIES: &str = "x-ms-properties";
pub const NAMESPACE_ENABLED: &str = "x-ms-namespace-enabled";
pub const MAX_ITEM_COUNT: &str = "x-ms-max-item-count";
pub const ITEM_TYPE: &str = "x-ms-item-type";
8 changes: 8 additions & 0 deletions sdk/core/src/headers/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ pub fn content_type_from_headers(headers: &HeaderMap) -> Result<&str, Error> {
.to_str()?)
}

pub fn item_count_from_headers(headers: &HeaderMap) -> Result<u32, Error> {
Ok(headers
.get(crate::headers::MAX_ITEM_COUNT)
.ok_or_else(|| Error::HeaderNotFound(crate::MAX_ITEM_COUNT.to_owned()))?
.to_str()?
.parse()?)
}

#[cfg(feature = "enable_hyper")]
pub async fn perform_http_request(
client: &Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::headers;
use azure_core::AddAsHeader;
use crate::AddAsHeader;
use http::request::Builder;

/// The max number of items in the collection
Expand All @@ -16,9 +16,9 @@ impl MaxItemCount {
impl AddAsHeader for MaxItemCount {
fn add_as_header(&self, builder: Builder) -> Builder {
if self.0 <= 0 {
builder.header(headers::HEADER_MAX_ITEM_COUNT, -1)
builder.header(headers::MAX_ITEM_COUNT, -1)
} else {
builder.header(headers::HEADER_MAX_ITEM_COUNT, self.0)
builder.header(headers::MAX_ITEM_COUNT, self.0)
}
}
}
2 changes: 2 additions & 0 deletions sdk/core/src/request_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod if_source_modified_since_condition;
mod lease;
mod lease_break_period;
mod lease_duration;
mod max_item_count;
mod max_results;
mod metadata;
mod next_marker;
Expand Down Expand Up @@ -46,6 +47,7 @@ pub use if_source_modified_since_condition::IfSourceModifiedSinceCondition;
pub use lease::LeaseId;
pub use lease_break_period::LeaseBreakPeriod;
pub use lease_duration::LeaseDuration;
pub use max_item_count::MaxItemCount;
pub use max_results::MaxResults;
pub use metadata::Metadata;
pub use next_marker::NextMarker;
Expand Down
8 changes: 0 additions & 8 deletions sdk/cosmos/src/headers/from_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ pub(crate) fn request_charge_from_headers(headers: &HeaderMap) -> Result<f64, Co
.parse()?)
}

pub(crate) fn item_count_from_headers(headers: &HeaderMap) -> Result<u32, CosmosError> {
Ok(headers
.get(HEADER_ITEM_COUNT)
.ok_or_else(|| CosmosError::HeaderNotFound(HEADER_ITEM_COUNT.to_owned()))?
.to_str()?
.parse()?)
}

pub(crate) fn role_from_headers(headers: &HeaderMap) -> Result<u32, CosmosError> {
Ok(headers
.get(HEADER_ROLE)
Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/src/headers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ pub(crate) const HEADER_VERSION: &str = "x-ms-version"; // Cow[str]
pub(crate) const HEADER_DATE: &str = "x-ms-date"; // [String]
pub(crate) const HEADER_DOCUMENTDB_IS_UPSERT: &str = "x-ms-documentdb-is-upsert"; // [bool]
pub(crate) const HEADER_INDEXING_DIRECTIVE: &str = "x-ms-indexing-directive"; // [IndexingDirective]
pub(crate) const HEADER_MAX_ITEM_COUNT: &str = "x-ms-max-item-count"; // [u64]
pub(crate) const HEADER_ITEM_COUNT: &str = "x-ms-item-count"; // [u64]
pub(crate) const HEADER_CONSISTENCY_LEVEL: &str = "x-ms-consistency-level"; // [ConsistencyLevel]
pub(crate) const HEADER_SESSION_TOKEN: &str = "x-ms-session-token"; // [ContinuationToken]
pub(crate) const HEADER_ALLOW_MULTIPLE_WRITES: &str = "x-ms-cosmos-allow-tentative-writes"; // [bool]
Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,11 @@ mod consistency_level;
mod cosmos_entity;
mod errors;
mod headers;
mod max_item_count;
mod resource_quota;
mod to_json_vector;

pub use consistency_level::ConsistencyLevel;
pub use cosmos_entity::CosmosEntity;
pub use max_item_count::MaxItemCount;
pub use resource_quota::ResourceQuota;

pub use errors::CosmosError;
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//! ```

#[doc(inline)]
pub use crate::{ConsistencyLevel, CosmosEntity, CosmosError, MaxItemCount};
pub use crate::{ConsistencyLevel, CosmosEntity, CosmosError};

#[doc(inline)]
pub use crate::clients::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::headers::from_headers::*;
use crate::CosmosError;
use azure_core::headers::session_token_from_headers;
use azure_core::headers::{item_count_from_headers, session_token_from_headers};
use chrono::{DateTime, Utc};
use http::response::Response;

Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/src/responses/list_attachments_response.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::headers::from_headers::*;
use crate::resources::Attachment;
use crate::{CosmosError, ResourceQuota};
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
use azure_core::headers::{
continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers,
};
use azure_core::SessionToken;
use chrono::{DateTime, Utc};
use http::response::Response;
Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/src/responses/list_documents_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::headers::from_headers::*;
use crate::resources::document::{Document, DocumentAttributes};
use crate::{CosmosError, ResourceQuota};

use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
use azure_core::headers::{
continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers,
};
use azure_core::SessionToken;
use chrono::{DateTime, Utc};
use http::response::Response;
Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/src/responses/list_triggers_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::headers::from_headers::*;
use crate::resources::Trigger;
use crate::CosmosError;
use crate::ResourceQuota;
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
use azure_core::headers::{
continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers,
};
use chrono::{DateTime, Utc};
use http::response::Response;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::headers::from_headers::*;
use crate::resources::UserDefinedFunction;
use crate::CosmosError;
use crate::ResourceQuota;
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
use azure_core::headers::{
continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers,
};
use chrono::{DateTime, Utc};
use http::response::Response;

Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/src/responses/query_documents_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::errors::ConversionToDocumentError;
use crate::headers::from_headers::*;
use crate::resources::document::DocumentAttributes;
use crate::{CosmosError, ResourceQuota};
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
use azure_core::headers::{
continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers,
};
use azure_core::SessionToken;
use chrono::{DateTime, Utc};
use http::response::Response;
Expand Down
3 changes: 2 additions & 1 deletion sdk/iothub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ url = "2.2"
thiserror = "1.0"

[dev-dependencies]
tokio = { version = "1.0", features = ["macros"] }
env_logger = "0.8"
hyper = "0.14"
hyper-rustls = "0.22"
reqwest = "0.11.0"
tokio = { version = "1.0", features = ["macros"] }
47 changes: 47 additions & 0 deletions sdk/iothub/examples/query_iothub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use iothub::service::ServiceClient;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
env_logger::init();

let iothub_connection_string = std::env::var("IOTHUB_CONNECTION_STRING")
.expect("Set env variable IOTHUB_CONNECTION_STRING first!");

let query = "SELECT * FROM devices";
println!("Invoking query '{}' on the IoT Hub", query);

let http_client = azure_core::new_http_client();
let service_client =
ServiceClient::from_connection_string(http_client, iothub_connection_string, 3600)?;

let response = service_client
.query()
.max_item_count(1)
.execute(query)
.await?;

println!(
"Response of first result: {}",
serde_json::to_string(&response.result)?
);

let token = match response.continuation_token {
Some(val) => val,
None => return Ok(()),
};

let response = service_client
.query()
.max_item_count(1)
.continuation(token.as_str())
.execute(query)
.await?;

println!(
"Response of second result: {}",
serde_json::to_string(&response.result)?
);

Ok(())
}
18 changes: 17 additions & 1 deletion sdk/iothub/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod responses;

use crate::service::requests::{
get_identity, get_twin, CreateOrUpdateDeviceIdentityBuilder,
CreateOrUpdateModuleIdentityBuilder, DeleteIdentityBuilder, InvokeMethodBuilder,
CreateOrUpdateModuleIdentityBuilder, DeleteIdentityBuilder, InvokeMethodBuilder, QueryBuilder,
UpdateOrReplaceTwinBuilder,
};
use crate::service::resources::identity::IdentityOperation;
Expand Down Expand Up @@ -676,6 +676,22 @@ impl ServiceClient {
)
}

/// Invoke a query
///
/// ```
/// use std::sync::Arc;
/// use azure_core::HttpClient;
/// use iothub::service::ServiceClient;
///
/// # let http_client = azure_core::new_http_client();
/// # let connection_string = "HostName=cool-iot-hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=YSB2ZXJ5IHNlY3VyZSBrZXkgaXMgaW1wb3J0YW50Cg==";
/// let iothub = ServiceClient::from_connection_string(http_client, connection_string, 3600).expect("Failed to create the ServiceClient!");
/// let query_builder = iothub.query();
/// ```
pub fn query(&self) -> QueryBuilder<'_, '_> {
QueryBuilder::new(&self)
}

/// Prepares a request that can be used by any request builders.
pub(crate) fn prepare_request(&self, uri: &str, method: Method) -> RequestBuilder {
RequestBuilder::new()
Expand Down
2 changes: 2 additions & 0 deletions sdk/iothub/src/service/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod delete_identity_builder;
mod get_identity;
mod get_twin;
mod invoke_method_builder;
mod query_builder;
mod update_or_replace_twin_builder;

pub use create_or_update_device_identity_builder::CreateOrUpdateDeviceIdentityBuilder;
Expand All @@ -12,4 +13,5 @@ pub use delete_identity_builder::DeleteIdentityBuilder;
pub(crate) use get_identity::get_identity;
pub(crate) use get_twin::get_twin;
pub use invoke_method_builder::InvokeMethodBuilder;
pub use query_builder::QueryBuilder;
pub use update_or_replace_twin_builder::UpdateOrReplaceTwinBuilder;
78 changes: 78 additions & 0 deletions sdk/iothub/src/service/requests/query_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#![allow(missing_docs)]

use std::convert::TryInto;

use azure_core::prelude::*;
use azure_core::setters;
use http::{Method, StatusCode};
use serde::Serialize;

use crate::service::{responses::QueryResponse, ServiceClient, API_VERSION};

/// Body for the Query request
#[derive(Serialize, Debug)]
struct QueryBody {
query: String,
}

/// Builder for creating queries
pub struct QueryBuilder<'a, 'b> {
service_client: &'a ServiceClient,
max_item_count: MaxItemCount,
continuation: Option<Continuation<'b>>,
}

impl<'a, 'b> QueryBuilder<'a, 'b> {
/// Create a new query struct
pub(crate) fn new(service_client: &'a ServiceClient) -> Self {
Self {
service_client,
max_item_count: MaxItemCount::new(-1),
continuation: None,
}
}

azure_core::setters! {
continuation: &'b str => Some(Continuation::new(continuation)),
max_item_count: i32 => MaxItemCount::new(max_item_count),
}

/// Invoke a qiven query on the IoT Hub
///
/// ```
/// use std::sync::Arc;
/// use azure_core::HttpClient;
/// use iothub::service::ServiceClient;
///
/// # let http_client = azure_core::new_http_client();
/// # let connection_string = "HostName=cool-iot-hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=YSB2ZXJ5IHNlY3VyZSBrZXkgaXMgaW1wb3J0YW50Cg==";
/// let iothub = ServiceClient::from_connection_string(http_client, connection_string, 3600).expect("Failed to create the ServiceClient!");
/// let query_builder = iothub.query().max_item_count(1).continuation("some_token").execute("SELECT * FROM devices");
/// ```
pub async fn execute<S>(self, query: S) -> Result<QueryResponse, crate::Error>
where
S: Into<String>,
{
let uri = format!(
"https://{}.azure-devices.net/devices/query?api-version={}",
self.service_client.iothub_name, API_VERSION
);

let query_body = QueryBody {
query: query.into(),
};
let body = azure_core::to_json(&query_body)?;

let request = self.service_client.prepare_request(&uri, Method::POST);
let request = azure_core::headers::add_optional_header(&self.continuation, request);
let request = azure_core::headers::add_mandatory_header(&self.max_item_count, request);
let request = request.body(body)?;

Ok(self
.service_client
.http_client()
.execute_request_check_status(request, StatusCode::OK)
.await?
.try_into()?)
}
}
2 changes: 2 additions & 0 deletions sdk/iothub/src/service/responses/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ mod device_twin_response;
mod invoke_method_response;
mod module_identity_response;
mod module_twin_response;
mod query_response;

pub use device_identity_response::DeviceIdentityResponse;
pub use device_twin_response::DeviceTwinResponse;
pub use invoke_method_response::InvokeMethodResponse;
pub use module_identity_response::ModuleIdentityResponse;
pub use module_twin_response::ModuleTwinResponse;
pub use query_response::QueryResponse;
30 changes: 30 additions & 0 deletions sdk/iothub/src/service/responses/query_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use azure_core::headers::{
self, continuation_token_from_headers_optional, string_from_headers_mandatory,
};
use http::response::Response;
use serde_json::Value;

/// The response for a query invocation
pub struct QueryResponse {
/// The result of the query
pub result: Value,
/// The continuation token for the next result of the query
pub continuation_token: Option<String>,
/// The type of the item in the result
pub item_type: String,
}

impl std::convert::TryFrom<Response<bytes::Bytes>> for QueryResponse {
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let headers = response.headers();
let body: &[u8] = response.body();

Ok(QueryResponse {
result: serde_json::from_slice(body)?,
continuation_token: continuation_token_from_headers_optional(headers)?,
item_type: string_from_headers_mandatory(headers, headers::ITEM_TYPE)?.to_string(),
})
}
}