diff --git a/.travis.yml b/.travis.yml index 32bdf711e..26b25eac9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,7 +23,10 @@ install: else cargo build || travis_terminate 1 fi + rustup component add rustfmt-preview + script: +- cargo fmt -- --check - py.test -v - cargo test notifications: diff --git a/src/client.rs b/src/client.rs index c49e5c8a6..4a37af461 100644 --- a/src/client.rs +++ b/src/client.rs @@ -510,11 +510,21 @@ where id: Some(webpush.uaid.to_simple().to_string()), ..Default::default() }); - event.tags.insert("ua_name".to_string(), ua_result.name.to_string()); - event.tags.insert("ua_os_family".to_string(), metrics_os.to_string()); - event.tags.insert("ua_os_ver".to_string(), ua_result.os_version.to_string()); - event.tags.insert("ua_browser_family".to_string(), metrics_browser.to_string()); - event.tags.insert("ua_browser_ver".to_string(), ua_result.version.to_string()); + event + .tags + .insert("ua_name".to_string(), ua_result.name.to_string()); + event + .tags + .insert("ua_os_family".to_string(), metrics_os.to_string()); + event + .tags + .insert("ua_os_ver".to_string(), ua_result.os_version.to_string()); + event + .tags + .insert("ua_browser_family".to_string(), metrics_browser.to_string()); + event + .tags + .insert("ua_browser_ver".to_string(), ua_result.version.to_string()); sentry::capture_event(event); err.display_chain().to_string() } else { @@ -629,14 +639,23 @@ where data: AuthClientData, }, - #[state_machine_future( - transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput) - )] + #[state_machine_future(transitions( + IncrementStorage, + CheckStorage, + AwaitDropUser, + AwaitMigrateUser, + AwaitInput + ))] DetermineAck { data: AuthClientData }, - #[state_machine_future( - transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete) - )] + #[state_machine_future(transitions( + DetermineAck, + Send, + AwaitInput, + AwaitRegister, + AwaitUnregister, + AwaitDelete + ))] AwaitInput { data: AuthClientData }, #[state_machine_future(transitions(AwaitIncrementStorage))] @@ -790,9 +809,11 @@ where let webpush_rc = data.webpush.clone(); let mut webpush = webpush_rc.borrow_mut(); match input { - Either::A(ClientMessage::Hello { .. }) => { - Err(ErrorKind::InvalidStateTransition("AwaitInput".to_string(), "Hello".to_string()).into()) - } + Either::A(ClientMessage::Hello { .. }) => Err(ErrorKind::InvalidStateTransition( + "AwaitInput".to_string(), + "Hello".to_string(), + ) + .into()), Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { let broadcast_delta = { let mut broadcast_subs = data.broadcast_subs.borrow_mut(); @@ -855,14 +876,15 @@ where } Either::A(ClientMessage::Nack { code, .. }) => { // only metric codes expected from the client (or 0) - let mcode = - code.and_then(|code| { + let mcode = code + .and_then(|code| { if code >= 301 && code <= 303 { Some(code) } else { None } - }).unwrap_or(0); + }) + .unwrap_or(0); data.srv .metrics .incr_with_tags("ua.command.nack") diff --git a/src/db/commands.rs b/src/db/commands.rs index 357b06603..0b5ae0c24 100644 --- a/src/db/commands.rs +++ b/src/db/commands.rs @@ -72,46 +72,47 @@ pub fn fetch_messages( }; let metrics = Rc::clone(metrics); - let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)); - retry_if(move || ddb.query(input.clone()), cond) - .chain_err(|| "Error fetching messages") - .and_then(move |output| { - let mut notifs: Vec = - output.items.map_or_else(Vec::new, |items| { - debug!("Got response of: {:?}", items); - items - .into_iter() - .inspect(|i| debug!("Item: {:?}", i)) - .filter_map(|item| { - let item2 = item.clone(); - ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { - conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap") - }) - }) - .collect() - }); - if notifs.is_empty() { - return Ok(Default::default()); - } - - // Load the current_timestamp from the subscription registry entry which is - // the first DynamoDbNotification and remove it from the vec. - let timestamp = notifs.remove(0).current_timestamp; - // Convert any remaining DynamoDbNotifications to Notification's - let messages = notifs + retry_if( + move || ddb.query(input.clone()), + |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)), + ) + .chain_err(|| "Error fetching messages") + .and_then(move |output| { + let mut notifs: Vec = output.items.map_or_else(Vec::new, |items| { + debug!("Got response of: {:?}", items); + items .into_iter() - .filter_map(|ddb_notif| { - let ddb_notif2 = ddb_notif.clone(); - ok_or_inspect(ddb_notif.into_notif(), |e| { - conversion_err(&metrics, e, ddb_notif2, "into_notif") + .inspect(|i| debug!("Item: {:?}", i)) + .filter_map(|item| { + let item2 = item.clone(); + ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { + conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap") }) }) - .collect(); - Ok(FetchMessageResponse { - timestamp, - messages, + .collect() + }); + if notifs.is_empty() { + return Ok(Default::default()); + } + + // Load the current_timestamp from the subscription registry entry which is + // the first DynamoDbNotification and remove it from the vec. + let timestamp = notifs.remove(0).current_timestamp; + // Convert any remaining DynamoDbNotifications to Notification's + let messages = notifs + .into_iter() + .filter_map(|ddb_notif| { + let ddb_notif2 = ddb_notif.clone(); + ok_or_inspect(ddb_notif.into_notif(), |e| { + conversion_err(&metrics, e, ddb_notif2, "into_notif") + }) }) + .collect(); + Ok(FetchMessageResponse { + timestamp, + messages, }) + }) } pub fn fetch_timestamp_messages( @@ -141,34 +142,36 @@ pub fn fetch_timestamp_messages( }; let metrics = Rc::clone(metrics); - let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)); - retry_if(move || ddb.query(input.clone()), cond) - .chain_err(|| "Error fetching messages") - .and_then(move |output| { - let messages = output.items.map_or_else(Vec::new, |items| { - debug!("Got response of: {:?}", items); - items - .into_iter() - .filter_map(|item| { - let item2 = item.clone(); - ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { - conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap") - }) + retry_if( + move || ddb.query(input.clone()), + |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)), + ) + .chain_err(|| "Error fetching messages") + .and_then(move |output| { + let messages = output.items.map_or_else(Vec::new, |items| { + debug!("Got response of: {:?}", items); + items + .into_iter() + .filter_map(|item| { + let item2 = item.clone(); + ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { + conversion_err(&metrics, e, item2, "serde_dynamodb_from_hashmap") }) - .filter_map(|ddb_notif: DynamoDbNotification| { - let ddb_notif2 = ddb_notif.clone(); - ok_or_inspect(ddb_notif.into_notif(), |e| { - conversion_err(&metrics, e, ddb_notif2, "into_notif") - }) + }) + .filter_map(|ddb_notif: DynamoDbNotification| { + let ddb_notif2 = ddb_notif.clone(); + ok_or_inspect(ddb_notif.into_notif(), |e| { + conversion_err(&metrics, e, ddb_notif2, "into_notif") }) - .collect() - }); - let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max(); - Ok(FetchMessageResponse { - timestamp, - messages, - }) + }) + .collect() + }); + let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max(); + Ok(FetchMessageResponse { + timestamp, + messages, }) + }) } pub fn drop_user( @@ -184,7 +187,8 @@ pub fn drop_user( retry_if( move || ddb.delete_item(input.clone()), |err: &DeleteItemError| matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)), - ).chain_err(|| "Error dropping user") + ) + .chain_err(|| "Error dropping user") } pub fn get_uaid( @@ -201,7 +205,8 @@ pub fn get_uaid( retry_if( move || ddb.get_item(input.clone()), |err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)), - ).chain_err(|| "Error fetching user") + ) + .chain_err(|| "Error fetching user") } pub fn register_user( @@ -233,14 +238,16 @@ pub fn register_user( ) and ( attribute_not_exists(node_id) or (connected_at < :connected_at) - )"#.to_string(), + )"# + .to_string(), ), return_values: Some("ALL_OLD".to_string()), ..Default::default() }) }, |err: &PutItemError| matches!(err, &PutItemError::ProvisionedThroughputExceeded(_)), - ).chain_err(|| "Error storing user record") + ) + .chain_err(|| "Error storing user record") } pub fn update_user_message_month( @@ -264,9 +271,13 @@ pub fn update_user_message_month( }; retry_if( - move || ddb.update_item(update_item.clone()).and_then(|_| future::ok(())), + move || { + ddb.update_item(update_item.clone()) + .and_then(|_| future::ok(())) + }, |err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)), - ).chain_err(|| "Error updating user message month") + ) + .chain_err(|| "Error updating user message month") } pub fn all_channels( @@ -284,20 +295,22 @@ pub fn all_channels( ..Default::default() }; - let cond = |err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)); - retry_if(move || ddb.get_item(input.clone()), cond) - .and_then(|output| { - let channels = output - .item - .and_then(|item| { - serde_dynamodb::from_hashmap::(item) - .ok() - .and_then(|notif| notif.chids) - }) - .unwrap_or_else(HashSet::new); - future::ok(channels) - }) - .or_else(|_err| future::ok(HashSet::new())) + retry_if( + move || ddb.get_item(input.clone()), + |err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)), + ) + .and_then(|output| { + let channels = output + .item + .and_then(|item| { + serde_dynamodb::from_hashmap::(item) + .ok() + .and_then(|notif| notif.chids) + }) + .unwrap_or_else(HashSet::new); + future::ok(channels) + }) + .or_else(|_err| future::ok(HashSet::new())) } pub fn save_channels( @@ -324,9 +337,13 @@ pub fn save_channels( }; retry_if( - move || ddb.update_item(update_item.clone()).and_then(|_| future::ok(())), + move || { + ddb.update_item(update_item.clone()) + .and_then(|_| future::ok(())) + }, |err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)), - ).chain_err(|| "Error saving channels") + ) + .chain_err(|| "Error saving channels") } pub fn unregister_channel_id( @@ -353,7 +370,8 @@ pub fn unregister_channel_id( retry_if( move || ddb.update_item(update_item.clone()), |err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)), - ).chain_err(|| "Error unregistering channel") + ) + .chain_err(|| "Error unregistering channel") } pub fn lookup_user( diff --git a/src/db/mod.rs b/src/db/mod.rs index 3cc4077b8..a832c43c0 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -127,11 +127,12 @@ impl DynamoStorage { }; retry_if( - move || { ddb.update_item(update_input.clone()) }, + move || ddb.update_item(update_input.clone()), |err: &UpdateItemError| { matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)) }, - ).chain_err(|| "Error incrementing storage") + ) + .chain_err(|| "Error incrementing storage") } pub fn hello( @@ -290,10 +291,12 @@ impl DynamoStorage { ..Default::default() }; - let cond = |err: &BatchWriteItemError| { - matches!(err, &BatchWriteItemError::ProvisionedThroughputExceeded(_)) - }; - retry_if(move || ddb.batch_write_item(batch_input.clone()), cond) + retry_if( + move || ddb.batch_write_item(batch_input.clone()), + |err: &BatchWriteItemError| { + matches!(err, &BatchWriteItemError::ProvisionedThroughputExceeded(_)) + }, + ) .and_then(|_| future::ok(())) .map_err(|err| { debug!("Error saving notification: {:?}", err); @@ -324,12 +327,14 @@ impl DynamoStorage { ..Default::default() }; - let cond = |err: &DeleteItemError| { - matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)) - }; - retry_if(move || ddb.delete_item(delete_input.clone()), cond) - .and_then(|_| future::ok(())) - .chain_err(|| "Error deleting notification") + retry_if( + move || ddb.delete_item(delete_input.clone()), + |err: &DeleteItemError| { + matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)) + }, + ) + .and_then(|_| future::ok(())) + .chain_err(|| "Error deleting notification") } pub fn check_storage( diff --git a/src/db/models.rs b/src/db/models.rs index 14cd05b11..1490ceb7b 100644 --- a/src/db/models.rs +++ b/src/db/models.rs @@ -258,7 +258,11 @@ mod tests { fn test_parse_sort_key_ver2() { let chid = Uuid::new_v4(); let sortkey_timestamp = us_since_epoch(); - let chidmessageid = format!("02:{}:{}", sortkey_timestamp, chid.to_hyphenated().to_string()); + let chidmessageid = format!( + "02:{}:{}", + sortkey_timestamp, + chid.to_hyphenated().to_string() + ); let key = DynamoDbNotification::parse_sort_key(&chidmessageid).unwrap(); assert_eq!(key.topic, None); assert_eq!(key.channel_id, chid); diff --git a/src/server/mod.rs b/src/server/mod.rs index e85df36b9..d1f5c1351 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -93,7 +93,7 @@ impl AutopushServer { sentry::ClientOptions { release: sentry_crate_release!(), ..Default::default() - } + }, )); register_panic_handler(); Some(guard) @@ -375,7 +375,8 @@ impl Server { ws.and_then(move |ws| { PingManager::new(&srv2, ws, uarx) .chain_err(|| "failed to make ping handler") - }).flatten(), + }) + .flatten(), ) } } @@ -408,7 +409,8 @@ impl Server { megaphone_token, opts.megaphone_poll_interval, &srv2, - ).expect("Unable to start megaphone updater"); + ) + .expect("Unable to start megaphone updater"); core.handle().spawn(fut.then(|res| { debug!("megaphone result: {:?}", res.map(drop)); Ok(()) @@ -943,9 +945,9 @@ fn write_status(socket: WebpushIo) -> MyFuture<()> { socket, StatusCode::Ok, json!({ - "status": "OK", - "version": env!("CARGO_PKG_VERSION"), - }), + "status": "OK", + "version": env!("CARGO_PKG_VERSION"), + }), ) } @@ -974,11 +976,11 @@ fn write_log_check(socket: WebpushIo) -> MyFuture<()> { socket, StatusCode::ImATeapot, json!({ - "code": code, - "errno": 999, - "error": "Test Failure", - "mesage": "FAILURE:Success", - }), + "code": code, + "errno": 999, + "error": "Test Failure", + "mesage": "FAILURE:Success", + }), ) } diff --git a/src/settings.rs b/src/settings.rs index 9a63a4f46..8e0febd1c 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -197,4 +197,4 @@ mod tests { let url = settings.endpoint_url(); assert_eq!("https://testname:8080", url); } -} \ No newline at end of file +}