Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Commit

Permalink
[databroker-cli] Add support for access token
Browse files Browse the repository at this point in the history
  • Loading branch information
argerus committed Feb 21, 2023
1 parent 095f9f7 commit 0ca1b37
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 63 deletions.
77 changes: 69 additions & 8 deletions kuksa_databroker/databroker-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tonic::transport::Channel;

pub struct Client {
pub uri: Uri,
pub token: Option<tonic::metadata::AsciiMetadataValue>,
pub channel: Option<tonic::transport::Channel>,
connection_state_subs: Option<tokio::sync::broadcast::Sender<ConnectionState>>,
}
Expand All @@ -46,15 +47,40 @@ impl std::fmt::Display for ClientError {
}
}

#[derive(Debug)]
pub enum TokenError {
MalformedTokenError(String),
}

impl std::error::Error for TokenError {}
impl std::fmt::Display for TokenError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TokenError::MalformedTokenError(msg) => f.pad(msg),
}
}
}

impl Client {
pub fn new(uri: Uri) -> Self {
Client {
uri,
token: None,
channel: None,
connection_state_subs: None,
}
}

pub fn set_access_token(&mut self, token: impl AsRef<str>) -> Result<(), TokenError> {
match tonic::metadata::AsciiMetadataValue::from_str(&format!("Bearer {}", token.as_ref())) {
Ok(token) => {
self.token = Some(token);
Ok(())
}
Err(err) => Err(TokenError::MalformedTokenError(format!("{err}"))),
}
}

pub fn is_connected(&self) -> bool {
self.channel.is_some()
}
Expand Down Expand Up @@ -124,8 +150,10 @@ impl Client {
&mut self,
paths: Vec<String>,
) -> Result<Vec<proto::v1::Metadata>, ClientError> {
let mut client =
proto::v1::broker_client::BrokerClient::new(self.get_channel().await?.clone());
let mut client = proto::v1::broker_client::BrokerClient::with_interceptor(
self.get_channel().await?.clone(),
self.get_auth_interceptor(),
);
// Empty vec == all property metadata
let args = tonic::Request::new(proto::v1::GetMetadataRequest { names: paths });
match client.get_metadata(args).await {
Expand All @@ -144,8 +172,10 @@ impl Client {
HashMap<std::string::String, databroker_proto::sdv::databroker::v1::Datapoint>,
ClientError,
> {
let mut client =
proto::v1::broker_client::BrokerClient::new(self.get_channel().await?.clone());
let mut client = proto::v1::broker_client::BrokerClient::with_interceptor(
self.get_channel().await?.clone(),
self.get_auth_interceptor(),
);
let args = tonic::Request::new(proto::v1::GetDatapointsRequest {
datapoints: paths.iter().map(|path| path.as_ref().into()).collect(),
});
Expand All @@ -158,12 +188,29 @@ impl Client {
}
}

pub async fn set_datapoints(
&mut self,
datapoints: HashMap<String, proto::v1::Datapoint>,
) -> Result<proto::v1::SetDatapointsReply, ClientError> {
let args = tonic::Request::new(proto::v1::SetDatapointsRequest { datapoints });
let mut client = proto::v1::broker_client::BrokerClient::with_interceptor(
self.get_channel().await?.clone(),
self.get_auth_interceptor(),
);
match client.set_datapoints(args).await {
Ok(response) => Ok(response.into_inner()),
Err(err) => Err(ClientError::Status(err)),
}
}

pub async fn subscribe(
&mut self,
query: String,
) -> Result<tonic::Streaming<proto::v1::SubscribeReply>, ClientError> {
let mut client =
proto::v1::broker_client::BrokerClient::new(self.get_channel().await?.clone());
let mut client = proto::v1::broker_client::BrokerClient::with_interceptor(
self.get_channel().await?.clone(),
self.get_auth_interceptor(),
);
let args = tonic::Request::new(proto::v1::SubscribeRequest { query });

match client.subscribe(args).await {
Expand All @@ -176,13 +223,27 @@ impl Client {
&mut self,
datapoints: HashMap<i32, proto::v1::Datapoint>,
) -> Result<proto::v1::UpdateDatapointsReply, ClientError> {
let mut client =
proto::v1::collector_client::CollectorClient::new(self.get_channel().await?.clone());
let mut client = proto::v1::collector_client::CollectorClient::with_interceptor(
self.get_channel().await?.clone(),
self.get_auth_interceptor(),
);

let request = tonic::Request::new(proto::v1::UpdateDatapointsRequest { datapoints });
match client.update_datapoints(request).await {
Ok(response) => Ok(response.into_inner()),
Err(err) => Err(ClientError::Status(err)),
}
}

fn get_auth_interceptor(
&mut self,
) -> impl FnMut(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> + '_ {
move |mut req: tonic::Request<()>| {
if let Some(token) = &self.token {
// debug!("Inserting auth token: {:?}", token);
req.metadata_mut().insert("authorization", token.clone());
}
Ok(req)
}
}
}
147 changes: 92 additions & 55 deletions kuksa_databroker/databroker-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,24 @@ const TIMEOUT: Duration = Duration::from_millis(500);
const CLI_COMMANDS: &[(&str, &str, &str)] = &[
("connect", "[URI]", "Connect to server"),
("get", "<PATH> [[PATH] ...]", "Get signal value(s)"),
(
"provide",
"<PATH> <VALUE>",
"Provide (publish) a signal value",
),
("actuate", "<PATH> <VALUE>", "Actuate signal"),
("subscribe", "<QUERY>", "Subscribe to signals using a query"),
(
"metadata",
"[FILTER]",
"Fetch metadata. Provide FILTER to view metadata of signals matching filter.",
),
(
"provide",
"<PATH> <VALUE>",
"Provide (publish) a signal value",
),
("token", "<TOKEN>", "Use TOKEN as access token"),
(
"token-file",
"<FILE>",
"Use content of FILE as access token",
),
("help", "", "You're looking at it."),
("quit", "", "Quit"),
];
Expand All @@ -63,6 +69,9 @@ struct Cli {

// #[clap(short, long)]
// port: Option<u16>,
/// File containing access token
#[clap(long, value_name = "FILE", display_order = 2)]
token_file: Option<String>,

// Sub command
#[clap(subcommand)]
Expand Down Expand Up @@ -97,6 +106,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();

let mut client = client::Client::new(to_uri(cli.server)?);
if let Some(token_filename) = cli.token_file {
let token = std::fs::read_to_string(token_filename)?;
client.set_access_token(token)?;
}

let mut connection_state_subscription = client.subscribe_to_connection_state();
let interface_ref = interface.clone();
Expand Down Expand Up @@ -208,6 +221,45 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
}
"token" => {
interface.add_history_unique(line.clone());

if args.is_empty() {
print_usage(cmd);
continue;
}

match client.set_access_token(args) {
Ok(()) => {
print_info("Access token set.")?;
}
Err(err) => print_error(cmd, &format!("Malformed token: {err}"))?,
}
}
"token-file" => {
interface.add_history_unique(line.clone());

if args.is_empty() {
print_usage(cmd);
continue;
}

let token_filename = args.trim();
match std::fs::read_to_string(token_filename) {
Ok(token) => match client.set_access_token(token) {
Ok(()) => print_info("Access token set.")?,
Err(err) => {
print_error(cmd, &format!("Malformed token: {err}"))?
}
},
Err(err) => print_error(
cmd,
&format!(
"Failed to open token file \"{token_filename}\": {err}"
),
)?,
}
}
"actuate" => {
interface.add_history_unique(line.clone());

Expand Down Expand Up @@ -248,61 +300,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
continue;
}
match client.get_channel().await {
Ok(channel) => {
let mut client =
proto::v1::broker_client::BrokerClient::new(
channel.clone(),
);
let ts = Timestamp::from(SystemTime::now());
let datapoints = HashMap::from([(
metadata.name.clone(),
proto::v1::Datapoint {
timestamp: Some(ts),
value: Some(data_value.unwrap()),
},
)]);
let request =
tonic::Request::new(proto::v1::SetDatapointsRequest {
datapoints,
});
let ts = Timestamp::from(SystemTime::now());
let datapoints = HashMap::from([(
metadata.name.clone(),
proto::v1::Datapoint {
timestamp: Some(ts),
value: Some(data_value.unwrap()),
},
)]);

match client.set_datapoints(request).await {
Ok(response) => {
let message = response.into_inner();
if message.errors.is_empty() {
print_resp_ok(cmd)?;
} else {
for (id, error) in message.errors {
match proto::v1::DatapointError::from_i32(
error,
) {
Some(error) => {
print_resp_ok(cmd)?;
println!(
"Error setting {}: {}",
id,
Color::Red.paint(format!(
"{error:?}"
)),
);
}
None => print_resp_ok_fmt(
cmd,
format_args!(
"Error setting id {id}"
),
)?,
}
match client.set_datapoints(datapoints).await {
Ok(message) => {
if message.errors.is_empty() {
print_resp_ok(cmd)?;
} else {
for (id, error) in message.errors {
match proto::v1::DatapointError::from_i32(error) {
Some(error) => {
print_resp_ok(cmd)?;
println!(
"Error setting {}: {}",
id,
Color::Red.paint(format!("{error:?}")),
);
}
None => print_resp_ok_fmt(
cmd,
format_args!("Error setting id {id}"),
)?,
}
}
Err(err) => print_resp_err(cmd, &err)?,
}
}
Err(err) => {
print_error(cmd, format!("{err}"))?;
Err(ClientError::Status(status)) => {
print_resp_err(cmd, &status)?
}
Err(ClientError::Connection(msg)) => print_error(cmd, msg)?,
}
}
}
Expand All @@ -312,7 +345,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (path, value) = split_first_word(args);

if value.is_empty() {
println!("Usage: publish PATH VALUE");
print_usage(cmd);
continue;
}

Expand Down Expand Up @@ -386,7 +419,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
interface.add_history_unique(line.clone());

if args.is_empty() {
println!("Usage: subscribe QUERY");
print_usage(cmd);
continue;
}

Expand Down Expand Up @@ -1044,6 +1077,10 @@ impl<Term: Terminal> Completer<Term> for CliCompleter {
}
}
},
Some("token-file") => {
let path_completer = linefeed::complete::PathCompleter;
path_completer.complete(word, prompter, start, _end)
}
_ => None,
}
}
Expand Down

0 comments on commit 0ca1b37

Please sign in to comment.