Skip to content

Commit

Permalink
feat: Add redis based reliability reporting (#778)
Browse files Browse the repository at this point in the history
This adds a feature flag `reliable_report` that optionally enables Push
message reliablity reporting. The report is done in two parts.
The first part uses a Redis like storage system to note message states.
This will require a regularly run "cleanup" script to sweep for expired
messages and adust the current counts, as well log those states to some
sequential logging friendly storage (e.g. common logging or steamed to
a file). The clean-up script should be a singleton to prevent possible
race conditions.

The second component will write a record of the state transition
times for tracked messages to a storage system that is indexed by the
tracking_id. This will allow for more "in depth" analysis by external
tooling.

The idea being that reporting will be comprised of two parts:
One part which shows active states of messages (with a log of prior
states to show trends over time), and an optional "in-depth" record
that could be used to show things like length of time in storage,
overall success rates, survivability rates, etc.

This patch also:
* fixes a few typos
* changes several methods that should consume Notifications, actually
consume them.
* convert from `tracking_id` to `reliability_id`
* convert instance of specialized `Metrics` to generic Cadence (to make
calls more consistent)
* adds a `RELIABLE_REPORT` flag to testing.

Closes: [SYNC-4327](https://mozilla-hub.atlassian.net/browse/SYNC-4327)

[SYNC-4327]:
https://mozilla-hub.atlassian.net/browse/SYNC-4327?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Rachael <[email protected]>
Co-authored-by: Philip Jenvey <[email protected]>
  • Loading branch information
3 people authored Jan 16, 2025
1 parent 49b2986 commit 4f35545
Show file tree
Hide file tree
Showing 45 changed files with 1,201 additions and 176 deletions.
50 changes: 47 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ CARGO = cargo
# Let's be very explicit about it for now.
TESTS_DIR := `pwd`/tests
TEST_RESULTS_DIR ?= workspace/test-results
PYTEST_ARGS ?= $(if $(SKIP_SENTRY),-m "not sentry") $(if $(TEST_STUB),,-m "not stub") # Stub tests do not work in CI
# NOTE: Do not be clever.
# The integration tests (and a few others) use pytest markers to control
# the tests that are being run. These markers are set and defined within
# the `./pyproject.toml`. That is the single source of truth.
PYTEST_ARGS := ${PYTEST_ARGS}
INTEGRATION_TEST_DIR := $(TESTS_DIR)/integration
INTEGRATION_TEST_FILE := $(INTEGRATION_TEST_DIR)/test_integration_all_rust.py
NOTIFICATION_TEST_DIR := $(TESTS_DIR)/notification
Expand Down Expand Up @@ -46,17 +50,17 @@ integration-test-clean:
$(DOCKER_COMPOSE) -f $(INTEGRATION_TEST_DIR)/docker-compose.yml down
docker rm integration-tests

integration-test-legacy:
integration-test-legacy: ## pytest markers are stored in `tests/pytest.ini`
$(POETRY) -V
$(POETRY) install --without dev,load,notification --no-root
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
--junit-xml=$(TEST_RESULTS_DIR)/integration_test_legacy_results.xml \
-v $(PYTEST_ARGS)

integration-test-local:
integration-test-local: ## pytest markers are stored in `tests/pytest.ini`
$(POETRY) -V
$(POETRY) install --without dev,load,notification --no-root
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
--junit-xml=$(TEST_RESULTS_DIR)/integration_test_results.xml \
-v $(PYTEST_ARGS)

Expand Down
8 changes: 7 additions & 1 deletion autoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ actix-service = "2.0"
docopt = "1.1"

[features]
default = ["bigtable"]
default = ["bigtable", "reliable_report"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
emulator = ["bigtable"]
log_vapid = []
reliable_report = [
"autoconnect_settings/reliable_report",
"autoconnect_web/reliable_report",
"autoconnect_ws/reliable_report",
"autopush_common/reliable_report",
]
4 changes: 2 additions & 2 deletions autoconnect/autoconnect-common/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ impl BroadcastChangeTracker {
}
*ver = broadcast.version;
} else {
trace!("📢 Not found: {}", &b_id);
trace!("📢 Not found: {b_id}");
return Err(ApcErrorKind::BroadcastError("Broadcast not found".into()).into());
}

trace!("📢 New version of {}", &b_id);
trace!("📢 New version of {b_id}");
// Check to see if this broadcast has been updated since initialization
let bcast_index = self
.broadcast_list
Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-settings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ autopush_common.workspace = true
# specify the default via the calling crate, in order to simplify default chains.
bigtable = ["autopush_common/bigtable"]
emulator = ["bigtable"]
reliable_report = ["autopush_common/reliable_report"]
14 changes: 14 additions & 0 deletions autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use autoconnect_common::{
registry::ClientRegistry,
};
use autopush_common::db::{client::DbClient, DbSettings, StorageType};
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;

use crate::{Settings, ENV_PREFIX};

Expand All @@ -32,6 +34,9 @@ pub struct AppState {
pub settings: Settings,
pub router_url: String,
pub endpoint_url: String,

#[cfg(feature = "reliable_report")]
pub reliability: Arc<PushReliability>,
}

impl AppState {
Expand Down Expand Up @@ -84,6 +89,13 @@ impl AppState {
ENV_PREFIX.to_uppercase()
),
};

#[cfg(feature = "reliable_report")]
let reliability = Arc::new(
PushReliability::new(&settings.reliability_dsn, db.clone()).map_err(|e| {
ConfigError::Message(format!("Could not start Reliability connection: {:?}", e))
})?,
);
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()
Expand All @@ -103,6 +115,8 @@ impl AppState {
settings,
router_url,
endpoint_url,
#[cfg(feature = "reliable_report")]
reliability,
})
}

Expand Down
7 changes: 7 additions & 0 deletions autoconnect/autoconnect-settings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ pub struct Settings {
///
/// By default, the number of available physical CPUs is used as the worker count.
pub actix_workers: Option<usize>,
#[cfg(feature = "reliable_report")]
/// The DNS for the reliability data store. This is normally a Redis compatible
/// storage system. See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters)
/// for details.
pub reliability_dsn: Option<String>,
}

impl Default for Settings {
Expand Down Expand Up @@ -139,6 +144,8 @@ impl Default for Settings {
msg_limit: 150,
actix_max_connections: None,
actix_workers: None,
#[cfg(feature = "reliable_report")]
reliability_dsn: None,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ tokio.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws/reliable_report",
]
32 changes: 28 additions & 4 deletions autoconnect/autoconnect-web/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,45 @@ pub async fn ws_route(
}

/// Deliver a Push notification directly to a connected client
#[allow(unused_mut)]
pub async fn push_route(
uaid: web::Path<Uuid>,
notif: web::Json<Notification>,
mut notif: web::Json<Notification>,
app_state: web::Data<AppState>,
) -> HttpResponse {
trace!(
"⏩ push_route, uaid: {} channel_id: {}",
"⏩ in push_route, uaid: {} channel_id: {}",
uaid,
notif.channel_id
notif.channel_id,
);
#[cfg(feature = "reliable_report")]
{
notif
.record_reliability(
&app_state.reliability,
autopush_common::reliability::ReliabilityState::IntAccepted,
)
.await;
notif
.record_reliability(
&app_state.reliability,
autopush_common::reliability::ReliabilityState::Transmitted,
)
.await;
}
// Attempt to send the notification to the UA using WebSocket protocol, or store on failure.
let result = app_state
.clients
.notify(uaid.into_inner(), notif.into_inner())
.notify(uaid.into_inner(), notif.clone())
.await;
if result.is_ok() {
#[cfg(feature = "reliable_report")]
notif
.record_reliability(
&app_state.reliability,
autopush_common::reliability::ReliabilityState::Accepted,
)
.await;
HttpResponse::Ok().finish()
} else {
HttpResponse::NotFound().body("Client not available")
Expand Down
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ ctor.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws_sm/reliable_report",
]
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ serde_json.workspace = true
autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = []
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl WebPushClient {
&self.app_state.settings
}

#[cfg(feature = "reliable_report")]
pub fn app_reliability(&self) -> &autopush_common::reliability::PushReliability {
&self.app_state.reliability
}

/// Connect this `WebPushClient` to the `ClientRegistry`
///
/// Returning a `Stream` of `ServerNotification`s from the `ClientRegistry`
Expand Down Expand Up @@ -233,6 +238,7 @@ impl WebPushClient {
let connected_at = self.connected_at;
rt::spawn(async move {
app_state.db.save_messages(&uaid, notifs).await?;
// XXX: record reliability
debug!("Finished saving unacked direct notifs, checking for reconnect");
let Some(user) = app_state.db.get_user(&uaid).await? else {
return Err(SMErrorKind::Internal(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl WebPushClient {
if msg.sortkey_timestamp.is_none() {
expired_topic_sort_keys.push(msg.chidmessageid());
}
// XXX: record ReliabilityState::Expired?
false
});
// TODO: A batch remove_messages would be nicer
Expand Down Expand Up @@ -163,6 +164,22 @@ impl WebPushClient {
Ok(smsgs)
}

#[cfg(feature = "reliable_report")]
/// Record and transition the state for trackable messages.
async fn record_state(
&self,
messages: &mut Vec<Notification>,
state: autopush_common::reliability::ReliabilityState,
) {
// *Note* because `.map()` is sync
// we can't call the async func without additional hoops.
for message in messages {
message
.record_reliability(&self.app_state.reliability, state)
.await;
}
}

/// Read a chunk (max count 10 returned) of Notifications from storage
///
/// This alternates between reading Topic Notifications and Timestamp
Expand All @@ -186,10 +203,20 @@ impl WebPushClient {
let topic_resp = if self.flags.include_topic {
trace!("🗄️ WebPushClient::do_check_storage: fetch_topic_messages");
// Get the most recent max 11 messages.
self.app_state
#[allow(unused_mut)]
let mut messages = self
.app_state
.db
.fetch_topic_messages(&self.uaid, 11)
.await?
.await?;
#[cfg(feature = "reliable_report")]
// Since we pulled these from storage, mark them as "retrieved"
self.record_state(
&mut messages.messages,
autopush_common::reliability::ReliabilityState::Retrieved,
)
.await;
messages
} else {
Default::default()
};
Expand Down Expand Up @@ -226,7 +253,8 @@ impl WebPushClient {
"🗄️ WebPushClient::do_check_storage: fetch_timestamp_messages timestamp: {:?}",
timestamp
);
let timestamp_resp = self
#[allow(unused_mut)]
let mut timestamp_resp = self
.app_state
.db
.fetch_timestamp_messages(&self.uaid, timestamp, 10)
Expand All @@ -244,6 +272,13 @@ impl WebPushClient {
)
.with_tag("topic", "false")
.send();
#[cfg(feature = "reliable_report")]
// Since we pulled these from storage, mark them as "retrieved"
self.record_state(
&mut timestamp_resp.messages,
autopush_common::reliability::ReliabilityState::Retrieved,
)
.await;
}

Ok(CheckStorageResponse {
Expand Down
Loading

0 comments on commit 4f35545

Please sign in to comment.