From 5221462f097dc463958bb840d9199a8b7fef77df Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 29 Aug 2022 16:32:15 -0700 Subject: [PATCH 01/18] feat: Add additional metrics for message tracking * `notification.message.expired` -- In flight message has no TTL, endpoint is unavailable. * `notification.message.stored` -- message stored for later transmission * `notification.message.retrieved` -- message extracted from storage * `ua.notification.sent` -- sent a notification to the websocket endpoint Issue: CONSVC-1660 --- autoendpoint/src/routers/webpush.rs | 4 ++++ autopush-common/src/db/mod.rs | 26 ++++++++++++++++++++++---- autopush/src/client.rs | 4 ++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/autoendpoint/src/routers/webpush.rs b/autoendpoint/src/routers/webpush.rs index 4c33a9ece..5276a36ac 100644 --- a/autoendpoint/src/routers/webpush.rs +++ b/autoendpoint/src/routers/webpush.rs @@ -77,6 +77,7 @@ impl Router for WebPushRouter { "Notification has a TTL of zero and was not successfully \ delivered, dropping it" ); + self.metrics.incr("notification.message.expired")?; return Ok(self.make_delivered_response(notification)); } @@ -166,6 +167,9 @@ impl WebPushRouter { ) .await .map_err(|e| ApiErrorKind::Router(RouterError::SaveDb(e)).into()) + .map(|_| { + self.metrics.incr("notification.message.stored").ok(); + }) } /// Remove the node ID from a user. This is done if the user is no longer diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 6b7b591bf..df826ff2e 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -3,7 +3,7 @@ use std::env; use std::sync::Arc; use uuid::Uuid; -use cadence::StatsdClient; +use cadence::{Counted, CountedExt, StatsdClient}; use futures::{future, Future}; use futures_backoff::retry_if; use rusoto_core::{HttpClient, Region}; @@ -329,12 +329,15 @@ impl DynamoStorage { table_name: message_month, ..Default::default() }; - + let metrics = self.metrics.clone(); retry_if( move || ddb.put_item(put_item.clone()), retryable_putitem_error, ) - .and_then(|_| future::ok(())) + .and_then(move |_| { + metrics.incr("notification.db.stored").ok(); + future::ok(()) + }) .chain_err(|| "Error saving notification") } @@ -346,6 +349,8 @@ impl DynamoStorage { messages: Vec, ) -> impl Future { let ddb = self.ddb.clone(); + let item_count = messages.len(); + let metrics = self.metrics.clone(); let put_items: Vec = messages .into_iter() .filter_map(|n| { @@ -366,7 +371,12 @@ impl DynamoStorage { move || ddb.batch_write_item(batch_input.clone()), retryable_batchwriteitem_error, ) - .and_then(|_| future::ok(())) + .and_then(move |_| { + metrics + .count("notification.message.stored", item_count as i64) + .ok(); + future::ok(()) + }) .map_err(|err| { debug!("Error saving notification: {:?}", err); err @@ -426,11 +436,16 @@ impl DynamoStorage { let table_name = table_name.to_string(); let ddb = self.ddb.clone(); let metrics = self.metrics.clone(); + let rmetrics = metrics.clone(); response.and_then(move |resp| -> MyFuture<_> { // Return now from this future if we have messages if !resp.messages.is_empty() { debug!("Topic message returns: {:?}", resp.messages); + // should we incidcate that these are topic messages? + rmetrics + .count("notification.message.retrieved", resp.messages.len() as i64) + .ok(); return Box::new(future::ok(CheckStorageResponse { include_topic: true, messages: resp.messages, @@ -461,6 +476,9 @@ impl DynamoStorage { // If we didn't get a timestamp off the last query, use the original // value if passed one let timestamp = resp.timestamp.or(timestamp); + rmetrics + .count("notification.message.retrieved", resp.messages.len() as i64) + .ok(); Ok(CheckStorageResponse { include_topic: false, messages: resp.messages, diff --git a/autopush/src/client.rs b/autopush/src/client.rs index 7c568fcc3..73795b778 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -1344,6 +1344,10 @@ fn emit_metrics_for_send(metrics: &StatsdClient, notif: &Notification, source: & if notif.topic.is_some() { metrics.incr("ua.notification.topic").ok(); } + metrics + .incr_with_tags("ua.notification.sent") + .with_tag("source", source) + .send(); metrics .count_with_tags( "ua.message_data", From ffd80f3f0c71cec124be9daa32b88fc10f5d760a Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 31 Aug 2022 15:58:56 -0700 Subject: [PATCH 02/18] f add metric and tag for storage --- autoendpoint/src/routers/webpush.rs | 6 +++++- autopush-common/src/db/mod.rs | 24 ++++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/autoendpoint/src/routers/webpush.rs b/autoendpoint/src/routers/webpush.rs index 5276a36ac..10ef89622 100644 --- a/autoendpoint/src/routers/webpush.rs +++ b/autoendpoint/src/routers/webpush.rs @@ -168,7 +168,11 @@ impl WebPushRouter { .await .map_err(|e| ApiErrorKind::Router(RouterError::SaveDb(e)).into()) .map(|_| { - self.metrics.incr("notification.message.stored").ok(); + self.metrics + .incr_with_tags("notification.message.stored") + .with_tag("topic", ¬ification.headers.topic.is_some().to_string()) + // TODO: include `internal` if meta is set. + .send(); }) } diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index df826ff2e..a67e88e08 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -349,11 +349,15 @@ impl DynamoStorage { messages: Vec, ) -> impl Future { let ddb = self.ddb.clone(); - let item_count = messages.len(); let metrics = self.metrics.clone(); let put_items: Vec = messages .into_iter() .filter_map(|n| { + // eventually include `internal` if `meta` defined. + metrics + .incr_with_tags("notification.message.stored") + .with_tag("topic", &n.topic.is_some().to_string()) + .send(); serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, n)) .ok() .map(|hm| WriteRequest { @@ -371,12 +375,7 @@ impl DynamoStorage { move || ddb.batch_write_item(batch_input.clone()), retryable_batchwriteitem_error, ) - .and_then(move |_| { - metrics - .count("notification.message.stored", item_count as i64) - .ok(); - future::ok(()) - }) + .and_then(move |_| future::ok(())) .map_err(|err| { debug!("Error saving notification: {:?}", err); err @@ -442,10 +441,10 @@ impl DynamoStorage { // Return now from this future if we have messages if !resp.messages.is_empty() { debug!("Topic message returns: {:?}", resp.messages); - // should we incidcate that these are topic messages? rmetrics - .count("notification.message.retrieved", resp.messages.len() as i64) - .ok(); + .count_with_tags("notification.message.retrieved", resp.messages.len() as i64) + .with_tag("topic", "true") + .send(); return Box::new(future::ok(CheckStorageResponse { include_topic: true, messages: resp.messages, @@ -477,8 +476,9 @@ impl DynamoStorage { // value if passed one let timestamp = resp.timestamp.or(timestamp); rmetrics - .count("notification.message.retrieved", resp.messages.len() as i64) - .ok(); + .count_with_tags("notification.message.retrieved", resp.messages.len() as i64) + .with_tag("topic", "false") + .send(); Ok(CheckStorageResponse { include_topic: false, messages: resp.messages, From 279e5ffe2eac186a3ec8abacd6b9c9d16dc1fa6c Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 31 Aug 2022 16:06:32 -0700 Subject: [PATCH 03/18] f remove unneeded `move` --- autopush-common/src/db/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index a67e88e08..a0374eca5 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -375,7 +375,7 @@ impl DynamoStorage { move || ddb.batch_write_item(batch_input.clone()), retryable_batchwriteitem_error, ) - .and_then(move |_| future::ok(())) + .and_then(|_| future::ok(())) .map_err(|err| { debug!("Error saving notification: {:?}", err); err From f4168855fabd25dbb28181b2f04c0ca6d6600623 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 20 Sep 2022 15:05:40 -0700 Subject: [PATCH 04/18] f add metrics, comments, stored flag. --- autoendpoint/src/db/client.rs | 1 + autoendpoint/src/extractors/notification.rs | 8 +++++++ autoendpoint/src/routers/common.rs | 11 +++++++++- autoendpoint/src/routers/webpush.rs | 20 ++++++++++++----- autopush-common/src/db/commands.rs | 12 +++++++++++ autopush-common/src/db/mod.rs | 3 +++ autopush-common/src/db/models.rs | 24 ++++++++++++++++++++- autopush-common/src/notification.rs | 2 ++ 8 files changed, 74 insertions(+), 7 deletions(-) diff --git a/autoendpoint/src/db/client.rs b/autoendpoint/src/db/client.rs index df6f0bb0b..d6115c397 100644 --- a/autoendpoint/src/db/client.rs +++ b/autoendpoint/src/db/client.rs @@ -270,6 +270,7 @@ impl DbClient for DbClientImpl { Ok(()) } + // Return the list of active channelIDs for a given user. async fn get_channels(&self, uaid: Uuid) -> DbResult> { // Channel IDs are stored in a special row in the message table, where // chidmessageid = " " diff --git a/autoendpoint/src/extractors/notification.rs b/autoendpoint/src/extractors/notification.rs index 02792e18a..6513cf30d 100644 --- a/autoendpoint/src/extractors/notification.rs +++ b/autoendpoint/src/extractors/notification.rs @@ -16,14 +16,20 @@ use uuid::Uuid; /// Extracts notification data from `Subscription` and request data #[derive(Clone, Debug)] pub struct Notification { + /// Unique message_id for this notification pub message_id: String, + /// The subscription information block pub subscription: Subscription, + /// Set of associated crypto headers pub headers: NotificationHeaders, /// UNIX timestamp in seconds pub timestamp: u64, /// UNIX timestamp in milliseconds pub sort_key_timestamp: u64, + /// The encrypted notification body pub data: Option, + /// has the notification been pulled from storage? + pub stored: bool, } impl FromRequest for Notification { @@ -84,6 +90,7 @@ impl FromRequest for Notification { timestamp, sort_key_timestamp, data, + stored: false, }) } .boxed_local() @@ -108,6 +115,7 @@ impl From for autopush_common::notification::Notification { Some(headers) } }, + stored: notification.stored, } } } diff --git a/autoendpoint/src/routers/common.rs b/autoendpoint/src/routers/common.rs index 477a7c1c3..f17cc75d0 100644 --- a/autoendpoint/src/routers/common.rs +++ b/autoendpoint/src/routers/common.rs @@ -4,7 +4,7 @@ use crate::extractors::notification::Notification; use crate::routers::RouterError; use actix_web::http::StatusCode; use autopush_common::util::InsertOpt; -use cadence::{Counted, CountedExt, StatsdClient}; +use cadence::{Counted, CountedExt, StatsdClient, Timed}; use std::collections::HashMap; use uuid::Uuid; @@ -173,6 +173,14 @@ pub fn incr_success_metrics( .with_tag("app_id", app_id) .with_tag("destination", "Direct") .send(); + metrics + .time_with_tags( + "notification.total_request_time", + (autopush_common::util::sec_since_epoch() - notification.timestamp) * 1000, + ) + .with_tag("platform", platform) + .with_tag("app_id", app_id) + .send(); } /// Common router test code @@ -221,6 +229,7 @@ pub mod tests { timestamp: 0, sort_key_timestamp: 0, data, + stored: false, } } } diff --git a/autoendpoint/src/routers/webpush.rs b/autoendpoint/src/routers/webpush.rs index 10ef89622..606cdc2ba 100644 --- a/autoendpoint/src/routers/webpush.rs +++ b/autoendpoint/src/routers/webpush.rs @@ -5,7 +5,7 @@ use crate::extractors::router_data_input::RouterDataInput; use crate::routers::{Router, RouterError, RouterResponse}; use async_trait::async_trait; use autopush_common::db::DynamoDbUser; -use cadence::{Counted, CountedExt, StatsdClient}; +use cadence::{Counted, CountedExt, StatsdClient, Timed}; use reqwest::{Response, StatusCode}; use serde_json::Value; use std::collections::hash_map::RandomState; @@ -118,6 +118,17 @@ impl Router for WebPushRouter { trace!("Response = {:?}", response); if response.status() == 200 { trace!("Node has delivered the message"); + self.metrics + .time_with_tags( + "notification.total_request_time", + (notification.timestamp - autopush_common::util::sec_since_epoch()) + * 1000, + ) + .with_tag("platform", "websocket") + .with_tag("app_id", "direct") + .with_tag("stored", ¬ification.stored.to_string()) + .send(); + Ok(self.make_delivered_response(notification)) } else { trace!("Node has not delivered the message, returning stored response"); @@ -160,11 +171,10 @@ impl WebPushRouter { /// Store a notification in the database async fn store_notification(&self, notification: &Notification) -> ApiResult<()> { + let mut bundle = notification.clone(); + bundle.stored = true; self.ddb - .save_message( - notification.subscription.user.uaid, - notification.clone().into(), - ) + .save_message(notification.subscription.user.uaid, bundle.into()) .await .map_err(|e| ApiErrorKind::Router(RouterError::SaveDb(e)).into()) .map(|_| { diff --git a/autopush-common/src/db/commands.rs b/autopush-common/src/db/commands.rs index 132b516a8..73d7b9469 100644 --- a/autopush-common/src/db/commands.rs +++ b/autopush-common/src/db/commands.rs @@ -75,6 +75,7 @@ pub fn list_tables_sync( .chain_err(|| "Unable to list tables") } +/// Pull all pending messages for the user from storage pub fn fetch_messages( ddb: DynamoDbClient, metrics: Arc, @@ -136,6 +137,8 @@ pub fn fetch_messages( }) } +/// Pull messages older than a given timestamp for a given user. +/// This also returns the latest message timestamp. pub fn fetch_timestamp_messages( ddb: DynamoDbClient, metrics: Arc, @@ -191,6 +194,7 @@ pub fn fetch_timestamp_messages( }) } +/// Drop all user information from the Router table. pub fn drop_user( ddb: DynamoDbClient, uaid: &Uuid, @@ -208,6 +212,7 @@ pub fn drop_user( .chain_err(|| "Error dropping user") } +/// Get the user information from the Router table. pub fn get_uaid( ddb: DynamoDbClient, uaid: &Uuid, @@ -223,6 +228,7 @@ pub fn get_uaid( .chain_err(|| "Error fetching user") } +/// Register a user into the Router table. pub fn register_user( ddb: DynamoDbClient, user: &DynamoDbUser, @@ -264,6 +270,8 @@ pub fn register_user( .chain_err(|| "Error storing user record") } +/// Update the user's message month (Note: This is legacy for DynamoDB, but may still +/// be used by Stand Alone systems.) pub fn update_user_message_month( ddb: DynamoDbClient, uaid: &Uuid, @@ -294,6 +302,7 @@ pub fn update_user_message_month( .chain_err(|| "Error updating user message month") } +/// Return all known Channels for a given User. pub fn all_channels( ddb: DynamoDbClient, uaid: &Uuid, @@ -324,6 +333,7 @@ pub fn all_channels( .or_else(|_err| future::ok(HashSet::new())) } +/// Save the current list of Channels for a given user. pub fn save_channels( ddb: DynamoDbClient, uaid: &Uuid, @@ -357,6 +367,7 @@ pub fn save_channels( .chain_err(|| "Error saving channels") } +/// Remove a specific channel from the list of known channels for a given User pub fn unregister_channel_id( ddb: DynamoDbClient, uaid: &Uuid, @@ -385,6 +396,7 @@ pub fn unregister_channel_id( .chain_err(|| "Error unregistering channel") } +/// Respond with user information for a given user. #[allow(clippy::too_many_arguments)] pub fn lookup_user( ddb: DynamoDbClient, diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index a0374eca5..600e4ac77 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -413,6 +413,7 @@ impl DynamoStorage { .chain_err(|| "Error deleting notification") } + /// Check to see if we have pending messages and return them if we do. pub fn check_storage( &self, table_name: &str, @@ -550,6 +551,8 @@ impl DynamoStorage { } } +/// Get the list of current, valid message tables (Note: This is legacy for DynamoDB, but may still +/// be used for Stand Alone systems ) pub fn list_message_tables(ddb: &DynamoDbClient, prefix: &str) -> Result> { let mut names: Vec = Vec::new(); let mut start_key = None; diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index 3be9c5cbe..569df7629 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -105,7 +105,7 @@ impl Default for DynamoDbUser { } } -#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct DynamoDbNotification { // DynamoDB #[serde(serialize_with = "uuid_serializer")] @@ -143,6 +143,27 @@ pub struct DynamoDbNotification { // value before sending it to storage or a connection node. #[serde(skip_serializing_if = "Option::is_none")] updateid: Option, + // This message was stored + stored: bool, +} + +/// Ensure that the default for 'stored' is true. +impl Default for DynamoDbNotification { + fn default() -> Self { + Self { + uaid: Uuid::default(), + chidmessageid: String::default(), + current_timestamp: None, + chids: None, + timestamp: None, + expiry: 0, + ttl: None, + data: None, + headers: None, + updateid: None, + stored: true, + } + } } impl DynamoDbNotification { @@ -216,6 +237,7 @@ impl DynamoDbNotification { data: self.data, headers: self.headers.map(|m| m.into()), sortkey_timestamp: key.sortkey_timestamp, + stored: true, }) } diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index 5c205a28e..9a365f9b9 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -23,6 +23,8 @@ pub struct Notification { pub sortkey_timestamp: Option, #[serde(skip_serializing_if = "Option::is_none")] pub headers: Option>, + #[serde(skip_serializing)] + pub stored: bool, } impl Notification { From 765fa0e60bd9bafcb3d849efe3ffc0125dcde659 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 24 Oct 2022 14:38:29 -0700 Subject: [PATCH 05/18] f r's --- autoendpoint/src/db/client.rs | 13 ++++++++-- autoendpoint/src/routers/webpush.rs | 14 +++++----- autopush-common/src/db/mod.rs | 20 +++++++++++++-- autopush/src/client.rs | 40 +++++++++++++---------------- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/autoendpoint/src/db/client.rs b/autoendpoint/src/db/client.rs index d6115c397..1126f5da3 100644 --- a/autoendpoint/src/db/client.rs +++ b/autoendpoint/src/db/client.rs @@ -8,7 +8,7 @@ use autopush_common::db::{DynamoDbNotification, DynamoDbUser}; use autopush_common::notification::Notification; use autopush_common::util::sec_since_epoch; use autopush_common::{ddb_item, hashmap, val}; -use cadence::StatsdClient; +use cadence::{CountedExt, StatsdClient}; use rusoto_core::credential::StaticProvider; use rusoto_core::{HttpClient, Region, RusotoError}; use rusoto_dynamodb::{ @@ -367,6 +367,7 @@ impl DbClient for DbClientImpl { } async fn save_message(&self, uaid: Uuid, message: Notification) -> DbResult<()> { + let topic = message.headers.as_ref().is_some().to_string(); let input = PutItemInput { item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(&uaid, message))?, table_name: self.message_table.clone(), @@ -379,7 +380,15 @@ impl DbClient for DbClientImpl { retryable_putitem_error(self.metrics.clone()), ) .await?; - + { + // Build the metric report + let mut metric = self.metrics.incr_with_tags("notification.message.stored"); + if !topic.is_empty() { + metric = metric.with_tag("topic", &topic); + } + // TODO: include `internal` if meta is set. + metric.send(); + } Ok(()) } diff --git a/autoendpoint/src/routers/webpush.rs b/autoendpoint/src/routers/webpush.rs index 606cdc2ba..17f0e5f68 100644 --- a/autoendpoint/src/routers/webpush.rs +++ b/autoendpoint/src/routers/webpush.rs @@ -73,11 +73,16 @@ impl Router for WebPushRouter { } if notification.headers.ttl == 0 { + let topic = notification.headers.topic.is_some().to_string(); trace!( "Notification has a TTL of zero and was not successfully \ delivered, dropping it" ); - self.metrics.incr("notification.message.expired")?; + self.metrics + .incr_with_tags("notification.message.expired") + // TODO: include `internal` if meta is set. + .with_tag("topic", &topic) + .send(); return Ok(self.make_delivered_response(notification)); } @@ -177,13 +182,6 @@ impl WebPushRouter { .save_message(notification.subscription.user.uaid, bundle.into()) .await .map_err(|e| ApiErrorKind::Router(RouterError::SaveDb(e)).into()) - .map(|_| { - self.metrics - .incr_with_tags("notification.message.stored") - .with_tag("topic", ¬ification.headers.topic.is_some().to_string()) - // TODO: include `internal` if meta is set. - .send(); - }) } /// Remove the node ID from a user. This is done if the user is no longer diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 600e4ac77..c54885aa8 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -322,6 +322,7 @@ impl DynamoStorage { message_month: String, message: Notification, ) -> impl Future { + let topic = message.headers.as_ref().is_some().to_string(); let ddb = self.ddb.clone(); let put_item = PutItemInput { item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, message)) @@ -335,7 +336,12 @@ impl DynamoStorage { retryable_putitem_error, ) .and_then(move |_| { - metrics.incr("notification.db.stored").ok(); + let mut metric = metrics.incr_with_tags("notification.message.stored"); + // TODO: include `internal` if meta is set. + if !topic.is_empty() { + metric = metric.with_tag("topic", &topic); + } + metric.send(); future::ok(()) }) .chain_err(|| "Error saving notification") @@ -395,7 +401,9 @@ impl DynamoStorage { uaid: &Uuid, notif: &Notification, ) -> impl Future { + let topic = notif.headers.as_ref().is_some().to_string(); let ddb = self.ddb.clone(); + let metrics = self.metrics.clone(); let delete_input = DeleteItemInput { table_name: table_name.to_string(), key: ddb_item! { @@ -409,7 +417,15 @@ impl DynamoStorage { move || ddb.delete_item(delete_input.clone()), retryable_delete_error, ) - .and_then(|_| future::ok(())) + .and_then(move |_| { + let mut metric = metrics.incr_with_tags("notification.message.deleted"); + // TODO: include `internal` if meta is set. + if !topic.is_empty() { + metric = metric.with_tag("topic", &topic); + } + metric.send(); + future::ok(()) + }) .chain_err(|| "Error deleting notification") } diff --git a/autopush/src/client.rs b/autopush/src/client.rs index 73795b778..471c37c3f 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -1193,25 +1193,22 @@ where // Filter out TTL expired messages let now = sec_since_epoch(); let srv = data.srv.clone(); - messages = messages - .into_iter() - .filter(|n| { - if !n.expired(now) { - return true; - } - if n.sortkey_timestamp.is_none() { - srv.handle.spawn( - srv.ddb - .delete_message(&webpush.message_month, &webpush.uaid, n) - .then(|_| { - debug!("Deleting expired message without sortkey_timestamp"); - Ok(()) - }), - ); - } - false - }) - .collect(); + messages.retain(|n| { + if !n.expired(now) { + return true; + } + if n.sortkey_timestamp.is_none() { + srv.handle.spawn( + srv.ddb + .delete_message(&webpush.message_month, &webpush.uaid, n) + .then(|_| { + debug!("Deleting expired message without sortkey_timestamp"); + Ok(()) + }), + ); + } + false + }); webpush.flags.increment_storage = !include_topic && timestamp.is_some(); // If there's still messages send them out if !messages.is_empty() { @@ -1341,12 +1338,11 @@ where } fn emit_metrics_for_send(metrics: &StatsdClient, notif: &Notification, source: &'static str) { - if notif.topic.is_some() { - metrics.incr("ua.notification.topic").ok(); - } metrics .incr_with_tags("ua.notification.sent") .with_tag("source", source) + .with_tag("topic", ¬if.topic.is_some().to_string()) + // TODO: include `internal` if meta is set .send(); metrics .count_with_tags( From 1651de059f5850ab4f8b5098c1f6036db7ed0d2f Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 24 Oct 2022 15:16:14 -0700 Subject: [PATCH 06/18] f allow dead code because of f StateMachineFuture --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5c670f38e..975a3d3d8 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -87,7 +87,8 @@ jobs: name: Check formatting command: | cargo fmt -- --check - cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro + # allow `dead_code` because the `StateMachineFuture` procmacro generates a false positive. + cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro --allow=dead_code - run: name: Integration tests command: py.test -v From 9e907e28f003bb2a51a4f12c211e534f32000482 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Thu, 27 Oct 2022 09:27:55 -0700 Subject: [PATCH 07/18] f set default for stored. --- autopush-common/src/notification.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index 9a365f9b9..fb7d81823 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -6,6 +6,10 @@ use uuid::Uuid; use crate::util::ms_since_epoch; +fn bool_true() -> bool { + true +} + #[derive(Serialize, Default, Deserialize, Clone, Debug)] pub struct Notification { #[serde(rename = "channelID")] @@ -23,7 +27,7 @@ pub struct Notification { pub sortkey_timestamp: Option, #[serde(skip_serializing_if = "Option::is_none")] pub headers: Option>, - #[serde(skip_serializing)] + #[serde(default="bool_true", skip_serializing)] pub stored: bool, } From 3b11f90b4da347ad85e33dda252b0c329e73f298 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Thu, 27 Oct 2022 09:52:17 -0700 Subject: [PATCH 08/18] f fmt --- autopush-common/src/notification.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index fb7d81823..c880bac6d 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -27,7 +27,7 @@ pub struct Notification { pub sortkey_timestamp: Option, #[serde(skip_serializing_if = "Option::is_none")] pub headers: Option>, - #[serde(default="bool_true", skip_serializing)] + #[serde(default = "bool_true", skip_serializing)] pub stored: bool, } From 7b0f490a5873645f3c83e64e66fda07218bb959b Mon Sep 17 00:00:00 2001 From: jrconlin Date: Thu, 27 Oct 2022 10:44:46 -0700 Subject: [PATCH 09/18] f set stored as optional --- autopush-common/src/db/models.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index 569df7629..7af66a4f4 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -105,6 +105,10 @@ impl Default for DynamoDbUser { } } +fn bool_true() -> bool { + true +} + #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct DynamoDbNotification { // DynamoDB @@ -144,6 +148,7 @@ pub struct DynamoDbNotification { #[serde(skip_serializing_if = "Option::is_none")] updateid: Option, // This message was stored + #[serde(default="bool_true")] stored: bool, } From 5da5c026b3775a22edc3a1da03a3b23dd72f8018 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Thu, 27 Oct 2022 10:55:28 -0700 Subject: [PATCH 10/18] f fmt --- autopush-common/src/db/models.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index 7af66a4f4..37a4e9526 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -148,7 +148,7 @@ pub struct DynamoDbNotification { #[serde(skip_serializing_if = "Option::is_none")] updateid: Option, // This message was stored - #[serde(default="bool_true")] + #[serde(default = "bool_true")] stored: bool, } From 246e9a4c95c9495f3351b9fe1db1cb3e31951ea8 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 6 Dec 2022 14:47:33 -0800 Subject: [PATCH 11/18] f fix for SYNC-3516 * Add metrics to track delivery to OS platforms * refactored UA parse to be a struct * some modern clippy cleanup --- autoendpoint/src/extractors/notification.rs | 4 +- autoendpoint/src/settings.rs | 4 +- autopush-common/src/db/models.rs | 2 +- autopush-common/src/logging.rs | 2 +- autopush/src/client.rs | 49 +++++---- autopush/src/user_agent.rs | 105 ++++++++++++-------- 6 files changed, 102 insertions(+), 64 deletions(-) diff --git a/autoendpoint/src/extractors/notification.rs b/autoendpoint/src/extractors/notification.rs index 6513cf30d..9004622f5 100644 --- a/autoendpoint/src/extractors/notification.rs +++ b/autoendpoint/src/extractors/notification.rs @@ -166,7 +166,7 @@ impl Notification { map.insert( "channelID", - serde_json::to_value(&self.subscription.channel_id).unwrap(), + serde_json::to_value(self.subscription.channel_id).unwrap(), ); map.insert("version", serde_json::to_value(&self.message_id).unwrap()); map.insert("ttl", serde_json::to_value(self.headers.ttl).unwrap()); @@ -174,7 +174,7 @@ impl Notification { map.insert("timestamp", serde_json::to_value(self.timestamp).unwrap()); if let Some(data) = &self.data { - map.insert("data", serde_json::to_value(&data).unwrap()); + map.insert("data", serde_json::to_value(data).unwrap()); let headers: HashMap<_, _> = self.headers.clone().into(); map.insert("headers", serde_json::to_value(&headers).unwrap()); diff --git a/autoendpoint/src/settings.rs b/autoendpoint/src/settings.rs index 1047992c1..fef476894 100644 --- a/autoendpoint/src/settings.rs +++ b/autoendpoint/src/settings.rs @@ -123,7 +123,7 @@ impl Settings { /// Initialize the fernet encryption instance pub fn make_fernet(&self) -> MultiFernet { - let keys = &self.crypto_keys.replace('"', "").replace(' ', ""); + let keys = &self.crypto_keys.replace(['"', ' '], ""); let fernets = Self::read_list_from_str(keys, "Invalid AUTOEND_CRYPTO_KEYS") .map(|key| { debug!("Fernet keys: {:?}", &key); @@ -135,7 +135,7 @@ impl Settings { /// Get the list of auth hash keys pub fn auth_keys(&self) -> Vec { - let keys = &self.auth_keys.replace('"', "").replace(' ', ""); + let keys = &self.auth_keys.replace(['"', ' '], ""); Self::read_list_from_str(keys, "Invalid AUTOEND_AUTH_KEYS") .map(|v| v.to_owned()) .collect() diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index 37a4e9526..c8e9bea88 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -175,7 +175,7 @@ impl DynamoDbNotification { fn parse_sort_key(key: &str) -> Result { lazy_static! { static ref RE: RegexSet = - RegexSet::new(&[r"^01:\S+:\S+$", r"^02:\d+:\S+$", r"^\S{3,}:\S+$",]).unwrap(); + RegexSet::new([r"^01:\S+:\S+$", r"^02:\d+:\S+$", r"^\S{3,}:\S+$",]).unwrap(); } if !RE.is_match(key) { return Err("Invalid chidmessageid".into()); diff --git a/autopush-common/src/logging.rs b/autopush-common/src/logging.rs index 502d42c66..307ed891d 100644 --- a/autopush-common/src/logging.rs +++ b/autopush-common/src/logging.rs @@ -9,7 +9,7 @@ use slog_mozlog_json::MozLogJson; pub fn init_logging(json: bool) -> Result<()> { let logger = if json { let hostname = get_ec2_instance_id() - .map(&str::to_owned) + .map(str::to_owned) .or_else(get_hostname) .ok_or("Couldn't get_hostname")?; diff --git a/autopush/src/client.rs b/autopush/src/client.rs index 471c37c3f..01fcd18bf 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -27,7 +27,7 @@ use autopush_common::util::{ms_since_epoch, sec_since_epoch}; use crate::megaphone::{Broadcast, BroadcastSubs}; use crate::server::protocol::{ClientMessage, ServerMessage, ServerNotification}; use crate::server::Server; -use crate::user_agent::parse_user_agent; +use crate::user_agent::UserAgentInfo; // Created and handed to the AutopushServer pub struct RegisteredClient { @@ -163,6 +163,7 @@ pub struct WebPushClient { last_ping: u64, stats: SessionStatistics, deferred_user_registration: Option, + ua_info: UserAgentInfo, } impl Default for WebPushClient { @@ -182,6 +183,7 @@ impl Default for WebPushClient { last_ping: Default::default(), stats: Default::default(), deferred_user_registration: Default::default(), + ua_info: Default::default(), } } } @@ -473,6 +475,7 @@ where ..Default::default() }, deferred_user_registration, + ua_info: UserAgentInfo::from(user_agent.as_ref()), ..Default::default() })); @@ -600,16 +603,17 @@ where } let now = ms_since_epoch(); let elapsed = (now - webpush.connected_at) / 1_000; - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(&user_agent); + let ua_info = UserAgentInfo::from(user_agent.as_ref()); // dogstatsd doesn't support timers: use histogram instead srv.metrics .time_with_tags("ua.connection.lifespan", elapsed) - .with_tag("ua_os_family", metrics_os) - .with_tag("ua_browser_family", metrics_browser) + .with_tag("ua_os_family", &ua_info.metrics_os) + .with_tag("ua_browser_family", &ua_info.metrics_browser) .send(); // Log out the sentry message if applicable and convert to error msg let error = if let Some(ref err) = error { + let ua_info = ua_info.clone(); let mut event = event_from_error_chain(err); event.user = Some(sentry::User { id: Some(webpush.uaid.as_simple().to_string()), @@ -617,19 +621,19 @@ where }); event .tags - .insert("ua_name".to_string(), ua_result.name.to_string()); + .insert("ua_name".to_string(), ua_info.browser_name); event .tags - .insert("ua_os_family".to_string(), metrics_os.to_string()); + .insert("ua_os_family".to_string(), ua_info.metrics_os); event .tags - .insert("ua_os_ver".to_string(), ua_result.os_version.to_string()); + .insert("ua_os_ver".to_string(), ua_info.os_version); event .tags - .insert("ua_browser_family".to_string(), metrics_browser.to_string()); + .insert("ua_browser_family".to_string(), ua_info.metrics_browser); event .tags - .insert("ua_browser_ver".to_string(), ua_result.version.to_string()); + .insert("ua_browser_ver".to_string(), ua_info.browser_version); sentry::capture_event(event); err.display_chain().to_string() } else { @@ -658,12 +662,12 @@ where "uaid_reset" => stats.uaid_reset, "existing_uaid" => stats.existing_uaid, "connection_type" => &stats.connection_type, - "ua_name" => ua_result.name, - "ua_os_family" => metrics_os, - "ua_os_ver" => ua_result.os_version.into_owned(), - "ua_browser_family" => metrics_browser, - "ua_browser_ver" => ua_result.version, - "ua_category" => ua_result.category, + "ua_name" => ua_info.browser_name, + "ua_os_family" => ua_info.metrics_os, + "ua_os_ver" => ua_info.os_version, + "ua_browser_family" => ua_info.metrics_browser, + "ua_browser_ver" => ua_info.browser_version, + "ua_category" => ua_info.category, "connection_time" => elapsed, "direct_acked" => stats.direct_acked, "direct_storage" => stats.direct_storage, @@ -1100,7 +1104,7 @@ where webpush.unacked_direct_notifs.push(notif.clone()); } debug!("Got a notification to send, sending!"); - emit_metrics_for_send(&data.srv.metrics, ¬if, "Direct"); + emit_metrics_for_send(&data.srv.metrics, ¬if, "Direct", &webpush.ua_info); transition!(Send { smessages: vec![ServerMessage::Notification(notif)], data, @@ -1217,7 +1221,9 @@ where .extend(messages.iter().cloned()); let smessages: Vec<_> = messages .into_iter() - .inspect(|msg| emit_metrics_for_send(&data.srv.metrics, msg, "Stored")) + .inspect(|msg| { + emit_metrics_for_send(&data.srv.metrics, msg, "Stored", &webpush.ua_info) + }) .map(ServerMessage::Notification) .collect(); webpush.sent_from_storage += smessages.len() as u32; @@ -1337,11 +1343,17 @@ where } } -fn emit_metrics_for_send(metrics: &StatsdClient, notif: &Notification, source: &'static str) { +fn emit_metrics_for_send( + metrics: &StatsdClient, + notif: &Notification, + source: &'static str, + user_agent_info: &UserAgentInfo, +) { metrics .incr_with_tags("ua.notification.sent") .with_tag("source", source) .with_tag("topic", ¬if.topic.is_some().to_string()) + .with_tag("os", &user_agent_info.metrics_os) // TODO: include `internal` if meta is set .send(); metrics @@ -1350,5 +1362,6 @@ fn emit_metrics_for_send(metrics: &StatsdClient, notif: &Notification, source: & notif.data.as_ref().map_or(0, |data| data.len() as i64), ) .with_tag("source", source) + .with_tag("os", &user_agent_info.metrics_os) .send(); } diff --git a/autopush/src/user_agent.rs b/autopush/src/user_agent.rs index 17e448430..3b7fb26e0 100644 --- a/autopush/src/user_agent.rs +++ b/autopush/src/user_agent.rs @@ -1,4 +1,4 @@ -use woothee::parser::{Parser, WootheeResult}; +use woothee::parser::{Parser}; // List of valid user-agent attributes to keep, anything not in this // list is considered 'Other'. We log the user-agent on connect always @@ -11,74 +11,99 @@ const VALID_UA_BROWSER: &[&str] = &["Chrome", "Firefox", "Safari", "Opera"]; // field). Windows has many values and we only care that its Windows const VALID_UA_OS: &[&str] = &["Firefox OS", "Linux", "Mac OSX"]; -pub fn parse_user_agent(agent: &str) -> (WootheeResult, &str, &str) { - let parser = Parser::new(); - let wresult = parser.parse(agent).unwrap_or_else(|| WootheeResult { - name: "", - category: "", - os: "", - os_version: "".into(), - browser_type: "", - version: "", - vendor: "", - }); +#[derive(Clone, Debug, Default)] +pub struct UserAgentInfo { + _user_agent_string: String, + pub category: String, + pub browser_name: String, + pub browser_version: String, + pub metrics_browser: String, + pub metrics_os: String, + pub os_version: String, + pub os: String, +} + +impl From<&str> for UserAgentInfo { + fn from(user_agent_string: &str) -> Self { + let parser = Parser::new(); + let wresult = parser.parse(user_agent_string).unwrap_or_default(); - // Determine a base os/browser for metrics' tags - let metrics_os = if wresult.os.starts_with("Windows") { - "Windows" - } else if VALID_UA_OS.contains(&wresult.os) { - wresult.os - } else { - "Other" - }; - let metrics_browser = if VALID_UA_BROWSER.contains(&wresult.name) { - wresult.name - } else { - "Other" - }; - (wresult, metrics_os, metrics_browser) + // Determine a base os/browser for metrics' tags + let metrics_os = if wresult.os.starts_with("Windows") { + "Windows" + } else if VALID_UA_OS.contains(&wresult.os) { + wresult.os + } else { + "Other" + }; + let metrics_browser = if VALID_UA_BROWSER.contains(&wresult.name) { + wresult.name + } else { + "Other" + }; + + Self { + category: wresult.category.to_owned(), + browser_name: wresult.name.to_owned(), + browser_version: wresult.version.to_owned(), + metrics_browser: metrics_browser.to_owned(), + metrics_os: metrics_os.to_owned(), + os_version: wresult.os_version.to_string(), + os: wresult.os.to_owned(), + _user_agent_string: user_agent_string.to_owned(), + } + } +} + +/* +impl UserAgentInfo { + pub fn parsed(&self) -> WootheeResult { + let parser = Parser::new(); + parser.parse(&self._user_agent_string).unwrap_or_default() + } } +*/ #[cfg(test)] mod tests { - use super::parse_user_agent; + use super::UserAgentInfo; #[test] fn test_linux() { let agent = r#"Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.1.2) Gecko/20090807 Mandriva Linux/1.9.1.2-1.1mud2009.1 (2009.1) Firefox/3.5.2 FirePHP/0.3,gzip(gfe),gzip(gfe)"#; - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(agent); - assert_eq!(metrics_os, "Linux"); + let ua_result = UserAgentInfo::from(agent); + assert_eq!(ua_result.metrics_os, "Linux"); assert_eq!(ua_result.os, "Linux"); - assert_eq!(metrics_browser, "Firefox"); + assert_eq!(ua_result.metrics_browser, "Firefox"); } #[test] fn test_windows() { let agent = r#"Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3 (.NET CLR 3.5.30729)"#; - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(agent); - assert_eq!(metrics_os, "Windows"); + let ua_result = UserAgentInfo::from(agent); + assert_eq!(ua_result.metrics_os, "Windows"); assert_eq!(ua_result.os, "Windows 7"); - assert_eq!(metrics_browser, "Firefox"); + assert_eq!(ua_result.metrics_browser, "Firefox"); } #[test] fn test_osx() { let agent = r#"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.5; rv:2.1.1) Gecko/ Firefox/5.0.1"#; - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(agent); - assert_eq!(metrics_os, "Mac OSX"); + let ua_result = UserAgentInfo::from(agent); + assert_eq!(ua_result.metrics_os, "Mac OSX"); assert_eq!(ua_result.os, "Mac OSX"); - assert_eq!(metrics_browser, "Firefox"); + assert_eq!(ua_result.metrics_browser, "Firefox"); } #[test] fn test_other() { let agent = r#"BlackBerry9000/4.6.0.167 Profile/MIDP-2.0 Configuration/CLDC-1.1 VendorID/102"#; - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(agent); - assert_eq!(metrics_os, "Other"); + let ua_result = UserAgentInfo::from(agent); + assert_eq!(ua_result.metrics_os, "Other"); assert_eq!(ua_result.os, "BlackBerry"); - assert_eq!(metrics_browser, "Other"); - assert_eq!(ua_result.name, "UNKNOWN"); + assert_eq!(ua_result.metrics_browser, "Other"); + assert_eq!(ua_result.browser_name, "UNKNOWN"); } } From 2dad8ea3881f4f17dc7e790426f88ec0422a07af Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 6 Dec 2022 14:50:48 -0800 Subject: [PATCH 12/18] f fmt --- autopush/src/user_agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopush/src/user_agent.rs b/autopush/src/user_agent.rs index 3b7fb26e0..e32deec98 100644 --- a/autopush/src/user_agent.rs +++ b/autopush/src/user_agent.rs @@ -1,4 +1,4 @@ -use woothee::parser::{Parser}; +use woothee::parser::Parser; // List of valid user-agent attributes to keep, anything not in this // list is considered 'Other'. We log the user-agent on connect always From a3dfb024a4b14be40c54ea5a0a0c0429cfd6d284 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 17 Jan 2023 10:18:33 -0800 Subject: [PATCH 13/18] f r's --- autoendpoint/src/db/client.rs | 6 ++---- autoendpoint/src/extractors/notification.rs | 6 +----- autoendpoint/src/routers/common.rs | 1 - autoendpoint/src/routers/webpush.rs | 8 ++++---- autopush-common/src/db/mod.rs | 12 ++++-------- autopush-common/src/db/models.rs | 1 - autopush-common/src/notification.rs | 8 +------- 7 files changed, 12 insertions(+), 30 deletions(-) diff --git a/autoendpoint/src/db/client.rs b/autoendpoint/src/db/client.rs index 1126f5da3..66065f54c 100644 --- a/autoendpoint/src/db/client.rs +++ b/autoendpoint/src/db/client.rs @@ -367,7 +367,7 @@ impl DbClient for DbClientImpl { } async fn save_message(&self, uaid: Uuid, message: Notification) -> DbResult<()> { - let topic = message.headers.as_ref().is_some().to_string(); + let topic = message.topic.is_some().to_string(); let input = PutItemInput { item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(&uaid, message))?, table_name: self.message_table.clone(), @@ -383,9 +383,7 @@ impl DbClient for DbClientImpl { { // Build the metric report let mut metric = self.metrics.incr_with_tags("notification.message.stored"); - if !topic.is_empty() { - metric = metric.with_tag("topic", &topic); - } + metric = metric.with_tag("topic", &topic); // TODO: include `internal` if meta is set. metric.send(); } diff --git a/autoendpoint/src/extractors/notification.rs b/autoendpoint/src/extractors/notification.rs index 9004622f5..10de8cbf3 100644 --- a/autoendpoint/src/extractors/notification.rs +++ b/autoendpoint/src/extractors/notification.rs @@ -28,8 +28,6 @@ pub struct Notification { pub sort_key_timestamp: u64, /// The encrypted notification body pub data: Option, - /// has the notification been pulled from storage? - pub stored: bool, } impl FromRequest for Notification { @@ -90,7 +88,6 @@ impl FromRequest for Notification { timestamp, sort_key_timestamp, data, - stored: false, }) } .boxed_local() @@ -115,7 +112,6 @@ impl From for autopush_common::notification::Notification { Some(headers) } }, - stored: notification.stored, } } } @@ -177,7 +173,7 @@ impl Notification { map.insert("data", serde_json::to_value(data).unwrap()); let headers: HashMap<_, _> = self.headers.clone().into(); - map.insert("headers", serde_json::to_value(&headers).unwrap()); + map.insert("headers", serde_json::to_value(headers).unwrap()); } map diff --git a/autoendpoint/src/routers/common.rs b/autoendpoint/src/routers/common.rs index f17cc75d0..ee3683bfe 100644 --- a/autoendpoint/src/routers/common.rs +++ b/autoendpoint/src/routers/common.rs @@ -229,7 +229,6 @@ pub mod tests { timestamp: 0, sort_key_timestamp: 0, data, - stored: false, } } } diff --git a/autoendpoint/src/routers/webpush.rs b/autoendpoint/src/routers/webpush.rs index 17f0e5f68..9b54e5589 100644 --- a/autoendpoint/src/routers/webpush.rs +++ b/autoendpoint/src/routers/webpush.rs @@ -131,7 +131,6 @@ impl Router for WebPushRouter { ) .with_tag("platform", "websocket") .with_tag("app_id", "direct") - .with_tag("stored", ¬ification.stored.to_string()) .send(); Ok(self.make_delivered_response(notification)) @@ -176,10 +175,11 @@ impl WebPushRouter { /// Store a notification in the database async fn store_notification(&self, notification: &Notification) -> ApiResult<()> { - let mut bundle = notification.clone(); - bundle.stored = true; self.ddb - .save_message(notification.subscription.user.uaid, bundle.into()) + .save_message( + notification.subscription.user.uaid, + notification.clone().into(), + ) .await .map_err(|e| ApiErrorKind::Router(RouterError::SaveDb(e)).into()) } diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index c54885aa8..a91da26da 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -322,7 +322,7 @@ impl DynamoStorage { message_month: String, message: Notification, ) -> impl Future { - let topic = message.headers.as_ref().is_some().to_string(); + let topic = message.topic.is_some().to_string(); let ddb = self.ddb.clone(); let put_item = PutItemInput { item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, message)) @@ -338,9 +338,7 @@ impl DynamoStorage { .and_then(move |_| { let mut metric = metrics.incr_with_tags("notification.message.stored"); // TODO: include `internal` if meta is set. - if !topic.is_empty() { - metric = metric.with_tag("topic", &topic); - } + metric = metric.with_tag("topic", &topic); metric.send(); future::ok(()) }) @@ -401,7 +399,7 @@ impl DynamoStorage { uaid: &Uuid, notif: &Notification, ) -> impl Future { - let topic = notif.headers.as_ref().is_some().to_string(); + let topic = notif.topic.is_some().to_string(); let ddb = self.ddb.clone(); let metrics = self.metrics.clone(); let delete_input = DeleteItemInput { @@ -420,9 +418,7 @@ impl DynamoStorage { .and_then(move |_| { let mut metric = metrics.incr_with_tags("notification.message.deleted"); // TODO: include `internal` if meta is set. - if !topic.is_empty() { - metric = metric.with_tag("topic", &topic); - } + metric = metric.with_tag("topic", &topic); metric.send(); future::ok(()) }) diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index c8e9bea88..0ed9e4ab2 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -242,7 +242,6 @@ impl DynamoDbNotification { data: self.data, headers: self.headers.map(|m| m.into()), sortkey_timestamp: key.sortkey_timestamp, - stored: true, }) } diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index c880bac6d..9d6b8f471 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -6,10 +6,6 @@ use uuid::Uuid; use crate::util::ms_since_epoch; -fn bool_true() -> bool { - true -} - #[derive(Serialize, Default, Deserialize, Clone, Debug)] pub struct Notification { #[serde(rename = "channelID")] @@ -27,8 +23,6 @@ pub struct Notification { pub sortkey_timestamp: Option, #[serde(skip_serializing_if = "Option::is_none")] pub headers: Option>, - #[serde(default = "bool_true", skip_serializing)] - pub stored: bool, } impl Notification { @@ -63,7 +57,7 @@ impl Notification { } pub fn expired(&self, at_sec: u64) -> bool { - at_sec >= self.timestamp as u64 + self.ttl as u64 + at_sec >= self.timestamp as u64 + self.ttl } } From cd3af0d83f13b1bbb7b3db2e43e885a68dd3c39f Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 17 Jan 2023 11:02:08 -0800 Subject: [PATCH 14/18] f fix circleci --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 81febb751..70a24185a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -87,10 +87,10 @@ jobs: name: Check formatting # Rust 1.65+ is stricter about dead_code and flags auto-generated methods created by # state_machine_future. Since these are auto-generated, it's not possible to tag them as - # allowed. Adding `-A dead_code` until state_machine_future is removed. + # allowed. Adding `--allow=dead_code` until state_machine_future is removed. command: | cargo fmt -- --check - cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro -A dead_code + cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro --allow=dead_code - run: name: Integration tests command: py.test -v From 1f5eb977eaa01cd559f7caa4f21d586d7e7b5268 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 17 Jan 2023 11:29:33 -0800 Subject: [PATCH 15/18] f remove unused stored --- autopush-common/src/db/models.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index 0ed9e4ab2..a650e33e4 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -105,10 +105,6 @@ impl Default for DynamoDbUser { } } -fn bool_true() -> bool { - true -} - #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct DynamoDbNotification { // DynamoDB @@ -147,9 +143,6 @@ pub struct DynamoDbNotification { // value before sending it to storage or a connection node. #[serde(skip_serializing_if = "Option::is_none")] updateid: Option, - // This message was stored - #[serde(default = "bool_true")] - stored: bool, } /// Ensure that the default for 'stored' is true. @@ -166,7 +159,6 @@ impl Default for DynamoDbNotification { data: None, headers: None, updateid: None, - stored: true, } } } From 6b901c218de5784c018e461494891afb03593859 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 18 Jan 2023 08:23:20 -0800 Subject: [PATCH 16/18] f remove user_agent string pass and parse --- autopush/src/client.rs | 13 +++++-------- autopush/src/user_agent.rs | 9 --------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/autopush/src/client.rs b/autopush/src/client.rs index 5f3923a49..f2339b830 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -312,7 +312,7 @@ where AwaitSessionComplete { auth_state_machine: AuthClientStateFuture, srv: Rc, - user_agent: String, + //user_agent: String, webpush: Rc>, }, @@ -320,7 +320,7 @@ where AwaitRegistryDisconnect { response: MyFuture<()>, srv: Rc, - user_agent: String, + //user_agent: String, webpush: Rc>, error: Option, }, @@ -512,7 +512,6 @@ where let AwaitRegistryConnect { srv, ws, - user_agent, webpush, broadcast_subs, .. @@ -531,7 +530,6 @@ where transition!(AwaitSessionComplete { auth_state_machine, srv, - user_agent, webpush, }) } @@ -558,7 +556,7 @@ where let AwaitSessionComplete { srv, - user_agent, + //user_agent, webpush, .. } = session_complete.take(); @@ -570,7 +568,7 @@ where transition!(AwaitRegistryDisconnect { response, srv, - user_agent, + //user_agent, webpush, error, }) @@ -584,7 +582,6 @@ where let AwaitRegistryDisconnect { srv, - user_agent, webpush, error, .. @@ -603,7 +600,7 @@ where } let now = ms_since_epoch(); let elapsed = (now - webpush.connected_at) / 1_000; - let ua_info = UserAgentInfo::from(user_agent.as_ref()); + let ua_info = webpush.ua_info.clone(); // dogstatsd doesn't support timers: use histogram instead srv.metrics .time_with_tags("ua.connection.lifespan", elapsed) diff --git a/autopush/src/user_agent.rs b/autopush/src/user_agent.rs index e32deec98..b69c825f2 100644 --- a/autopush/src/user_agent.rs +++ b/autopush/src/user_agent.rs @@ -55,15 +55,6 @@ impl From<&str> for UserAgentInfo { } } -/* -impl UserAgentInfo { - pub fn parsed(&self) -> WootheeResult { - let parser = Parser::new(); - parser.parse(&self._user_agent_string).unwrap_or_default() - } -} -*/ - #[cfg(test)] mod tests { use super::UserAgentInfo; From 1ef32cc35d04c553eda13abef9690d18b27a3f6a Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 18 Jan 2023 10:53:54 -0800 Subject: [PATCH 17/18] f removed extra //user_agent, elements --- autopush/src/client.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/autopush/src/client.rs b/autopush/src/client.rs index f2339b830..cb92cf936 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -312,7 +312,6 @@ where AwaitSessionComplete { auth_state_machine: AuthClientStateFuture, srv: Rc, - //user_agent: String, webpush: Rc>, }, @@ -320,7 +319,6 @@ where AwaitRegistryDisconnect { response: MyFuture<()>, srv: Rc, - //user_agent: String, webpush: Rc>, error: Option, }, @@ -556,7 +554,6 @@ where let AwaitSessionComplete { srv, - //user_agent, webpush, .. } = session_complete.take(); @@ -568,7 +565,6 @@ where transition!(AwaitRegistryDisconnect { response, srv, - //user_agent, webpush, error, }) From 96befa5ff82c10e661d466160cdd755d23715bb1 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 18 Jan 2023 16:59:58 -0800 Subject: [PATCH 18/18] f fmt --- autopush/src/client.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/autopush/src/client.rs b/autopush/src/client.rs index cb92cf936..00d0dd0c1 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -552,11 +552,7 @@ where } }; - let AwaitSessionComplete { - srv, - webpush, - .. - } = session_complete.take(); + let AwaitSessionComplete { srv, webpush, .. } = session_complete.take(); let response = srv .clients