Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add additional metrics for message tracking [CONSVC-1660] #330

Merged
merged 19 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ jobs:
name: Check formatting
# Rust 1.65+ is stricter about dead_code and flags auto-generated methods created by
# state_machine_future. Since these are auto-generated, it's not possible to tag them as
# allowed. Adding `-A dead_code` until state_machine_future is removed.
# allowed. Adding `--allow=dead_code` until state_machine_future is removed.
command: |
cargo fmt -- --check
cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro -A dead_code
cargo clippy --all --all-targets --all-features -- -D warnings --deny=clippy::dbg_macro --allow=dead_code
- run:
name: Integration tests
command: py.test -v
Expand Down
12 changes: 10 additions & 2 deletions autoendpoint/src/db/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use autopush_common::db::{DynamoDbNotification, DynamoDbUser};
use autopush_common::notification::Notification;
use autopush_common::util::sec_since_epoch;
use autopush_common::{ddb_item, hashmap, val};
use cadence::StatsdClient;
use cadence::{CountedExt, StatsdClient};
use rusoto_core::credential::StaticProvider;
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_dynamodb::{
Expand Down Expand Up @@ -270,6 +270,7 @@ impl DbClient for DbClientImpl {
Ok(())
}

// Return the list of active channelIDs for a given user.
async fn get_channels(&self, uaid: Uuid) -> DbResult<HashSet<Uuid>> {
// Channel IDs are stored in a special row in the message table, where
// chidmessageid = " "
Expand Down Expand Up @@ -366,6 +367,7 @@ impl DbClient for DbClientImpl {
}

async fn save_message(&self, uaid: Uuid, message: Notification) -> DbResult<()> {
let topic = message.topic.is_some().to_string();
let input = PutItemInput {
item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(&uaid, message))?,
table_name: self.message_table.clone(),
Expand All @@ -378,7 +380,13 @@ impl DbClient for DbClientImpl {
retryable_putitem_error(self.metrics.clone()),
)
.await?;

{
// Build the metric report
let mut metric = self.metrics.incr_with_tags("notification.message.stored");
metric = metric.with_tag("topic", &topic);
// TODO: include `internal` if meta is set.
metric.send();
}
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions autoendpoint/src/extractors/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ use uuid::Uuid;
/// Extracts notification data from `Subscription` and request data
#[derive(Clone, Debug)]
pub struct Notification {
/// Unique message_id for this notification
pub message_id: String,
/// The subscription information block
pub subscription: Subscription,
/// Set of associated crypto headers
pub headers: NotificationHeaders,
/// UNIX timestamp in seconds
pub timestamp: u64,
/// UNIX timestamp in milliseconds
pub sort_key_timestamp: u64,
/// The encrypted notification body
pub data: Option<String>,
}

Expand Down
10 changes: 9 additions & 1 deletion autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::extractors::notification::Notification;
use crate::routers::RouterError;
use actix_web::http::StatusCode;
use autopush_common::util::InsertOpt;
use cadence::{Counted, CountedExt, StatsdClient};
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use std::collections::HashMap;
use uuid::Uuid;

Expand Down Expand Up @@ -173,6 +173,14 @@ pub fn incr_success_metrics(
.with_tag("app_id", app_id)
.with_tag("destination", "Direct")
.send();
metrics
.time_with_tags(
"notification.total_request_time",
(autopush_common::util::sec_since_epoch() - notification.timestamp) * 1000,
)
.with_tag("platform", platform)
.with_tag("app_id", app_id)
.send();
}

/// Common router test code
Expand Down
18 changes: 17 additions & 1 deletion autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::extractors::router_data_input::RouterDataInput;
use crate::routers::{Router, RouterError, RouterResponse};
use async_trait::async_trait;
use autopush_common::db::DynamoDbUser;
use cadence::{Counted, CountedExt, StatsdClient};
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use reqwest::{Response, StatusCode};
use serde_json::Value;
use std::collections::hash_map::RandomState;
Expand Down Expand Up @@ -73,10 +73,16 @@ impl Router for WebPushRouter {
}

if notification.headers.ttl == 0 {
let topic = notification.headers.topic.is_some().to_string();
trace!(
"Notification has a TTL of zero and was not successfully \
delivered, dropping it"
);
self.metrics
.incr_with_tags("notification.message.expired")
// TODO: include `internal` if meta is set.
.with_tag("topic", &topic)
.send();
return Ok(self.make_delivered_response(notification));
}

Expand Down Expand Up @@ -117,6 +123,16 @@ impl Router for WebPushRouter {
trace!("Response = {:?}", response);
if response.status() == 200 {
trace!("Node has delivered the message");
self.metrics
.time_with_tags(
"notification.total_request_time",
(notification.timestamp - autopush_common::util::sec_since_epoch())
* 1000,
)
.with_tag("platform", "websocket")
.with_tag("app_id", "direct")
.send();

Ok(self.make_delivered_response(notification))
} else {
trace!("Node has not delivered the message, returning stored response");
Expand Down
12 changes: 12 additions & 0 deletions autopush-common/src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn list_tables_sync(
.chain_err(|| "Unable to list tables")
}

/// Pull all pending messages for the user from storage
pub fn fetch_messages(
ddb: DynamoDbClient,
metrics: Arc<StatsdClient>,
Expand Down Expand Up @@ -136,6 +137,8 @@ pub fn fetch_messages(
})
}

/// Pull messages older than a given timestamp for a given user.
/// This also returns the latest message timestamp.
pub fn fetch_timestamp_messages(
ddb: DynamoDbClient,
metrics: Arc<StatsdClient>,
Expand Down Expand Up @@ -191,6 +194,7 @@ pub fn fetch_timestamp_messages(
})
}

/// Drop all user information from the Router table.
pub fn drop_user(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand All @@ -208,6 +212,7 @@ pub fn drop_user(
.chain_err(|| "Error dropping user")
}

/// Get the user information from the Router table.
pub fn get_uaid(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand All @@ -223,6 +228,7 @@ pub fn get_uaid(
.chain_err(|| "Error fetching user")
}

/// Register a user into the Router table.
pub fn register_user(
ddb: DynamoDbClient,
user: &DynamoDbUser,
Expand Down Expand Up @@ -264,6 +270,8 @@ pub fn register_user(
.chain_err(|| "Error storing user record")
}

/// Update the user's message month (Note: This is legacy for DynamoDB, but may still
/// be used by Stand Alone systems.)
pub fn update_user_message_month(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -294,6 +302,7 @@ pub fn update_user_message_month(
.chain_err(|| "Error updating user message month")
}

/// Return all known Channels for a given User.
pub fn all_channels(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -324,6 +333,7 @@ pub fn all_channels(
.or_else(|_err| future::ok(HashSet::new()))
}

/// Save the current list of Channels for a given user.
pub fn save_channels(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -357,6 +367,7 @@ pub fn save_channels(
.chain_err(|| "Error saving channels")
}

/// Remove a specific channel from the list of known channels for a given User
pub fn unregister_channel_id(
ddb: DynamoDbClient,
uaid: &Uuid,
Expand Down Expand Up @@ -385,6 +396,7 @@ pub fn unregister_channel_id(
.chain_err(|| "Error unregistering channel")
}

/// Respond with user information for a given user.
#[allow(clippy::too_many_arguments)]
pub fn lookup_user(
ddb: DynamoDbClient,
Expand Down
41 changes: 37 additions & 4 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::env;
use std::sync::Arc;
use uuid::Uuid;

use cadence::StatsdClient;
use cadence::{Counted, CountedExt, StatsdClient};
use futures::{future, Future};
use futures_backoff::retry_if;
use rusoto_core::{HttpClient, Region};
Expand Down Expand Up @@ -322,19 +322,26 @@ impl DynamoStorage {
message_month: String,
message: Notification,
) -> impl Future<Item = (), Error = Error> {
let topic = message.topic.is_some().to_string();
let ddb = self.ddb.clone();
let put_item = PutItemInput {
item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, message))
.unwrap(),
table_name: message_month,
..Default::default()
};

let metrics = self.metrics.clone();
retry_if(
move || ddb.put_item(put_item.clone()),
retryable_putitem_error,
)
.and_then(|_| future::ok(()))
.and_then(move |_| {
let mut metric = metrics.incr_with_tags("notification.message.stored");
// TODO: include `internal` if meta is set.
metric = metric.with_tag("topic", &topic);
metric.send();
future::ok(())
})
.chain_err(|| "Error saving notification")
}

Expand All @@ -346,9 +353,15 @@ impl DynamoStorage {
messages: Vec<Notification>,
) -> impl Future<Item = (), Error = Error> {
let ddb = self.ddb.clone();
let metrics = self.metrics.clone();
let put_items: Vec<WriteRequest> = messages
.into_iter()
.filter_map(|n| {
// eventually include `internal` if `meta` defined.
metrics
.incr_with_tags("notification.message.stored")
.with_tag("topic", &n.topic.is_some().to_string())
.send();
serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(uaid, n))
.ok()
.map(|hm| WriteRequest {
Expand Down Expand Up @@ -386,7 +399,9 @@ impl DynamoStorage {
uaid: &Uuid,
notif: &Notification,
) -> impl Future<Item = (), Error = Error> {
let topic = notif.topic.is_some().to_string();
let ddb = self.ddb.clone();
let metrics = self.metrics.clone();
let delete_input = DeleteItemInput {
table_name: table_name.to_string(),
key: ddb_item! {
Expand All @@ -400,10 +415,17 @@ impl DynamoStorage {
move || ddb.delete_item(delete_input.clone()),
retryable_delete_error,
)
.and_then(|_| future::ok(()))
.and_then(move |_| {
let mut metric = metrics.incr_with_tags("notification.message.deleted");
// TODO: include `internal` if meta is set.
metric = metric.with_tag("topic", &topic);
metric.send();
future::ok(())
})
.chain_err(|| "Error deleting notification")
}

/// Check to see if we have pending messages and return them if we do.
pub fn check_storage(
&self,
table_name: &str,
Expand All @@ -426,11 +448,16 @@ impl DynamoStorage {
let table_name = table_name.to_string();
let ddb = self.ddb.clone();
let metrics = self.metrics.clone();
let rmetrics = metrics.clone();

response.and_then(move |resp| -> MyFuture<_> {
// Return now from this future if we have messages
if !resp.messages.is_empty() {
debug!("Topic message returns: {:?}", resp.messages);
rmetrics
.count_with_tags("notification.message.retrieved", resp.messages.len() as i64)
.with_tag("topic", "true")
.send();
return Box::new(future::ok(CheckStorageResponse {
include_topic: true,
messages: resp.messages,
Expand Down Expand Up @@ -461,6 +488,10 @@ impl DynamoStorage {
// If we didn't get a timestamp off the last query, use the original
// value if passed one
let timestamp = resp.timestamp.or(timestamp);
rmetrics
.count_with_tags("notification.message.retrieved", resp.messages.len() as i64)
.with_tag("topic", "false")
.send();
Ok(CheckStorageResponse {
include_topic: false,
messages: resp.messages,
Expand Down Expand Up @@ -532,6 +563,8 @@ impl DynamoStorage {
}
}

/// Get the list of current, valid message tables (Note: This is legacy for DynamoDB, but may still
/// be used for Stand Alone systems )
pub fn list_message_tables(ddb: &DynamoDbClient, prefix: &str) -> Result<Vec<String>> {
let mut names: Vec<String> = Vec::new();
let mut start_key = None;
Expand Down
20 changes: 19 additions & 1 deletion autopush-common/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Default for DynamoDbUser {
}
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct DynamoDbNotification {
// DynamoDB <Hash key>
#[serde(serialize_with = "uuid_serializer")]
Expand Down Expand Up @@ -145,6 +145,24 @@ pub struct DynamoDbNotification {
updateid: Option<String>,
}

/// Ensure that the default for 'stored' is true.
impl Default for DynamoDbNotification {
fn default() -> Self {
Self {
uaid: Uuid::default(),
chidmessageid: String::default(),
current_timestamp: None,
chids: None,
timestamp: None,
expiry: 0,
ttl: None,
data: None,
headers: None,
updateid: None,
}
}
}

impl DynamoDbNotification {
fn parse_sort_key(key: &str) -> Result<RangeKey> {
lazy_static! {
Expand Down
Loading