Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add /notif/ HTTP API handler
Browse files Browse the repository at this point in the history
Refactors the HTTP handling slightly for easier additions in the
future.

Closes #1062
  • Loading branch information
bbangert committed Jan 25, 2018
1 parent 7ddddd4 commit f599ee3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 47 deletions.
9 changes: 0 additions & 9 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,6 @@ where
+ Sink<SinkItem = ServerMessage, SinkError = Error>
+ 'static,
{
fn input(&mut self) -> Poll<ClientMessage, Error> {
let item = match self.ws.poll()? {
Async::Ready(None) => return Err("Client dropped".into()),
Async::Ready(Some(msg)) => Async::Ready(msg),
Async::NotReady => Async::NotReady,
};
Ok(item)
}

fn input_with_timeout(&mut self, timeout: &mut Timeout) -> Poll<ClientMessage, Error> {
let item = match timeout.poll()? {
Async::Ready(_) => return Err("Client timed out".into()),
Expand Down
82 changes: 52 additions & 30 deletions autopush_rs/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
//! Dummy module that's very likely to get entirely removed
//! Internal router HTTP API
//!
//! For now just a small HTTP server used to send notifications to our dummy
//! clients.
//! Accepts PUT requests to deliver notifications to a connected client or trigger
//! a client to check storage.
//!
//! Valid URL's:
//! PUT /push/UAID - Deliver notification to a client
//! PUT /notify/UAID - Tell a client to check storage
use std::str;
use std::rc::Rc;

use futures::future::err;
use futures::future::ok;
use futures::{Stream, Future};
use hyper::Method;
use hyper::{Method, StatusCode};
use hyper;
use serde_json;
use tokio_service::Service;
Expand All @@ -25,44 +29,62 @@ impl Service for Push {
type Future = Box<Future<Item = hyper::Response, Error = hyper::Error>>;

fn call(&self, req: hyper::Request) -> Self::Future {
if *req.method() != Method::Put && *req.method() != Method::Post {
debug!("not a PUT: {}", req.method());
return Box::new(err(hyper::Error::Method));
}
if req.uri().path().len() == 0 {
debug!("empty uri path");
return Box::new(err(hyper::Error::Incomplete));
let mut response = hyper::Response::new();
let req_path = req.path().to_string();
let path_vec: Vec<&str> = req_path.split("/").collect();
if path_vec.len() != 3 {
response.set_status(StatusCode::NotFound);
return Box::new(ok(response));
}
let req_uaid = req.uri().path()[6..].to_string();
let uaid = match Uuid::parse_str(&req_uaid) {
let (method_name, uaid) = (path_vec[1], path_vec[2]);
let uaid = match Uuid::parse_str(uaid) {
Ok(id) => id,
Err(_) => {
debug!("uri not uuid: {}", req.uri().to_string());
return Box::new(err(hyper::Error::Status));
response.set_status(StatusCode::BadRequest);
return Box::new(ok(response));
}
};

debug!("Got a message, now to do something!");

let body = req.body().concat2();
let srv = self.0.clone();
Box::new(body.and_then(move |body| {
let s = String::from_utf8(body.to_vec()).unwrap();
if let Ok(msg) = serde_json::from_str(&s) {
match srv.notify_client(uaid, msg) {
Ok(_) => return Ok(hyper::Response::new().with_status(hyper::StatusCode::Ok)),
_ => {
return Ok(
match (req.method(), method_name, uaid) {
(&Method::Put, "push", uaid) => {
// Due to consumption of body as a future we must return here
let body = req.body().concat2();
return Box::new(body.and_then(move |body| {
let s = String::from_utf8(body.to_vec()).unwrap();
if let Ok(msg) = serde_json::from_str(&s) {
match srv.notify_client(uaid, msg) {
Ok(_) => Ok(hyper::Response::new().with_status(StatusCode::Ok)),
_ => {
Ok(
hyper::Response::new()
.with_status(StatusCode::BadGateway)
.with_body("Client not available."),
)
}
}
} else {
Ok(
hyper::Response::new()
.with_status(hyper::StatusCode::BadRequest)
.with_body("Unable to decode body payload"),
)
}
}))
}
(&Method::Put, "notif", uaid) => {
match srv.check_client_storage(uaid) {
Ok(_) => response.set_status(StatusCode::Ok),
_ => {
response.set_status(StatusCode::BadGateway);
response.set_body("Client not available.");
}
}
}
Ok(hyper::Response::new().with_status(
hyper::StatusCode::NotFound,
))
}))
(_, "push", _) => response.set_status(StatusCode::MethodNotAllowed),
(_, "notif", _) => response.set_status(StatusCode::MethodNotAllowed),
_ => response.set_status(StatusCode::NotFound),
};
Box::new(ok(response))
}
}
31 changes: 23 additions & 8 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,16 +449,31 @@ impl Server {

/// A notification has come for the uaid
pub fn notify_client(&self, uaid: Uuid, notif: Notification) -> Result<()> {
let mut uaids = self.uaids.borrow_mut();
if let Some(client) = uaids.get_mut(&uaid) {
let uaids = self.uaids.borrow();
if let Some(client) = uaids.get(&uaid) {
debug!("Found a client to deliver a notification to");
// TODO: Don't unwrap, handle error properly
client
let result = client
.tx
.unbounded_send(ServerNotification::Notification(notif));
if result.is_ok() {
debug!("Dropped notification in queue");
return Ok(());
}
}
Err("User not connected".into())
}

/// A check for notification command has come for the uaid
pub fn check_client_storage(&self, uaid: Uuid) -> Result<()> {
let uaids = self.uaids.borrow();
if let Some(client) = uaids.get(&uaid) {
let result = client
.tx
.unbounded_send(ServerNotification::Notification(notif))
.unwrap();
debug!("Dropped notification in queue");
return Ok(());
.unbounded_send(ServerNotification::CheckStorage);
if result.is_ok() {
debug!("Told client to check storage");
return Ok(());
}
}
Err("User not connected".into())
}
Expand Down

0 comments on commit f599ee3

Please sign in to comment.