diff --git a/src/client.rs b/src/client.rs index 42435b789..4b43f4125 100644 --- a/src/client.rs +++ b/src/client.rs @@ -343,10 +343,7 @@ where let response = Box::new(data.srv.ddb.hello( &connected_at, uaid.as_ref(), - &data.srv.opts.router_table_name, &data.srv.opts.router_url, - &data.srv.opts.message_table_names, - &data.srv.opts.current_message_month, &data.srv.metrics, )); transition!(AwaitProcessHello { @@ -567,12 +564,25 @@ where }, #[state_machine_future( - transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput) + transitions( + IncrementStorage, + CheckStorage, + AwaitDropUser, + AwaitMigrateUser, + AwaitInput + ) )] DetermineAck { data: AuthClientData }, #[state_machine_future( - transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete) + transitions( + DetermineAck, + Send, + AwaitInput, + AwaitRegister, + AwaitUnregister, + AwaitDelete + ) )] AwaitInput { data: AuthClientData }, @@ -683,11 +693,7 @@ where // Exceeded the max limit of stored messages: drop the user to trigger a // re-register debug!("Dropping user: exceeded msg_limit"); - let response = Box::new( - data.srv - .ddb - .drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid), - ); + let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid)); transition!(AwaitDropUser { response, data }); } else if !smessages.is_empty() { transition!(Send { smessages, data }); @@ -708,20 +714,15 @@ where transition!(CheckStorage { data }); } else if all_acked && webpush.flags.rotate_message_table { debug!("Triggering migration"); - let response = Box::new(data.srv.ddb.migrate_user( - &webpush.uaid, - &webpush.message_month, - &data.srv.opts.current_message_month, - &data.srv.opts.router_table_name, - )); - transition!(AwaitMigrateUser { response, data }); - } else if all_acked && webpush.flags.reset_uaid { - debug!("Dropping user: flagged reset_uaid"); let response = Box::new( data.srv .ddb - .drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid), + .migrate_user(&webpush.uaid, &webpush.message_month), ); + transition!(AwaitMigrateUser { response, data }); + } else if all_acked && webpush.flags.reset_uaid { + debug!("Dropping user: flagged reset_uaid"); + let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid)); transition!(AwaitDropUser { response, data }); } transition!(AwaitInput { data }) @@ -1006,7 +1007,7 @@ where let AwaitMigrateUser { data, .. } = await_migrate_user.take(); { let mut webpush = data.webpush.borrow_mut(); - webpush.message_month = data.srv.opts.current_message_month.clone(); + webpush.message_month = data.srv.ddb.current_message_month.clone(); webpush.flags.rotate_message_table = false; } transition!(DetermineAck { data }) diff --git a/src/db/mod.rs b/src/db/mod.rs index 09726e9d1..032f56810 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -22,7 +22,7 @@ mod commands; mod models; use errors::*; use protocol::Notification; -use server::Server; +use server::{Server, ServerOptions}; mod util; use util::timing::sec_since_epoch; @@ -62,10 +62,13 @@ pub enum RegisterResponse { pub struct DynamoStorage { ddb: Rc>, + router_table_name: String, + message_table_names: Vec, + pub current_message_month: String, } impl DynamoStorage { - pub fn new() -> Self { + pub fn from_opts(opts: &ServerOptions) -> Result { let ddb: Box = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") { Box::new(DynamoDbClient::new( RequestDispatcher::default(), @@ -78,27 +81,22 @@ impl DynamoStorage { } else { Box::new(DynamoDbClient::simple(Region::default())) }; - Self { ddb: Rc::new(ddb) } - } + let ddb = Rc::new(ddb); - pub fn list_message_tables(&self, prefix: &str) -> Result> { - let mut names: Vec = Vec::new(); - let mut start_key = None; - loop { - let result = commands::list_tables(self.ddb.clone(), start_key).wait()?; - start_key = result.last_evaluated_table_name; - if let Some(table_names) = result.table_names { - names.extend(table_names); - } - if start_key.is_none() { - break; - } - } - let names = names - .into_iter() - .filter(|name| name.starts_with(prefix)) - .collect(); - Ok(names) + let mut message_table_names = list_message_tables(&ddb, &opts._message_table_name) + .map_err(|_| "Failed to locate message tables")?; + message_table_names.sort_unstable(); + let current_message_month = message_table_names + .last() + .ok_or("No last message month found")? + .to_string(); + + Ok(Self { + ddb, + router_table_name: opts._router_table_name.clone(), + message_table_names, + current_message_month, + }) } pub fn increment_storage( @@ -136,28 +134,24 @@ impl DynamoStorage { &self, connected_at: &u64, uaid: Option<&Uuid>, - router_table_name: &str, router_url: &str, - message_table_names: &[String], - current_message_month: &str, metrics: &StatsdClient, ) -> impl Future { - let router_table_name = router_table_name.to_string(); let response: MyFuture<(HelloResponse, Option)> = if let Some(uaid) = uaid { commands::lookup_user( self.ddb.clone(), &uaid, connected_at, router_url, - &router_table_name, - message_table_names, - current_message_month, + &self.router_table_name, + &self.message_table_names, + &self.current_message_month, metrics, ) } else { Box::new(future::ok(( HelloResponse { - message_month: current_message_month.to_string(), + message_month: self.current_message_month.clone(), connected_at: *connected_at, ..Default::default() }, @@ -166,6 +160,7 @@ impl DynamoStorage { }; let ddb = self.ddb.clone(); let router_url = router_url.to_string(); + let router_table_name = self.router_table_name.clone(); let connected_at = *connected_at; response.and_then(move |(mut hello_response, user_opt)| { @@ -179,7 +174,7 @@ impl DynamoStorage { let uaid = user.uaid; let mut err_response = hello_response.clone(); err_response.connected_at = connected_at; - commands::register_user(ddb, &user, router_table_name.as_ref()) + commands::register_user(ddb, &user, &router_table_name) .and_then(move |result| { debug!("Success adding user, item output: {:?}", result); hello_response.uaid = Some(uaid); @@ -223,12 +218,8 @@ impl DynamoStorage { Box::new(response) } - pub fn drop_uaid( - &self, - table_name: &str, - uaid: &Uuid, - ) -> impl Future { - commands::drop_user(self.ddb.clone(), uaid, table_name) + pub fn drop_uaid(&self, uaid: &Uuid) -> impl Future { + commands::drop_user(self.ddb.clone(), uaid, &self.router_table_name) .and_then(|_| future::ok(())) .chain_err(|| "Unable to drop user record") } @@ -249,15 +240,13 @@ impl DynamoStorage { &self, uaid: &Uuid, message_month: &str, - current_message_month: &str, - router_table_name: &str, ) -> impl Future { let uaid = *uaid; let ddb = self.ddb.clone(); let ddb2 = self.ddb.clone(); - let cur_month = current_message_month.to_string(); + let cur_month = self.current_message_month.to_string(); let cur_month2 = cur_month.clone(); - let router_table_name = router_table_name.to_string(); + let router_table_name = self.router_table_name.clone(); commands::all_channels(self.ddb.clone(), &uaid, message_month) .and_then(move |channels| -> MyFuture<_> { @@ -404,3 +393,23 @@ impl DynamoStorage { }) } } + +pub fn list_message_tables(ddb: &Rc>, prefix: &str) -> Result> { + let mut names: Vec = Vec::new(); + let mut start_key = None; + loop { + let result = commands::list_tables(Rc::clone(ddb), start_key).wait()?; + start_key = result.last_evaluated_table_name; + if let Some(table_names) = result.table_names { + names.extend(table_names); + } + if start_key.is_none() { + break; + } + } + let names = names + .into_iter() + .filter(|name| name.starts_with(prefix)) + .collect(); + Ok(names) +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 7f6d5a12c..a21070bcf 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -129,9 +129,8 @@ pub struct ServerOptions { pub auto_ping_timeout: Duration, pub max_connections: Option, pub close_handshake_timeout: Option, - pub message_table_names: Vec, - pub current_message_month: String, - pub router_table_name: String, + pub _message_table_name: String, + pub _router_table_name: String, pub router_url: String, pub endpoint_url: String, pub statsd_host: Option, @@ -157,13 +156,9 @@ impl ServerOptions { .collect(); let fernet = MultiFernet::new(fernets); - let ddb = DynamoStorage::new(); - let message_table_names = ddb - .list_message_tables(&settings.message_tablename) - .expect("Failed to locate message tables"); let router_url = settings.router_url(); let endpoint_url = settings.endpoint_url(); - let mut opts = Self { + Ok(Self { debug: settings.debug, port: settings.port, fernet, @@ -174,9 +169,8 @@ impl ServerOptions { Some(settings.statsd_host) }, statsd_port: settings.statsd_port, - message_table_names, - current_message_month: "".to_string(), - router_table_name: settings.router_tablename, + _message_table_name: settings.message_tablename, + _router_table_name: settings.router_tablename, router_url, endpoint_url, ssl_key: settings.router_ssl_key.map(PathBuf::from), @@ -199,14 +193,7 @@ impl ServerOptions { .expect("megaphone poll interval cannot be 0"), human_logs: settings.human_logs, msg_limit: settings.msg_limit, - }; - opts.message_table_names.sort_unstable(); - opts.current_message_month = opts - .message_table_names - .last() - .expect("No last message month found") - .to_string(); - Ok(opts) + }) } } @@ -317,7 +304,7 @@ impl Server { let srv = Rc::new(Server { opts: opts.clone(), broadcaster: RefCell::new(broadcaster), - ddb: DynamoStorage::new(), + ddb: DynamoStorage::from_opts(opts)?, uaids: RefCell::new(HashMap::new()), open_connections: Cell::new(0), handle: core.handle(),