Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add database tracking and report for Push Reliability #769

Merged
merged 15 commits into from
Oct 21, 2024
Merged
5 changes: 5 additions & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,15 @@ impl FromStr for ClientMessage {
}
}

/// Returned ACKnowledgement of the received message by the User Agent.
/// This is the payload for the `messageType:ack` packet.
///
#[derive(Debug, Deserialize)]
pub struct ClientAck {
// The channel_id which received messages
#[serde(rename = "channelID")]
pub channel_id: Uuid,
// The corresponding version number for the message.
pub version: String,
}

Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl AppState {
db_settings: settings.db_settings.clone(),
};
let storage_type = StorageType::from_dsn(&db_settings.dsn);

#[allow(unused)]
let db: Box<dyn DbClient> = match storage_type {
#[cfg(feature = "bigtable")]
Expand Down
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ ctor.workspace = true
tokio.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these features will be used in the next PR.

[features]
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ async-stream = "0.3"
ctor.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ tokio.workspace = true
serde_json.workspace = true

autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl WebPushClient {
// Get the stored notification record.
let n = &self.ack_state.unacked_stored_notifs[pos];
debug!("✅ Ack notif: {:?}", &n);
// TODO: Record "ack'd" reliability_id, if present.
// Only force delete Topic messages, since they don't have a timestamp.
// Other messages persist in the database, to be, eventually, cleaned up by their
// TTL. We will need to update the `CurrentTimestamp` field for the channel
Expand Down
20 changes: 16 additions & 4 deletions autoendpoint/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub enum ApiErrorKind {
#[error(transparent)]
Jwt(#[from] jsonwebtoken::errors::Error),

#[error(transparent)]
Serde(#[from] serde_json::Error),

#[error(transparent)]
ReqwestError(#[from] reqwest::Error),

#[error("Error while validating token")]
TokenHashValidation(#[source] openssl::error::ErrorStack),

Expand Down Expand Up @@ -143,6 +149,7 @@ impl ApiErrorKind {

ApiErrorKind::VapidError(_)
| ApiErrorKind::Jwt(_)
| ApiErrorKind::Serde(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::InvalidAuthentication
| ApiErrorKind::InvalidLocalAuth(_) => StatusCode::UNAUTHORIZED,
Expand All @@ -161,7 +168,8 @@ impl ApiErrorKind {
| ApiErrorKind::Io(_)
| ApiErrorKind::Metrics(_)
| ApiErrorKind::EndpointUrl(_)
| ApiErrorKind::RegistrationSecretHash(_) => StatusCode::INTERNAL_SERVER_ERROR,
| ApiErrorKind::RegistrationSecretHash(_)
| ApiErrorKind::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand All @@ -179,7 +187,7 @@ impl ApiErrorKind {
ApiErrorKind::InvalidMessageId => "invalid_message_id",

ApiErrorKind::VapidError(_) => "vapid_error",
ApiErrorKind::Jwt(_) => "jwt",
ApiErrorKind::Jwt(_) | ApiErrorKind::Serde(_) => "jwt",
ApiErrorKind::TokenHashValidation(_) => "token_hash_validation",
ApiErrorKind::InvalidAuthentication => "invalid_authentication",
ApiErrorKind::InvalidLocalAuth(_) => "invalid_local_auth",
Expand All @@ -199,6 +207,7 @@ impl ApiErrorKind {
ApiErrorKind::Conditional(_) => "conditional",
ApiErrorKind::EndpointUrl(e) => return e.metric_label(),
ApiErrorKind::RegistrationSecretHash(_) => "registration_secret_hash",
ApiErrorKind::ReqwestError(_) => "reqwest",
})
}

Expand All @@ -221,7 +230,8 @@ impl ApiErrorKind {
// Ignore oversized payload.
ApiErrorKind::PayloadError(_) |
ApiErrorKind::Validation(_) |
ApiErrorKind::Conditional(_) => false,
ApiErrorKind::Conditional(_) |
ApiErrorKind::ReqwestError(_) => false,
_ => true,
}
}
Expand Down Expand Up @@ -251,6 +261,7 @@ impl ApiErrorKind {
ApiErrorKind::VapidError(_)
| ApiErrorKind::TokenHashValidation(_)
| ApiErrorKind::Jwt(_)
| ApiErrorKind::Serde(_)
| ApiErrorKind::InvalidAuthentication
| ApiErrorKind::InvalidLocalAuth(_) => Some(109),

Expand All @@ -269,7 +280,8 @@ impl ApiErrorKind {
| ApiErrorKind::InvalidRouterToken
| ApiErrorKind::RegistrationSecretHash(_)
| ApiErrorKind::EndpointUrl(_)
| ApiErrorKind::InvalidMessageId => None,
| ApiErrorKind::InvalidMessageId
| ApiErrorKind::ReqwestError(_) => None,
}
}
}
Expand Down
24 changes: 14 additions & 10 deletions autoendpoint/src/extractors/notification.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{ApiError, ApiErrorKind};
use crate::error::{ApiError, ApiErrorKind, ApiResult};
use crate::extractors::{
message_id::MessageId, notification_headers::NotificationHeaders, subscription::Subscription,
};
Expand Down Expand Up @@ -103,6 +103,7 @@ impl From<Notification> for autopush_common::notification::Notification {
timestamp: notification.timestamp,
data: notification.data,
sortkey_timestamp,
reliability_id: notification.subscription.reliability_id,
headers: {
let headers: HashMap<String, String> = notification.headers.into();
if headers.is_empty() {
Expand Down Expand Up @@ -160,25 +161,28 @@ impl Notification {
/// fields are still required when delivering to the connection server, so
/// we can't simply convert this notification type to that one and serialize
/// via serde.
pub fn serialize_for_delivery(&self) -> HashMap<&'static str, serde_json::Value> {
pub fn serialize_for_delivery(&self) -> ApiResult<HashMap<&'static str, serde_json::Value>> {
let mut map = HashMap::new();

map.insert(
"channelID",
serde_json::to_value(self.subscription.channel_id).unwrap(),
serde_json::to_value(self.subscription.channel_id)?,
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
);
map.insert("version", serde_json::to_value(&self.message_id).unwrap());
map.insert("ttl", serde_json::to_value(self.headers.ttl).unwrap());
map.insert("topic", serde_json::to_value(&self.headers.topic).unwrap());
map.insert("timestamp", serde_json::to_value(self.timestamp).unwrap());
map.insert("version", serde_json::to_value(&self.message_id)?);
map.insert("ttl", serde_json::to_value(self.headers.ttl)?);
map.insert("topic", serde_json::to_value(&self.headers.topic)?);
map.insert("timestamp", serde_json::to_value(self.timestamp)?);
if let Some(reliability_id) = &self.subscription.reliability_id {
map.insert("reliability_id", serde_json::to_value(reliability_id)?);
}

if let Some(data) = &self.data {
map.insert("data", serde_json::to_value(data).unwrap());
map.insert("data", serde_json::to_value(data)?);

let headers: HashMap<_, _> = self.headers.clone().into();
map.insert("headers", serde_json::to_value(headers).unwrap());
map.insert("headers", serde_json::to_value(headers)?);
}

map
Ok(map)
}
}
19 changes: 9 additions & 10 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Subscription {
/// (This should ONLY be applied for messages that match known
/// Mozilla provided VAPID public keys.)
///
pub tracking_id: Option<String>,
pub reliability_id: Option<String>,
}

impl FromRequest for Subscription {
Expand Down Expand Up @@ -73,11 +73,13 @@ impl FromRequest for Subscription {
.transpose()?;

trace!("raw vapid: {:?}", &vapid);
let trackable = if let Some(vapid) = &vapid {
app_state.reliability.is_trackable(vapid)
} else {
false
};
let reliability_id: Option<String> = vapid.as_ref().and_then(|v| {
app_state
.vapid_tracker
.is_trackable(v)
.then(|| app_state.vapid_tracker.get_id(req.headers()))
});
debug!("🔍 Assigning Reliability ID: {reliability_id:?}");

// Capturing the vapid sub right now will cause too much cardinality. Instead,
// let's just capture if we have a valid VAPID, as well as what sort of bad sub
Expand Down Expand Up @@ -132,14 +134,11 @@ impl FromRequest for Subscription {
.incr(&format!("updates.vapid.draft{:02}", vapid.vapid.version()))?;
}

let tracking_id =
trackable.then(|| app_state.reliability.get_tracking_id(req.headers()));

Ok(Subscription {
user,
channel_id,
vapid,
tracking_id,
reliability_id,
})
}
.boxed_local()
Expand Down
8 changes: 7 additions & 1 deletion autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub fn build_message_data(notification: &Notification) -> ApiResult<HashMap<&'st
message_data.insert_opt("enc", notification.headers.encryption.as_ref());
message_data.insert_opt("cryptokey", notification.headers.crypto_key.as_ref());
message_data.insert_opt("enckey", notification.headers.encryption_key.as_ref());
// Report the data to the UA. How this value is reported back is still a work in progress.
trace!(
"🔍 Sending Reliability ID: {:?}",
notification.subscription.reliability_id
);
message_data.insert_opt("rid", notification.subscription.reliability_id.as_ref());
}

Ok(message_data)
Expand Down Expand Up @@ -239,7 +245,7 @@ pub mod tests {
user,
channel_id: channel_id(),
vapid: None,
tracking_id: None,
reliability_id: None,
},
headers: NotificationHeaders {
ttl: 0,
Expand Down
18 changes: 10 additions & 8 deletions autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ impl Router for WebPushRouter {
);
}
Err(error) => {
if error.is_timeout() {
self.metrics.incr("error.node.timeout")?;
if let ApiErrorKind::ReqwestError(error) = &error.kind {
if error.is_timeout() {
self.metrics.incr("error.node.timeout")?;
};
if error.is_connect() {
self.metrics.incr("error.node.connect")?;
};
};
if error.is_connect() {
self.metrics.incr("error.node.connect")?;
}
debug!("✉ Error while sending webpush notification: {}", error);
self.remove_node_id(user, node_id).await?
}
Expand Down Expand Up @@ -177,11 +179,11 @@ impl WebPushRouter {
&self,
notification: &Notification,
node_id: &str,
) -> Result<Response, reqwest::Error> {
) -> ApiResult<Response> {
let url = format!("{}/push/{}", node_id, notification.subscription.user.uaid);
let notification = notification.serialize_for_delivery();
let notification = notification.serialize_for_delivery()?;

self.http.put(&url).json(&notification).send().await
Ok(self.http.put(&url).json(&notification).send().await?)
}

/// Notify the node to check for notifications for the user
Expand Down
6 changes: 3 additions & 3 deletions autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct AppState {
pub apns_router: Arc<ApnsRouter>,
#[cfg(feature = "stub")]
pub stub_router: Arc<StubRouter>,
pub reliability: Arc<VapidTracker>,
pub vapid_tracker: Arc<VapidTracker>,
}

pub struct Server;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Server {
)
.await?,
);
let reliability = Arc::new(VapidTracker(settings.tracking_keys()));
let vapid_tracker = Arc::new(VapidTracker(settings.tracking_keys()));
#[cfg(feature = "stub")]
let stub_router = Arc::new(StubRouter::new(settings.stub.clone())?);
let app_state = AppState {
Expand All @@ -122,7 +122,7 @@ impl Server {
apns_router,
#[cfg(feature = "stub")]
stub_router,
reliability,
vapid_tracker,
};

spawn_pool_periodic_reporter(
Expand Down
31 changes: 21 additions & 10 deletions autoendpoint/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ impl Settings {
// public key, but that may not always be true.
pub fn tracking_keys(&self) -> Vec<String> {
let keys = &self.tracking_keys.replace(['"', ' '], "");
Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned())
.collect()
let result = Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS")
.map(|v| v.to_owned().replace("=", ""))
.collect();
trace!("🔍 tracking_keys: {result:?}");
result
}

/// Get the URL for this endpoint server
Expand All @@ -193,11 +195,20 @@ impl VapidTracker {
pub fn is_trackable(&self, vapid: &VapidHeaderWithKey) -> bool {
// ideally, [Settings.with_env_and_config_file()] does the work of pre-populating
// the Settings.tracking_vapid_pubs cache, but we can't rely on that.
self.0.contains(&vapid.public_key)
let key = vapid.public_key.replace('=', "");
let result = self.0.contains(&key);
debug!("🔍 Checking {key} {}", {
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
if result {
"Match!"
} else {
"no match"
}
});
result
}

/// Extract the message Id from the headers (if present), otherwise just make one up.
pub fn get_tracking_id(&self, headers: &HeaderMap) -> String {
pub fn get_id(&self, headers: &HeaderMap) -> String {
headers
.get("X-MessageId")
.and_then(|v|
Expand Down Expand Up @@ -304,7 +315,7 @@ mod tests {
#[test]
fn test_tracking_keys() -> ApiResult<()> {
let settings = Settings{
tracking_keys: r#"["BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7vI"]"#.to_owned(),
tracking_keys: r#"["BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7"]"#.to_owned(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified to check for padding stripping.

Copy link
Contributor

@taddes taddes Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this one have the "==" added to end as well, with exclusion of vI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were decoding the key from base64, yes. We're currently not doing that so just tweaking the string values to ensure that they match (with and without the padding) should be fine.

If we ever decide to decode these strings and do a byte comparison of the decoded pairs, then we would have to revisit this test, but that's out of scope for this PR.

..Default::default()
};

Expand All @@ -314,7 +325,7 @@ mod tests {
token: "".to_owned(),
version_data: crate::headers::vapid::VapidVersionData::Version1,
},
public_key: "BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7vI".to_owned()
public_key: "BLMymkOqvT6OZ1o9etCqV4jGPkvOXNz5FdBjsAR9zR5oeCV1x5CBKuSLTlHon-H_boHTzMtMoNHsAGDlDB6X7==".to_owned()
};

let key_set = settings.tracking_keys();
Expand All @@ -327,20 +338,20 @@ mod tests {
}

#[test]
fn test_tracking_id() -> ApiResult<()> {
fn test_reliability_id() -> ApiResult<()> {
let mut headers = HeaderMap::new();
let keys = Vec::new();
let reliability = VapidTracker(keys);

let key = reliability.get_tracking_id(&headers);
let key = reliability.get_id(&headers);
assert!(!key.is_empty());

headers.insert(
HeaderName::from_lowercase(b"x-messageid").unwrap(),
HeaderValue::from_static("123foobar456"),
);

let key = reliability.get_tracking_id(&headers);
let key = reliability.get_id(&headers);
assert_eq!(key, "123foobar456".to_owned());

Ok(())
Expand Down
Loading