Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] add a publish client + some improvement around labels. #12

Merged
merged 9 commits into from
Apr 7, 2022
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ futures = "0.3"
indexmap = { version = "1", features = ["serde"] }
humantime-serde = "1"
log = "0.4"
nom = "6"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tracing = "0.1"
url = "2"
urlencoding = "1.3"

http = { version = "0.2", optional = true }
openid = { version = "0.9.1", optional = true }
Expand Down
2 changes: 2 additions & 0 deletions src/command/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
//! Command and Control API.
pub mod v1;
94 changes: 94 additions & 0 deletions src/command/v1/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::error::ClientError;
use crate::openid::TokenProvider;
use crate::util::Client as TraitClient;
use serde::Serialize;
use std::fmt::Debug;
use tracing::instrument;
use url::Url;

/// A client for drogue cloud command and control API, backed by reqwest.
#[derive(Clone, Debug)]
pub struct Client<TP>
where
TP: TokenProvider,
{
client: reqwest::Client,
api_url: Url,
token_provider: TP,
}

type ClientResult<T> = Result<T, ClientError<reqwest::Error>>;

impl<TP> TraitClient<TP> for Client<TP>
where
TP: TokenProvider,
{
fn client(&self) -> &reqwest::Client {
&self.client
}

fn token_provider(&self) -> &TP {
&self.token_provider
}
}

impl<TP> Client<TP>
where
TP: TokenProvider,
{
/// Create a new client instance.
pub fn new(client: reqwest::Client, api_url: Url, token_provider: TP) -> Self {
Self {
client,
api_url,
token_provider,
}
}

fn url(&self, application: &str, device: &str) -> ClientResult<Url> {
let mut url = self.api_url.clone();

{
let mut path = url
.path_segments_mut()
.map_err(|_| ClientError::Request("Failed to get paths".into()))?;

path.extend(&[
"api",
"command",
"v1alpha1",
"apps",
&urlencoding::encode(application),
"devices",
&urlencoding::encode(device),
]);
}

Ok(url)
}

/// Send one way commands to devices.
///
/// The result will be true if the command was accepted.
/// False
#[instrument(skip(payload))]
pub async fn publish_command<A, D, C, P>(
&self,
application: A,
device: D,
command: C,
payload: Option<P>,
) -> ClientResult<Option<bool>>
where
A: AsRef<str> + Debug,
D: AsRef<str> + Debug,
C: AsRef<str> + Debug,
P: Serialize + Send + Sync,
{
let url = self.url(application.as_ref(), device.as_ref())?;
let query = vec![("command".to_string(), command.as_ref().to_string())];

self.create_with_query_parameters(url, payload, Some(query))
.await
}
}
5 changes: 5 additions & 0 deletions src/command/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#[cfg(feature = "reqwest")]
mod client;

#[cfg(feature = "reqwest")]
pub use client::*;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A client for the Drogue IoT Cloud APIs.

pub mod admin;
pub mod command;
pub mod core;
pub mod error;
pub mod meta;
Expand Down
60 changes: 14 additions & 46 deletions src/registry/v1/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::data::*;
use crate::core::WithTracing;
use crate::openid::{TokenInjector, TokenProvider};
use crate::openid::TokenProvider;
use crate::registry::v1::labels::LabelSelector;
use crate::util::Client as ClientTrait;
use crate::{error::ClientError, Translator};
use futures::{stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -84,33 +84,19 @@ where
/// If the user does not have access to the API, the server side may return "not found"
/// as a response instead of "forbidden".
#[instrument]
pub async fn list_apps<L>(&self, labels: Option<L>) -> ClientResult<Option<Vec<Application>>>
pub async fn list_apps<L>(
&self,
labels: Option<LabelSelector>,
) -> ClientResult<Option<Vec<Application>>>
where
L: IntoIterator + Debug,
L::Item: AsRef<str>,
{
let mut req = self.client().get(self.url(None, None)?);

// todo it would be cool to have a programmatic way to construct labels selectors
// using drogue-cloud-service-api::labels::LabelSelector
// Also, allocating strings from the `&str` we have is terrible,
// but using only `as_ref()` from the iter was dropping the reference after the loop.
if let Some(labels) = labels {
let label_string = labels
.into_iter()
.map(|item| item.as_ref().to_string())
.collect::<Vec<String>>()
.join(",");

req = req.query(&[("labels", label_string)]);
}
let url = self.url(None, None)?;

let req = req
.propagate_current_context()
.inject_token(self.token_provider())
.await?;
let labels = labels.map(|l| l.to_query_parameters());

Self::read_response(req.send().await?).await
self.read_with_query_parameters(url, labels).await
}

/// Get an application by name.
Expand Down Expand Up @@ -210,37 +196,19 @@ where
/// If the user does not have access to the API, the server side may return "not found"
/// as a response instead of "forbidden".
#[instrument]
pub async fn list_devices<A, L>(
pub async fn list_devices<A>(
&self,
application: A,
labels: Option<L>,
labels: Option<LabelSelector>,
) -> ClientResult<Option<Vec<Device>>>
where
A: AsRef<str> + Debug,
L: IntoIterator + Debug,
L::Item: AsRef<str>,
{
let mut req = self
.client()
.get(self.url(Some(application.as_ref()), Some(""))?);

// todo refactor this duplicated code
if let Some(labels) = labels {
let label_string = labels
.into_iter()
.map(|item| item.as_ref().to_string())
.collect::<Vec<String>>()
.join(",");

req = req.query(&[("labels", label_string)]);
}
let url = self.url(Some(application.as_ref()), Some(""))?;

let req = req
.propagate_current_context()
.inject_token(self.token_provider())
.await?;
let labels = labels.map(|l| l.to_query_parameters());

Self::read_response(req.send().await?).await
self.read_with_query_parameters(url, labels).await
}

/// Update (overwrite) an application.
Expand Down
146 changes: 146 additions & 0 deletions src/registry/v1/data/common/labels/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#[cfg(feature = "nom")]
mod parser;

#[cfg(feature = "nom")]
pub use parser::*;

use futures::future::join;
use nom::into;
#[cfg(feature = "nom")]
use std::convert::TryFrom;
use std::fmt;

#[derive(Default, Debug)]
pub struct LabelSelector(pub Vec<Operation>);

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
Eq(String, String),
NotEq(String, String),
In(String, Vec<String>),
NotIn(String, Vec<String>),
Exists(String),
NotExists(String),
}

#[cfg(feature = "nom")]
impl TryFrom<&str> for LabelSelector {
jbtrystram marked this conversation as resolved.
Show resolved Hide resolved
type Error = parser::ParserError;

fn try_from(value: &str) -> Result<Self, Self::Error> {
Ok(LabelSelector(parser::parse_from(value)?))
}
}

#[cfg(feature = "nom")]
impl TryFrom<String> for LabelSelector {
type Error = parser::ParserError;

fn try_from(value: String) -> Result<Self, Self::Error> {
Ok(LabelSelector(parser::parse_from(&value)?))
}
}

impl fmt::Display for Operation {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let (left, operation, right) = match self {
Operation::Eq(l, r) => (l.clone(), "=".to_string(), r.clone()),
Operation::NotEq(l, r) => (l.clone(), "!=".to_string(), r.clone()),
Operation::In(l, r) => (
l.clone(),
" in ".to_string(),
["(", r.join(", ").as_str(), ")"].concat(),
),
Operation::NotIn(l, r) => (
l.clone(),
" notin ".to_string(),
["(", r.join(", ").as_str(), ")"].concat(),
),
Operation::Exists(l) => (l.clone(), "".to_string(), "".to_string()),
Operation::NotExists(l) => ("!".to_string(), l.clone(), "".to_string()),
};

write!(f, "{}{}{}", left, operation, right)
}
}

impl From<Operation> for LabelSelector {
fn from(op: Operation) -> Self {
LabelSelector(vec![op])
}
}

impl LabelSelector {
/// Add another operation to a label selector.
///
pub fn add(mut self, op: Operation) -> Self {
jbtrystram marked this conversation as resolved.
Show resolved Hide resolved
self.0.push(op);
return self;
}

/// Convert a LabelSelector into query parameters for use with reqwest
///
pub fn to_query_parameters(&self) -> Vec<(String, String)> {
let mut labs = Vec::new();
let _ = &self.0.iter().map(|op| labs.push(op.to_string().clone()));

let labs = labs.join(",");

vec![("labels".to_string(), labs)]
}
}

#[cfg(test)]
mod test {
use crate::registry::v1::labels::Operation;

#[test]
fn test_serialize_equals_operation() {
let op = Operation::Eq("zone".to_string(), "europe".to_string());
assert_eq!(op.to_string(), "zone=europe");
}

#[test]
fn test_serialize_not_equals_operation() {
let op = Operation::NotEq("zone".to_string(), "europe".to_string());
assert_eq!(op.to_string(), "zone!=europe");
}

#[test]
fn test_serialize_in_operation() {
let op = Operation::In(
"country".to_string(),
vec![
"france".to_string(),
"germany".to_string(),
"serbia".to_string(),
],
);
assert_eq!(op.to_string(), "country in (france, germany, serbia)");
}

#[test]
fn test_serialize_not_in_operation() {
let op = Operation::NotIn(
"country".to_string(),
vec![
"france".to_string(),
"germany".to_string(),
"serbia".to_string(),
],
);
assert_eq!(op.to_string(), "country notin (france, germany, serbia)");
}

#[test]
fn test_serialize_exists_operation() {
let op = Operation::Exists("power".to_string());
assert_eq!(op.to_string(), "power");
}

#[test]
fn test_serialize_not_exists_operation() {
let op = Operation::NotExists("power".to_string());
assert_eq!(op.to_string(), "!power");
}
}
Loading