diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index 56c93805..0125b9a4 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -531,15 +531,6 @@ where + Sink + 'static, { - fn input(&mut self) -> Poll { - let item = match self.ws.poll()? { - Async::Ready(None) => return Err("Client dropped".into()), - Async::Ready(Some(msg)) => Async::Ready(msg), - Async::NotReady => Async::NotReady, - }; - Ok(item) - } - fn input_with_timeout(&mut self, timeout: &mut Timeout) -> Poll { let item = match timeout.poll()? { Async::Ready(_) => return Err("Client timed out".into()), diff --git a/autopush_rs/src/http.rs b/autopush_rs/src/http.rs index b1f52b14..3f5802ef 100644 --- a/autopush_rs/src/http.rs +++ b/autopush_rs/src/http.rs @@ -1,14 +1,18 @@ -//! Dummy module that's very likely to get entirely removed +//! Internal router HTTP API //! -//! For now just a small HTTP server used to send notifications to our dummy -//! clients. +//! Accepts PUT requests to deliver notifications to a connected client or trigger +//! a client to check storage. +//! +//! Valid URL's: +//! PUT /push/UAID - Deliver notification to a client +//! PUT /notify/UAID - Tell a client to check storage use std::str; use std::rc::Rc; -use futures::future::err; +use futures::future::ok; use futures::{Stream, Future}; -use hyper::Method; +use hyper::{Method, StatusCode}; use hyper; use serde_json; use tokio_service::Service; @@ -25,44 +29,62 @@ impl Service for Push { type Future = Box>; fn call(&self, req: hyper::Request) -> Self::Future { - if *req.method() != Method::Put && *req.method() != Method::Post { - debug!("not a PUT: {}", req.method()); - return Box::new(err(hyper::Error::Method)); - } - if req.uri().path().len() == 0 { - debug!("empty uri path"); - return Box::new(err(hyper::Error::Incomplete)); + let mut response = hyper::Response::new(); + let req_path = req.path().to_string(); + let path_vec: Vec<&str> = req_path.split("/").collect(); + if path_vec.len() != 3 { + response.set_status(StatusCode::NotFound); + return Box::new(ok(response)); } - let req_uaid = req.uri().path()[6..].to_string(); - let uaid = match Uuid::parse_str(&req_uaid) { + let (method_name, uaid) = (path_vec[1], path_vec[2]); + let uaid = match Uuid::parse_str(uaid) { Ok(id) => id, Err(_) => { debug!("uri not uuid: {}", req.uri().to_string()); - return Box::new(err(hyper::Error::Status)); + response.set_status(StatusCode::BadRequest); + return Box::new(ok(response)); } }; - - debug!("Got a message, now to do something!"); - - let body = req.body().concat2(); let srv = self.0.clone(); - Box::new(body.and_then(move |body| { - let s = String::from_utf8(body.to_vec()).unwrap(); - if let Ok(msg) = serde_json::from_str(&s) { - match srv.notify_client(uaid, msg) { - Ok(_) => return Ok(hyper::Response::new().with_status(hyper::StatusCode::Ok)), - _ => { - return Ok( + match (req.method(), method_name, uaid) { + (&Method::Put, "push", uaid) => { + // Due to consumption of body as a future we must return here + let body = req.body().concat2(); + return Box::new(body.and_then(move |body| { + let s = String::from_utf8(body.to_vec()).unwrap(); + if let Ok(msg) = serde_json::from_str(&s) { + match srv.notify_client(uaid, msg) { + Ok(_) => Ok(hyper::Response::new().with_status(StatusCode::Ok)), + _ => { + Ok( + hyper::Response::new() + .with_status(StatusCode::BadGateway) + .with_body("Client not available."), + ) + } + } + } else { + Ok( hyper::Response::new() .with_status(hyper::StatusCode::BadRequest) .with_body("Unable to decode body payload"), ) } + })) + } + (&Method::Put, "notif", uaid) => { + match srv.check_client_storage(uaid) { + Ok(_) => response.set_status(StatusCode::Ok), + _ => { + response.set_status(StatusCode::BadGateway); + response.set_body("Client not available."); + } } } - Ok(hyper::Response::new().with_status( - hyper::StatusCode::NotFound, - )) - })) + (_, "push", _) => response.set_status(StatusCode::MethodNotAllowed), + (_, "notif", _) => response.set_status(StatusCode::MethodNotAllowed), + _ => response.set_status(StatusCode::NotFound), + }; + Box::new(ok(response)) } } diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index b6b0d3f0..183c49af 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -449,16 +449,31 @@ impl Server { /// A notification has come for the uaid pub fn notify_client(&self, uaid: Uuid, notif: Notification) -> Result<()> { - let mut uaids = self.uaids.borrow_mut(); - if let Some(client) = uaids.get_mut(&uaid) { + let uaids = self.uaids.borrow(); + if let Some(client) = uaids.get(&uaid) { debug!("Found a client to deliver a notification to"); - // TODO: Don't unwrap, handle error properly - client + let result = client + .tx + .unbounded_send(ServerNotification::Notification(notif)); + if result.is_ok() { + debug!("Dropped notification in queue"); + return Ok(()); + } + } + Err("User not connected".into()) + } + + /// A check for notification command has come for the uaid + pub fn check_client_storage(&self, uaid: Uuid) -> Result<()> { + let uaids = self.uaids.borrow(); + if let Some(client) = uaids.get(&uaid) { + let result = client .tx - .unbounded_send(ServerNotification::Notification(notif)) - .unwrap(); - debug!("Dropped notification in queue"); - return Ok(()); + .unbounded_send(ServerNotification::CheckStorage); + if result.is_ok() { + debug!("Told client to check storage"); + return Ok(()); + } } Err("User not connected".into()) }