Skip to content

Commit

Permalink
feat: Add additional metrics for message tracking [CONSVC-1660] (#330)
Browse files Browse the repository at this point in the history
* feat: Add additional metrics for message tracking

* `notification.message.expired` -- In flight message has no TTL, endpoint is unavailable.
* `notification.message.stored` -- message stored for later transmission
* `notification.message.retrieved` -- message extracted from storage
* `ua.notification.sent` -- sent a notification to the websocket endpoint
* Add metrics to track delivery to OS platforms
* refactored UA parse to be a struct
* some modern clippy cleanup

Issue: CONSVC-1660, SYNC-3516
  • Loading branch information
jrconlin authored Jan 20, 2023
1 parent 9238056 commit 65ac1a3
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 84 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ jobs:
name: Check formatting
# Rust 1.65+ is stricter about dead_code and flags auto-generated methods created by
# state_machine_future. Since these are auto-generated, it's not possible to tag them as
# allowed. Adding `-A dead_code` until state_machine_future is removed.
# allowed. Adding `--allow=dead_code` until state_machine_future is removed.
command: |
cargo fmt -- --check
cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro -A dead_code
cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro --allow=dead_code
- run:
name: Integration tests
command: py.test -v
Expand Down
12 changes: 10 additions & 2 deletions autoendpoint/src/db/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use autopush_common::db::{DynamoDbNotification, DynamoDbUser};
use autopush_common::notification::Notification;
use autopush_common::util::sec_since_epoch;
use autopush_common::{ddb_item, hashmap, val};
use cadence::StatsdClient;
use cadence::{CountedExt, StatsdClient};
use rusoto_core::credential::StaticProvider;
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_dynamodb::{
Expand Down Expand Up @@ -270,6 +270,7 @@ impl DbClient for DbClientImpl {
Ok(())
}

// Return the list of active channelIDs for a given user.
async fn get_channels(&self, uaid: Uuid) -> DbResult<HashSet<Uuid>> {
// Channel IDs are stored in a special row in the message table, where
// chidmessageid = " "
Expand Down Expand Up @@ -366,6 +367,7 @@ impl DbClient for DbClientImpl {
}

async fn save_message(&self, uaid: Uuid, message: Notification) -> DbResult<()> {
let topic = message.topic.is_some().to_string();
let input = PutItemInput {
item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(&uaid, message))?,
table_name: self.message_table.clone(),
Expand All @@ -378,7 +380,13 @@ impl DbClient for DbClientImpl {
retryable_putitem_error(self.metrics.clone()),
)
.await?;

{
// Build the metric report
let mut metric = self.metrics.incr_with_tags("notification.message.stored");
metric = metric.with_tag("topic", &topic);
// TODO: include `internal` if meta is set.
metric.send();
}
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions autoendpoint/src/extractors/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ use uuid::Uuid;
/// Extracts notification data from `Subscription` and request data
#[derive(Clone, Debug)]
pub struct Notification {
/// Unique message_id for this notification
pub message_id: String,
/// The subscription information block
pub subscription: Subscription,
/// Set of associated crypto headers
pub headers: NotificationHeaders,
/// UNIX timestamp in seconds
pub timestamp: u64,
/// UNIX timestamp in milliseconds
pub sort_key_timestamp: u64,
/// The encrypted notification body
pub data: Option<String>,
}

Expand Down
10 changes: 9 additions & 1 deletion autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::extractors::notification::Notification;
use crate::routers::RouterError;
use actix_web::http::StatusCode;
use autopush_common::util::InsertOpt;
use cadence::{Counted, CountedExt, StatsdClient};
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use std::collections::HashMap;
use uuid::Uuid;

Expand Down Expand Up @@ -173,6 +173,14 @@ pub fn incr_success_metrics(
.with_tag("app_id", app_id)
.with_tag("destination", "Direct")
.send();
metrics
.time_with_tags(
"notification.total_request_time",
(autopush_common::util::sec_since_epoch() - notification.timestamp) * 1000,
)
.with_tag("platform", platform)
.with_tag("app_id", app_id)
.send();
}

/// Common router test code
Expand Down
18 changes: 17 additions & 1 deletion autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::extractors::router_data_input::RouterDataInput;
use crate::routers::{Router, RouterError, RouterResponse};
use async_trait::async_trait;
use autopush_common::db::DynamoDbUser;
use cadence::{Counted, CountedExt, StatsdClient};
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use reqwest::{Response, StatusCode};
use serde_json::Value;
use std::collections::hash_map::RandomState;
Expand Down Expand Up @@ -73,10 +73,16 @@ impl Router for WebPushRouter {
}

if notification.headers.ttl == 0 {
let topic = notification.headers.topic.is_some().to_string();
trace!(
"Notification has a TTL of zero and was not successfully \
delivered, dropping it"
);
self.metrics
.incr_with_tags("notification.message.expired")
// TODO: include `internal` if meta is set.
.with_tag("topic", &topic)
.send();
return Ok(self.make_delivered_response(notification));
}

Expand Down Expand Up @@ -117,6 +123,16 @@ impl Router for WebPushRouter {
trace!("Response = {:?}", response);
if response.status() == 200 {
trace!("Node has delivered the message");
self.metrics
.time_with_tags(
"notification.total_request_time",
(notification.timestamp - autopush_common::util::sec_since_epoch())
* 1000,
)
.with_tag("platform", "websocket")
.with_tag("app_id", "direct")
.send();

Ok(self.make_delivered_response(notification))
} else {
trace!("Node has not delivered the message, returning stored response");
Expand Down
12 changes: 12 additions & 0 deletions autopush-common/src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn list_tables_sync(
.chain_err(|| "Unable to list tables")
}

/// Pull all pending messages for the user from storage
pub fn fetch_messages(
ddb: DynamoDbClient,
metrics: Arc<StatsdClient>,
Expand Down Expand Up @@ -136,6 +137,8 @@ pub fn fetch_messages(
})
}

/// Pull messages older than a given timestamp for a given user.
/// This also returns the latest message timestamp.
pub fn fetch_timestamp_messages(
ddb: DynamoDbClient,
metrics: Arc<StatsdClient>,
Expand Down Expand Up @@ -191,6 +194,7 @@ pub fn fetch_timestamp_messages(
})
}

/// Drop all user information from the Router table.
pub fn drop_user(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand All @@ -208,6 +212,7 @@ pub fn drop_user(
.chain_err(|| "Error dropping user")
}

/// Get the user information from the Router table.
pub fn get_uaid(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand All @@ -223,6 +228,7 @@ pub fn get_uaid(
.chain_err(|| "Error fetching user")
}

/// Register a user into the Router table.
pub fn register_user(
ddb: DynamoDbClient,
user: &DynamoDbUser,
Expand Down Expand Up @@ -264,6 +270,8 @@ pub fn register_user(
.chain_err(|| "Error storing user record")
}

/// Update the user's message month (Note: This is legacy for DynamoDB, but may still
/// be used by Stand Alone systems.)
pub fn update_user_message_month(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -294,6 +302,7 @@ pub fn update_user_message_month(
.chain_err(|| "Error updating user message month")
}

/// Return all known Channels for a given User.
pub fn all_channels(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -324,6 +333,7 @@ pub fn all_channels(
.or_else(|_err| future::ok(HashSet::new()))
}

/// Save the current list of Channels for a given user.
pub fn save_channels(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -357,6 +367,7 @@ pub fn save_channels(
.chain_err(|| "Error saving channels")
}

/// Remove a specific channel from the list of known channels for a given User
pub fn unregister_channel_id(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -385,6 +396,7 @@ pub fn unregister_channel_id(
.chain_err(|| "Error unregistering channel")
}

/// Respond with user information for a given user.
#[allow(clippy::too_many_arguments)]
pub fn lookup_user(
ddb: DynamoDbClient,
Expand Down
41 changes: 37 additions & 4 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::env;
use std::sync::Arc;
use uuid::Uuid;

use cadence::StatsdClient;
use cadence::{Counted, CountedExt, StatsdClient};
use futures::{future, Future};
use futures_backoff::retry_if;
use rusoto_core::{HttpClient, Region};
Expand Down Expand Up @@ -322,19 +322,26 @@ impl DynamoStorage {
message_month: String,
message: Notification,
) -> impl Future<Item = (), Error = Error> {
let topic = message.topic.is_some().to_string();
let ddb = self.ddb.clone();
let put_item = PutItemInput {
item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, message))
.unwrap(),
table_name: message_month,
..Default::default()
};

let metrics = self.metrics.clone();
retry_if(
move || ddb.put_item(put_item.clone()),
retryable_putitem_error,
)
.and_then(|_| future::ok(()))
.and_then(move |_| {
let mut metric = metrics.incr_with_tags("notification.message.stored");
// TODO: include `internal` if meta is set.
metric = metric.with_tag("topic", &topic);
metric.send();
future::ok(())
})
.chain_err(|| "Error saving notification")
}

Expand All @@ -346,9 +353,15 @@ impl DynamoStorage {
messages: Vec<Notification>,
) -> impl Future<Item = (), Error = Error> {
let ddb = self.ddb.clone();
let metrics = self.metrics.clone();
let put_items: Vec<WriteRequest> = messages
.into_iter()
.filter_map(|n| {
// eventually include `internal` if `meta` defined.
metrics
.incr_with_tags("notification.message.stored")
.with_tag("topic", &n.topic.is_some().to_string())
.send();
serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, n))
.ok()
.map(|hm| WriteRequest {
Expand Down Expand Up @@ -386,7 +399,9 @@ impl DynamoStorage {
uaid: &Uuid,
notif: &Notification,
) -> impl Future<Item = (), Error = Error> {
let topic = notif.topic.is_some().to_string();
let ddb = self.ddb.clone();
let metrics = self.metrics.clone();
let delete_input = DeleteItemInput {
table_name: table_name.to_string(),
key: ddb_item! {
Expand All @@ -400,10 +415,17 @@ impl DynamoStorage {
move || ddb.delete_item(delete_input.clone()),
retryable_delete_error,
)
.and_then(|_| future::ok(()))
.and_then(move |_| {
let mut metric = metrics.incr_with_tags("notification.message.deleted");
// TODO: include `internal` if meta is set.
metric = metric.with_tag("topic", &topic);
metric.send();
future::ok(())
})
.chain_err(|| "Error deleting notification")
}

/// Check to see if we have pending messages and return them if we do.
pub fn check_storage(
&self,
table_name: &str,
Expand All @@ -426,11 +448,16 @@ impl DynamoStorage {
let table_name = table_name.to_string();
let ddb = self.ddb.clone();
let metrics = self.metrics.clone();
let rmetrics = metrics.clone();

response.and_then(move |resp| -> MyFuture<_> {
// Return now from this future if we have messages
if !resp.messages.is_empty() {
debug!("Topic message returns: {:?}", resp.messages);
rmetrics
.count_with_tags("notification.message.retrieved", resp.messages.len() as i64)
.with_tag("topic", "true")
.send();
return Box::new(future::ok(CheckStorageResponse {
include_topic: true,
messages: resp.messages,
Expand Down Expand Up @@ -461,6 +488,10 @@ impl DynamoStorage {
// If we didn't get a timestamp off the last query, use the original
// value if passed one
let timestamp = resp.timestamp.or(timestamp);
rmetrics
.count_with_tags("notification.message.retrieved", resp.messages.len() as i64)
.with_tag("topic", "false")
.send();
Ok(CheckStorageResponse {
include_topic: false,
messages: resp.messages,
Expand Down Expand Up @@ -532,6 +563,8 @@ impl DynamoStorage {
}
}

/// Get the list of current, valid message tables (Note: This is legacy for DynamoDB, but may still
/// be used for Stand Alone systems )
pub fn list_message_tables(ddb: &DynamoDbClient, prefix: &str) -> Result<Vec<String>> {
let mut names: Vec<String> = Vec::new();
let mut start_key = None;
Expand Down
20 changes: 19 additions & 1 deletion autopush-common/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Default for DynamoDbUser {
}
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct DynamoDbNotification {
// DynamoDB <Hash key>
#[serde(serialize_with = "uuid_serializer")]
Expand Down Expand Up @@ -145,6 +145,24 @@ pub struct DynamoDbNotification {
updateid: Option<String>,
}

/// Ensure that the default for 'stored' is true.
impl Default for DynamoDbNotification {
fn default() -> Self {
Self {
uaid: Uuid::default(),
chidmessageid: String::default(),
current_timestamp: None,
chids: None,
timestamp: None,
expiry: 0,
ttl: None,
data: None,
headers: None,
updateid: None,
}
}
}

impl DynamoDbNotification {
fn parse_sort_key(key: &str) -> Result<RangeKey> {
lazy_static! {
Expand Down
Loading

0 comments on commit 65ac1a3

Please sign in to comment.