Skip to content

Commit

Permalink
refactor: push table names down into DynamoStorage
Browse files Browse the repository at this point in the history
to avoid passing them around everywhere (eventually we'll move
message_month as well we're fully migrated)

Issue #33
  • Loading branch information
pjenvey committed Jul 20, 2018
1 parent 0e31edc commit e4fd231
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 82 deletions.
43 changes: 22 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,7 @@ where
let response = Box::new(data.srv.ddb.hello(
&connected_at,
uaid.as_ref(),
&data.srv.opts.router_table_name,
&data.srv.opts.router_url,
&data.srv.opts.message_table_names,
&data.srv.opts.current_message_month,
&data.srv.metrics,
));
transition!(AwaitProcessHello {
Expand Down Expand Up @@ -567,12 +564,25 @@ where
},

#[state_machine_future(
transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput)
transitions(
IncrementStorage,
CheckStorage,
AwaitDropUser,
AwaitMigrateUser,
AwaitInput
)
)]
DetermineAck { data: AuthClientData<T> },

#[state_machine_future(
transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete)
transitions(
DetermineAck,
Send,
AwaitInput,
AwaitRegister,
AwaitUnregister,
AwaitDelete
)
)]
AwaitInput { data: AuthClientData<T> },

Expand Down Expand Up @@ -683,11 +693,7 @@ where
// Exceeded the max limit of stored messages: drop the user to trigger a
// re-register
debug!("Dropping user: exceeded msg_limit");
let response = Box::new(
data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid),
);
let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid));
transition!(AwaitDropUser { response, data });
} else if !smessages.is_empty() {
transition!(Send { smessages, data });
Expand All @@ -708,20 +714,15 @@ where
transition!(CheckStorage { data });
} else if all_acked && webpush.flags.rotate_message_table {
debug!("Triggering migration");
let response = Box::new(data.srv.ddb.migrate_user(
&webpush.uaid,
&webpush.message_month,
&data.srv.opts.current_message_month,
&data.srv.opts.router_table_name,
));
transition!(AwaitMigrateUser { response, data });
} else if all_acked && webpush.flags.reset_uaid {
debug!("Dropping user: flagged reset_uaid");
let response = Box::new(
data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid),
.migrate_user(&webpush.uaid, &webpush.message_month),
);
transition!(AwaitMigrateUser { response, data });
} else if all_acked && webpush.flags.reset_uaid {
debug!("Dropping user: flagged reset_uaid");
let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid));
transition!(AwaitDropUser { response, data });
}
transition!(AwaitInput { data })
Expand Down Expand Up @@ -993,7 +994,7 @@ where
let AwaitMigrateUser { data, .. } = await_migrate_user.take();
{
let mut webpush = data.webpush.borrow_mut();
webpush.message_month = data.srv.opts.current_message_month.clone();
webpush.message_month = data.srv.ddb.current_message_month.clone();
webpush.flags.rotate_message_table = false;
}
transition!(DetermineAck { data })
Expand Down
91 changes: 50 additions & 41 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod commands;
mod models;
use errors::*;
use protocol::Notification;
use server::Server;
use server::{Server, ServerOptions};
mod util;
use util::timing::sec_since_epoch;

Expand Down Expand Up @@ -62,10 +62,13 @@ pub enum RegisterResponse {

pub struct DynamoStorage {
ddb: Rc<Box<DynamoDb>>,
router_table_name: String,
message_table_names: Vec<String>,
pub current_message_month: String,
}

impl DynamoStorage {
pub fn new() -> Self {
pub fn from_opts(opts: &ServerOptions) -> Result<Self> {
let ddb: Box<DynamoDb> = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
Box::new(DynamoDbClient::new(
RequestDispatcher::default(),
Expand All @@ -78,27 +81,22 @@ impl DynamoStorage {
} else {
Box::new(DynamoDbClient::simple(Region::default()))
};
Self { ddb: Rc::new(ddb) }
}
let ddb = Rc::new(ddb);

pub fn list_message_tables(&self, prefix: &str) -> Result<Vec<String>> {
let mut names: Vec<String> = Vec::new();
let mut start_key = None;
loop {
let result = commands::list_tables(self.ddb.clone(), start_key).wait()?;
start_key = result.last_evaluated_table_name;
if let Some(table_names) = result.table_names {
names.extend(table_names);
}
if start_key.is_none() {
break;
}
}
let names = names
.into_iter()
.filter(|name| name.starts_with(prefix))
.collect();
Ok(names)
let mut message_table_names = list_message_tables(ddb.clone(), &opts._message_table_name)
.map_err(|_| "Failed to locate message tables")?;
message_table_names.sort_unstable();
let current_message_month = message_table_names
.last()
.ok_or("No last message month found")?
.to_string();

Ok(Self {
ddb,
router_table_name: opts._router_table_name.clone(),
message_table_names,
current_message_month,
})
}

pub fn increment_storage(
Expand Down Expand Up @@ -136,28 +134,24 @@ impl DynamoStorage {
&self,
connected_at: &u64,
uaid: Option<&Uuid>,
router_table_name: &str,
router_url: &str,
message_table_names: &[String],
current_message_month: &str,
metrics: &StatsdClient,
) -> impl Future<Item = HelloResponse, Error = Error> {
let router_table_name = router_table_name.to_string();
let response: MyFuture<(HelloResponse, Option<DynamoDbUser>)> = if let Some(uaid) = uaid {
commands::lookup_user(
self.ddb.clone(),
&uaid,
connected_at,
router_url,
&router_table_name,
message_table_names,
current_message_month,
&self.router_table_name,
&self.message_table_names,
&self.current_message_month,
metrics,
)
} else {
Box::new(future::ok((
HelloResponse {
message_month: current_message_month.to_string(),
message_month: self.current_message_month.clone(),
connected_at: *connected_at,
..Default::default()
},
Expand All @@ -166,6 +160,7 @@ impl DynamoStorage {
};
let ddb = self.ddb.clone();
let router_url = router_url.to_string();
let router_table_name = self.router_table_name.clone();
let connected_at = *connected_at;

response.and_then(move |(mut hello_response, user_opt)| {
Expand All @@ -179,7 +174,7 @@ impl DynamoStorage {
let uaid = user.uaid;
let mut err_response = hello_response.clone();
err_response.connected_at = connected_at;
commands::register_user(ddb, &user, router_table_name.as_ref())
commands::register_user(ddb, &user, &router_table_name)
.and_then(move |result| {
debug!("Success adding user, item output: {:?}", result);
hello_response.uaid = Some(uaid);
Expand Down Expand Up @@ -223,12 +218,8 @@ impl DynamoStorage {
Box::new(response)
}

pub fn drop_uaid(
&self,
table_name: &str,
uaid: &Uuid,
) -> impl Future<Item = (), Error = Error> {
commands::drop_user(self.ddb.clone(), uaid, table_name)
pub fn drop_uaid(&self, uaid: &Uuid) -> impl Future<Item = (), Error = Error> {
commands::drop_user(self.ddb.clone(), uaid, &self.router_table_name)
.and_then(|_| future::ok(()))
.chain_err(|| "Unable to drop user record")
}
Expand All @@ -249,15 +240,13 @@ impl DynamoStorage {
&self,
uaid: &Uuid,
message_month: &str,
current_message_month: &str,
router_table_name: &str,
) -> impl Future<Item = (), Error = Error> {
let uaid = *uaid;
let ddb = self.ddb.clone();
let ddb2 = self.ddb.clone();
let cur_month = current_message_month.to_string();
let cur_month = self.current_message_month.to_string();
let cur_month2 = cur_month.clone();
let router_table_name = router_table_name.to_string();
let router_table_name = self.router_table_name.clone();

commands::all_channels(self.ddb.clone(), &uaid, message_month)
.and_then(move |channels| -> MyFuture<_> {
Expand Down Expand Up @@ -404,3 +393,23 @@ impl DynamoStorage {
})
}
}

pub fn list_message_tables(ddb: Rc<Box<DynamoDb>>, prefix: &str) -> Result<Vec<String>> {
let mut names: Vec<String> = Vec::new();
let mut start_key = None;
loop {
let result = commands::list_tables(ddb.clone(), start_key).wait()?;
start_key = result.last_evaluated_table_name;
if let Some(table_names) = result.table_names {
names.extend(table_names);
}
if start_key.is_none() {
break;
}
}
let names = names
.into_iter()
.filter(|name| name.starts_with(prefix))
.collect();
Ok(names)
}
27 changes: 7 additions & 20 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,8 @@ pub struct ServerOptions {
pub auto_ping_timeout: Duration,
pub max_connections: Option<u32>,
pub close_handshake_timeout: Option<Duration>,
pub message_table_names: Vec<String>,
pub current_message_month: String,
pub router_table_name: String,
pub _message_table_name: String,
pub _router_table_name: String,
pub router_url: String,
pub endpoint_url: String,
pub statsd_host: Option<String>,
Expand All @@ -157,13 +156,9 @@ impl ServerOptions {
.collect();
let fernet = MultiFernet::new(fernets);

let ddb = DynamoStorage::new();
let message_table_names = ddb
.list_message_tables(&settings.message_tablename)
.expect("Failed to locate message tables");
let router_url = settings.router_url();
let endpoint_url = settings.endpoint_url();
let mut opts = Self {
Ok(Self {
debug: settings.debug,
port: settings.port,
fernet,
Expand All @@ -174,9 +169,8 @@ impl ServerOptions {
Some(settings.statsd_host)
},
statsd_port: settings.statsd_port,
message_table_names,
current_message_month: "".to_string(),
router_table_name: settings.router_tablename,
_message_table_name: settings.message_tablename,
_router_table_name: settings.router_tablename,
router_url,
endpoint_url,
ssl_key: settings.router_ssl_key.map(PathBuf::from),
Expand All @@ -199,14 +193,7 @@ impl ServerOptions {
.expect("megaphone poll interval cannot be 0"),
human_logs: settings.human_logs,
msg_limit: settings.msg_limit,
};
opts.message_table_names.sort_unstable();
opts.current_message_month = opts
.message_table_names
.last()
.expect("No last message month found")
.to_string();
Ok(opts)
})
}
}

Expand Down Expand Up @@ -317,7 +304,7 @@ impl Server {
let srv = Rc::new(Server {
opts: opts.clone(),
broadcaster: RefCell::new(broadcaster),
ddb: DynamoStorage::new(),
ddb: DynamoStorage::from_opts(opts)?,
uaids: RefCell::new(HashMap::new()),
open_connections: Cell::new(0),
handle: core.handle(),
Expand Down

0 comments on commit e4fd231

Please sign in to comment.