Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add redis based reliability reporting #778

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a8b64b7
feat: Add database tracking and report for Push Reliability
jrconlin Sep 23, 2024
dc1763a
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Sep 25, 2024
14bd4fb
f lint
jrconlin Sep 25, 2024
e914b14
f remove extra from pending pr
jrconlin Sep 25, 2024
4f25172
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 2, 2024
e9ff9f5
f r's
jrconlin Oct 2, 2024
c9a3512
f r's
jrconlin Oct 2, 2024
f8c7ee9
f lint
jrconlin Oct 2, 2024
1dd624f
f post test
jrconlin Oct 8, 2024
05e73cc
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 8, 2024
8291039
f r's
jrconlin Oct 9, 2024
ed67eb4
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 11, 2024
6c2c7d4
feat: Add redis based reliability reporting
jrconlin Oct 8, 2024
3a0beef
f isort
jrconlin Oct 17, 2024
efb1cf5
f add documentation
jrconlin Oct 17, 2024
c658a09
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 17, 2024
588a4ea
Merge branch 'master' into feat/SYNC-4324_track_db
jrconlin Oct 17, 2024
1c7e26a
f r's
jrconlin Oct 18, 2024
07e7db3
f r's
jrconlin Oct 18, 2024
950c996
Merge branch 'feat/SYNC-4324_track_db' into feat/SYNC-4327_redis
jrconlin Oct 21, 2024
853268c
f post merge fix
jrconlin Oct 21, 2024
28bd921
f add metric logging to reliability_cron.py
jrconlin Oct 22, 2024
e465b36
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Oct 24, 2024
148ac9b
Merge branch 'master' into feat/SYNC-4327_redis
jrconlin Oct 31, 2024
b42ea79
Merge branch 'master' into feat/SYNC-4327_redis
rachaelcrook Nov 6, 2024
7fea96c
Merge branch 'master' into feat/SYNC-4327_redis
pjenvey Nov 13, 2024
d9824f3
f r's
jrconlin Nov 22, 2024
62ecdc7
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Nov 22, 2024
3de403d
f fix tests
jrconlin Nov 22, 2024
51d7f7f
f r's
jrconlin Dec 6, 2024
b86ef6b
f autocomplete
jrconlin Dec 6, 2024
b0d1984
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 17, 2024
1f15f74
f r's
jrconlin Dec 17, 2024
76b02dd
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 17, 2024
1bf49d2
f fix up pymarks
jrconlin Dec 18, 2024
92eb64f
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Dec 18, 2024
9c7b13c
f switch to pytest.ini
jrconlin Dec 18, 2024
2243e8a
f force the config file
jrconlin Dec 18, 2024
15c5e46
f move pytest to integration root
jrconlin Dec 19, 2024
e7939f6
f use the one in /code?
jrconlin Dec 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ jobs:
username: $DOCKER_USER
password: $DOCKER_PASS
command: gcloud beta emulators bigtable start --host-port=localhost:8086
# - image: redis/redis-stack-server
# auth:
# username: $DOCKER_USER
# password: $DOCKER_PASS
resource_class: large
environment:
BIGTABLE_EMULATOR_HOST: localhost:8086
Expand Down
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions autoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ default = ["bigtable"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
emulator = ["bigtable"]
log_vapid = []
reliable_report = [
"autoconnect_settings/reliable_report",
"autoconnect_web/reliable_report",
"autoconnect_ws/reliable_report",
"autopush_common/reliable_report",
]
4 changes: 2 additions & 2 deletions autoconnect/autoconnect-common/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ impl BroadcastChangeTracker {
}
*ver = broadcast.version;
} else {
trace!("📢 Not found: {}", &b_id);
trace!("📢 Not found: {b_id}");
return Err(ApcErrorKind::BroadcastError("Broadcast not found".into()).into());
}

trace!("📢 New version of {}", &b_id);
trace!("📢 New version of {b_id}");
// Check to see if this broadcast has been updated since initialization
let bcast_index = self
.broadcast_list
Expand Down
2 changes: 2 additions & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub struct ClientAck {
pub channel_id: Uuid,
// The corresponding version number for the message.
pub version: String,
#[serde(default)]
pub reliability_id: Option<String>,
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-settings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ autopush_common.workspace = true
# specify the default via the calling crate, in order to simplify default chains.
bigtable = ["autopush_common/bigtable"]
emulator = ["bigtable"]
reliable_report = ["autopush_common/reliable_report"]
14 changes: 14 additions & 0 deletions autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use autoconnect_common::{
registry::ClientRegistry,
};
use autopush_common::db::{client::DbClient, DbSettings, StorageType};
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;

use crate::{Settings, ENV_PREFIX};

Expand All @@ -32,6 +34,9 @@ pub struct AppState {
pub settings: Settings,
pub router_url: String,
pub endpoint_url: String,

#[cfg(feature = "reliable_report")]
pub reliability: Arc<PushReliability>,
}

impl AppState {
Expand Down Expand Up @@ -84,6 +89,13 @@ impl AppState {
ENV_PREFIX.to_uppercase()
),
};

#[cfg(feature = "reliable_report")]
let reliability = Arc::new(
PushReliability::new(&settings.reliability_dsn, &Some(db.clone())).map_err(|e| {
ConfigError::Message(format!("Could not start Reliability connection: {:?}", e))
})?,
);
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()
Expand All @@ -103,6 +115,8 @@ impl AppState {
settings,
router_url,
endpoint_url,
#[cfg(feature = "reliable_report")]
reliability,
})
}

Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-settings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub struct Settings {
///
/// By default, the number of available physical CPUs is used as the worker count.
pub actix_workers: Option<usize>,
#[cfg(feature = "reliable_report")]
pub reliability_dsn: Option<String>,
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for Settings {
Expand Down Expand Up @@ -139,6 +141,8 @@ impl Default for Settings {
msg_limit: 150,
actix_max_connections: None,
actix_workers: None,
#[cfg(feature = "reliable_report")]
reliability_dsn: None,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ tokio.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws/reliable_report",
]
49 changes: 46 additions & 3 deletions autoconnect/autoconnect-web/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,59 @@ pub async fn push_route(
notif: web::Json<Notification>,
app_state: web::Data<AppState>,
) -> HttpResponse {
#[allow(unused_mut)] // Needed for "reliable_report"
let mut notif = notif.into_inner();
#[cfg(feature = "reliable_report")]
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
{
notif.reliable_state = app_state
.reliability
.record(
&notif.reliability_id,
autopush_common::reliability::PushReliabilityState::IntAccepted,
&notif.reliable_state,
Some(notif.timestamp + notif.ttl),
)
.await;
}
trace!(
"⏩ push_route, uaid: {} channel_id: {}",
"⏩ in push_route, uaid: {} channel_id: {}",
uaid,
notif.channel_id
notif.channel_id,
);
#[cfg(feature = "reliable_report")]
let expiry = {
// Set "transmitted" a bit early since we can't do this inside of `notify`.
notif.reliable_state = app_state
.reliability
.record(
&notif.reliability_id,
autopush_common::reliability::PushReliabilityState::Transmitted,
&notif.reliable_state,
Some(notif.timestamp + notif.ttl),
)
.await;
Some(notif.timestamp + notif.ttl)
};
// Attempt to send the notification to the UA using WebSocket protocol, or store on failure.
let result = app_state
.clients
.notify(uaid.into_inner(), notif.into_inner())
.notify(uaid.into_inner(), notif.clone())
.await;
if result.is_ok() {
#[cfg(feature = "reliable_report")]
{
// Set "transmitted" a bit early since we can't do this inside of `notify`.
notif.reliable_state = app_state
.reliability
.record(
&notif.reliability_id,
autopush_common::reliability::PushReliabilityState::Accepted,
&notif.reliable_state,
expiry,
)
.await;
}

HttpResponse::Ok().finish()
} else {
HttpResponse::NotFound().body("Client not available")
Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ ctor.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws_sm/reliable_report",
]
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ serde_json.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = []
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl WebPushClient {
&self.app_state.settings
}

#[cfg(feature = "reliable_report")]
pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability {
&self.app_state.reliability
}

/// Connect this `WebPushClient` to the `ClientRegistry`
///
/// Returning a `Stream` of `ServerNotification`s from the `ClientRegistry`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,31 @@ impl WebPushClient {
Ok(smsgs)
}

#[cfg(feature = "reliable_report")]
/// Record and transition the state for trackable messages.
async fn record_state(
&self,
messages: &mut Vec<Notification>,
state: autopush_common::reliability::PushReliabilityState,
) {
// *Note* because `.map()` is sync
// we can't call the async func without additional hoops.
// I'm guessing that there's a more elegant way to do this, but this works for now.
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
for message in messages {
let expiry = message.timestamp + message.ttl;
message.reliable_state = self
.app_state
.reliability
.record(
&message.reliability_id,
state,
&message.reliable_state,
Some(expiry),
)
.await;
}
}

/// Read a chunk (max count 10 returned) of Notifications from storage
///
/// This alternates between reading Topic Notifications and Timestamp
Expand All @@ -186,10 +211,20 @@ impl WebPushClient {
let topic_resp = if self.flags.include_topic {
trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
// Get the most recent max 11 messages.
self.app_state
#[allow(unused_mut)]
let mut messages = self
.app_state
.db
.fetch_topic_messages(&self.uaid, 11)
.await?
.await?;
#[cfg(feature = "reliable_report")]
// Since we pulled these from storage, mark them as "retrieved"
self.record_state(
&mut messages.messages,
autopush_common::reliability::PushReliabilityState::Retreived,
)
.await;
messages
} else {
Default::default()
};
Expand Down Expand Up @@ -226,7 +261,8 @@ impl WebPushClient {
"🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
timestamp
);
let timestamp_resp = self
#[allow(unused_mut)]
let mut timestamp_resp = self
.app_state
.db
.fetch_timestamp_messages(&self.uaid, timestamp, 10)
Expand All @@ -244,6 +280,13 @@ impl WebPushClient {
)
.with_tag("topic", "false")
.send();
#[cfg(feature = "reliable_report")]
// Since we pulled these from storage, mark them as "retrieved"
self.record_state(
&mut timestamp_resp.messages,
autopush_common::reliability::PushReliabilityState::Retreived,
)
.await;
}

Ok(CheckStorageResponse {
Expand Down
2 changes: 2 additions & 0 deletions autoendpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,5 @@ emulator = ["bigtable"]
stub = []
# Verbosely log vapid assertions (NOT ADVISED FOR WIDE PRODUCTION USE)
log_vapid = []

reliable_report = ["autopush_common/reliable_report"]
Loading