From b2b6bfd3e5f4273e6312f6305fb122013182d55b Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Wed, 13 Jul 2022 15:08:06 -0700 Subject: [PATCH] Fxa fixes (#321) bug: Various mini-patches for FxA integration work * Added more verbose `trace!` and `debug!` logging messages. * ignore padding errors for VAPID keys * bumped up default max bytes to handle base64 encoded 4096 block * record the VapidError as an info before we send it to metrics We can't add the key to metrics (cardinality). We can log it locally and monitor a given server for a short period, though. --- Cargo.lock | 183 +++++++++++++++--- autoendpoint/Cargo.toml | 1 + autoendpoint/src/db/client.rs | 6 + autoendpoint/src/extractors/notification.rs | 5 +- .../src/extractors/notification_headers.rs | 7 +- autoendpoint/src/extractors/subscription.rs | 28 ++- autoendpoint/src/extractors/token_info.rs | 3 +- autoendpoint/src/extractors/user.rs | 5 +- autoendpoint/src/headers/vapid.rs | 4 +- autoendpoint/src/main.rs | 3 +- autoendpoint/src/middleware/sentry.rs | 5 + autoendpoint/src/settings.rs | 11 +- autopush-common/src/db/commands.rs | 3 +- autopush-common/src/db/mod.rs | 4 + autopush/src/client.rs | 8 +- autopush/src/megaphone.rs | 10 + autopush/src/server/mod.rs | 54 ++++-- autopush/src/settings.rs | 7 + tests/test_integration_all_rust.py | 35 +++- 19 files changed, 309 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f28177a3..0cc5a2462 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,16 +36,33 @@ dependencies = [ "tokio-util 0.3.1", ] +[[package]] +name = "actix-codec" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe" +dependencies = [ + "bitflags", + "bytes 1.1.0", + "futures-core", + "futures-sink", + "log", + "memchr", + "pin-project-lite 0.2.9", + "tokio 1.18.2", + "tokio-util 0.7.2", +] + [[package]] name = "actix-connect" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "177837a10863f15ba8d3ae3ec12fac1099099529ed20083a27fdfe247381d0dc" dependencies = [ - "actix-codec", - "actix-rt", - "actix-service", - "actix-utils", + "actix-codec 0.3.0", + "actix-rt 1.1.1", + "actix-service 1.0.6", + "actix-utils 2.0.0", "derive_more", "either", "futures-util", @@ -75,12 +92,12 @@ version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2be6b66b62a794a8e6d366ac9415bb7d475ffd1e9f4671f38c1d8a8a5df950b3" dependencies = [ - "actix-codec", + "actix-codec 0.3.0", "actix-connect", - "actix-rt", - "actix-service", + "actix-rt 1.1.1", + "actix-service 1.0.6", "actix-threadpool", - "actix-utils", + "actix-utils 2.0.0", "base64 0.13.0", "bitflags", "brotli", @@ -100,7 +117,7 @@ dependencies = [ "httparse", "indexmap", "itoa 0.4.8", - "language-tags", + "language-tags 0.2.2", "lazy_static", "log", "mime", @@ -116,6 +133,35 @@ dependencies = [ "time 0.2.27", ] +[[package]] +name = "actix-http" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f9ffb6db08c1c3a1f4aef540f1a63193adc73c4fbd40b75a95fc8c5258f6e51" +dependencies = [ + "actix-codec 0.5.0", + "actix-rt 2.7.0", + "actix-service 2.0.2", + "actix-utils 3.0.0", + "ahash", + "bitflags", + "bytes 1.1.0", + "bytestring", + "derive_more", + "encoding_rs", + "futures-core", + "http 0.2.7", + "httparse", + "httpdate 1.0.2", + "itoa 1.0.2", + "language-tags 0.3.2", + "mime", + "percent-encoding 2.1.0", + "pin-project-lite 0.2.9", + "smallvec 1.8.0", + "tracing", +] + [[package]] name = "actix-macros" version = "0.1.3" @@ -154,16 +200,26 @@ dependencies = [ "tokio 0.2.25", ] +[[package]] +name = "actix-rt" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ea16c295198e958ef31930a6ef37d0fb64e9ca3b6116e6b93a8bdae96ee1000" +dependencies = [ + "futures-core", + "tokio 1.18.2", +] + [[package]] name = "actix-server" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45407e6e672ca24784baa667c5d32ef109ccdd8d5e0b5ebb9ef8a67f4dfb708e" dependencies = [ - "actix-codec", - "actix-rt", - "actix-service", - "actix-utils", + "actix-codec 0.3.0", + "actix-rt 1.1.1", + "actix-service 1.0.6", + "actix-utils 2.0.0", "futures-channel", "futures-util", "log", @@ -184,6 +240,17 @@ dependencies = [ "pin-project 0.4.29", ] +[[package]] +name = "actix-service" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" +dependencies = [ + "futures-core", + "paste", + "pin-project-lite 0.2.9", +] + [[package]] name = "actix-testing" version = "1.0.1" @@ -191,9 +258,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47239ca38799ab74ee6a8a94d1ce857014b2ac36f242f70f3f75a66f691e791c" dependencies = [ "actix-macros", - "actix-rt", + "actix-rt 1.1.1", "actix-server", - "actix-service", + "actix-service 1.0.6", "log", "socket2 0.3.19", ] @@ -219,9 +286,9 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24789b7d7361cf5503a504ebe1c10806896f61e96eca9a7350e23001aca715fb" dependencies = [ - "actix-codec", - "actix-service", - "actix-utils", + "actix-codec 0.3.0", + "actix-service 1.0.6", + "actix-utils 2.0.0", "futures-util", ] @@ -231,9 +298,9 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e9022dec56632d1d7979e59af14f0597a28a830a9c1c7fec8b2327eb9f16b5a" dependencies = [ - "actix-codec", - "actix-rt", - "actix-service", + "actix-codec 0.3.0", + "actix-rt 1.1.1", + "actix-service 1.0.6", "bitflags", "bytes 0.5.6", "either", @@ -245,23 +312,33 @@ dependencies = [ "slab", ] +[[package]] +name = "actix-utils" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e491cbaac2e7fc788dfff99ff48ef317e23b3cf63dbaf7aaab6418f40f92aa94" +dependencies = [ + "local-waker", + "pin-project-lite 0.2.9", +] + [[package]] name = "actix-web" version = "3.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6534a126df581caf443ba2751cab42092c89b3f1d06a9d829b1e17edfe3e277" dependencies = [ - "actix-codec", - "actix-http", + "actix-codec 0.3.0", + "actix-http 2.2.2", "actix-macros", "actix-router", - "actix-rt", + "actix-rt 1.1.1", "actix-server", - "actix-service", + "actix-service 1.0.6", "actix-testing", "actix-threadpool", "actix-tls", - "actix-utils", + "actix-utils 2.0.0", "actix-web-codegen", "awc", "bytes 0.5.6", @@ -427,7 +504,8 @@ version = "1.63.0" dependencies = [ "a2", "actix-cors", - "actix-rt", + "actix-http 3.2.1", + "actix-rt 1.1.1", "actix-web", "again", "async-trait", @@ -571,10 +649,10 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b381e490e7b0cfc37ebc54079b0413d8093ef43d14a4e4747083f7fa47a9e691" dependencies = [ - "actix-codec", - "actix-http", - "actix-rt", - "actix-service", + "actix-codec 0.3.0", + "actix-http 2.2.2", + "actix-rt 1.1.1", + "actix-service 1.0.6", "base64 0.13.0", "bytes 0.5.6", "cfg-if 1.0.0", @@ -2180,6 +2258,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.4.0" @@ -2198,6 +2282,12 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "local-waker" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1" + [[package]] name = "lock_api" version = "0.3.4" @@ -2657,6 +2747,16 @@ dependencies = [ "parking_lot_core 0.8.5", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api 0.4.7", + "parking_lot_core 0.9.3", +] + [[package]] name = "parking_lot_core" version = "0.6.2" @@ -2686,6 +2786,25 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.2.13", + "smallvec 1.8.0", + "windows-sys", +] + +[[package]] +name = "paste" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" + [[package]] name = "pathdiff" version = "0.2.1" @@ -4571,7 +4690,9 @@ dependencies = [ "mio 0.8.3", "num_cpus", "once_cell", + "parking_lot 0.12.1", "pin-project-lite 0.2.9", + "signal-hook-registry", "socket2 0.4.4", "winapi 0.3.9", ] diff --git a/autoendpoint/Cargo.toml b/autoendpoint/Cargo.toml index 9a7dc5c7f..17c1522bc 100644 --- a/autoendpoint/Cargo.toml +++ b/autoendpoint/Cargo.toml @@ -17,6 +17,7 @@ a2 = { git = "https://github.com/mozilla-services/a2.git", branch = "autoendpoin actix-web = "3.3" actix-rt = "1.1" # 2.0+ requires futures 0.3+ actix-cors = "0.5" +actix-http = "3.0" again = { version = "0.1.2", default-features = false, features = ["log", "rand"] } async-trait = "0.1" autopush_common = { path = "../autopush-common" } diff --git a/autoendpoint/src/db/client.rs b/autoendpoint/src/db/client.rs index b51e21877..df6f0bb0b 100644 --- a/autoendpoint/src/db/client.rs +++ b/autoendpoint/src/db/client.rs @@ -93,6 +93,7 @@ impl DbClientImpl { router_table: String, message_table: String, ) -> DbResult { + debug!("Tables: {} and {}", router_table, message_table); let ddb = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") { DynamoDbClient::new_with( HttpClient::new().expect("TLS initialization error"), @@ -205,6 +206,11 @@ impl DbClient for DbClientImpl { } async fn get_user(&self, uaid: Uuid) -> DbResult> { + trace!( + "Looking up user: {:?} in {}", + uaid.as_simple().to_string(), + self.router_table.clone() + ); let input = GetItemInput { table_name: self.router_table.clone(), consistent_read: Some(true), diff --git a/autoendpoint/src/extractors/notification.rs b/autoendpoint/src/extractors/notification.rs index 2be84553f..02792e18a 100644 --- a/autoendpoint/src/extractors/notification.rs +++ b/autoendpoint/src/extractors/notification.rs @@ -44,7 +44,10 @@ impl FromRequest for Notification { // Read data let data = web::Bytes::from_request(&req, &mut payload) .await - .map_err(ApiErrorKind::PayloadError)?; + .map_err(|e| { + debug!("▶▶ Request read payload error: {:?}", &e); + ApiErrorKind::PayloadError(e) + })?; // Convert data to base64 let data = if data.is_empty() { diff --git a/autoendpoint/src/extractors/notification_headers.rs b/autoendpoint/src/extractors/notification_headers.rs index 57724f1d8..a3376979d 100644 --- a/autoendpoint/src/extractors/notification_headers.rs +++ b/autoendpoint/src/extractors/notification_headers.rs @@ -299,12 +299,15 @@ mod tests { fn valid_topic() { let req = TestRequest::post() .header("TTL", "10") - .header("TOPIC", "test-topic") + .header("TOPIC", "a-test-topic-which-is-just-right") .to_http_request(); let result = NotificationHeaders::from_request(&req, false); assert!(result.is_ok()); - assert_eq!(result.unwrap().topic, Some("test-topic".to_string())); + assert_eq!( + result.unwrap().topic, + Some("a-test-topic-which-is-just-right".to_string()) + ); } /// Topic names which are too long return an error diff --git a/autoendpoint/src/extractors/subscription.rs b/autoendpoint/src/extractors/subscription.rs index 2ddbf601d..bed904192 100644 --- a/autoendpoint/src/extractors/subscription.rs +++ b/autoendpoint/src/extractors/subscription.rs @@ -58,6 +58,7 @@ impl FromRequest for Subscription { async move { // Collect token info and server state let token_info = TokenInfo::extract(&req).await?; + trace!("Token info: {:?}", &token_info); let state: Data = Data::extract(&req).await.expect("No server state found"); @@ -65,13 +66,18 @@ impl FromRequest for Subscription { let token = state .fernet .decrypt(&repad_base64(&token_info.token)) - .map_err(|_| ApiErrorKind::InvalidToken)?; + .map_err(|e| { + error!("fernet: {:?}", e); + ApiErrorKind::InvalidToken + })?; // Parse VAPID and extract public key. let vapid: Option = parse_vapid(&token_info, &state.metrics)? .map(|vapid| extract_public_key(vapid, &token_info)) .transpose()?; + trace!("Vapid: {:?}", &vapid); + match token_info.api_version { ApiVersion::Version1 => version_1_validation(&token)?, ApiVersion::Version2 => version_2_validation(&token, vapid.as_ref())?, @@ -82,11 +88,16 @@ impl FromRequest for Subscription { // only returned if the slice length is not 16. let uaid = Uuid::from_slice(&token[..16]).unwrap(); let channel_id = Uuid::from_slice(&token[16..32]).unwrap(); + + trace!("UAID: {:?}, CHID: {:?}", uaid, channel_id); + let user = state .ddb .get_user(uaid) .await? .ok_or(ApiErrorKind::NoSubscription)?; + + trace!("user: {:?}", &user); validate_user(&user, &channel_id, &state).await?; // Validate the VAPID JWT token and record the version @@ -193,7 +204,7 @@ fn version_2_validation(token: &[u8], vapid: Option<&VapidHeaderWithKey>) -> Api // Hash the VAPID public key let public_key = base64::decode_config(public_key, base64::URL_SAFE_NO_PAD) - .map_err(|_| VapidError::InvalidKey)?; + .map_err(|e| VapidError::InvalidKey(e.to_string()))?; let key_hash = openssl::hash::hash(MessageDigest::sha256(), &public_key) .map_err(ApiErrorKind::TokenHashValidation)?; @@ -215,8 +226,11 @@ fn validate_vapid_jwt(vapid: &VapidHeaderWithKey, domain: &Url) -> ApiResult<()> let VapidHeaderWithKey { vapid, public_key } = vapid; // Check the signature and make sure the expiration is in the future - let public_key = base64::decode_config(public_key, base64::URL_SAFE_NO_PAD) - .map_err(|_| VapidError::InvalidKey)?; + // NOTE: FxA sometimes sends a VAPID public key with incorrect padding. + // Prior versions ignored padding errors, so we should too. + let public_key = + base64::decode_config(public_key.trim_end_matches('='), base64::URL_SAFE_NO_PAD) + .map_err(|e| VapidError::InvalidKey(e.to_string()))?; // NOTE: This will fail if `exp` is specified as a string instead of a numeric. let token_data = match jsonwebtoken::decode::( &vapid.token, @@ -292,7 +306,8 @@ mod tests { base64::STANDARD, ) .unwrap(); - let public_key = "BM3bVjW_wuZC54alIbqjTbaBNtthriVtdZlchOyOSdbVYeYQu2i5inJdft7jUWIAy4O9xHBbY196Gf-1odb8hds".to_owned(); + // Specify a potentially invalid padding. + let public_key = "BM3bVjW_wuZC54alIbqjTbaBNtthriVtdZlchOyOSdbVYeYQu2i5inJdft7jUWIAy4O9xHBbY196Gf-1odb8hds==".to_owned(); let domain = "https://push.services.mozilla.org"; let jwk_header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::ES256); let enc_key = jsonwebtoken::EncodingKey::from_ec_der(&priv_key); @@ -311,7 +326,8 @@ mod tests { version_data: VapidVersionData::Version1, }, }; - assert!(validate_vapid_jwt(&header, &Url::from_str(domain).unwrap()).is_ok()); + let result = validate_vapid_jwt(&header, &Url::from_str(domain).unwrap()); + assert!(result.is_ok()); } #[test] diff --git a/autoendpoint/src/extractors/token_info.rs b/autoendpoint/src/extractors/token_info.rs index 55e99402d..aabec9ebd 100644 --- a/autoendpoint/src/extractors/token_info.rs +++ b/autoendpoint/src/extractors/token_info.rs @@ -6,6 +6,7 @@ use futures::future; use std::str::FromStr; /// Extracts basic token data from the webpush request path and headers +#[derive(Debug)] pub struct TokenInfo { pub api_version: ApiVersion, pub token: String, @@ -39,7 +40,7 @@ impl FromRequest for TokenInfo { } } -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum ApiVersion { Version1, Version2, diff --git a/autoendpoint/src/extractors/user.rs b/autoendpoint/src/extractors/user.rs index ea4ea8037..03a2ab8a3 100644 --- a/autoendpoint/src/extractors/user.rs +++ b/autoendpoint/src/extractors/user.rs @@ -53,7 +53,10 @@ async fn validate_webpush_user( }; if ddb.message_table() != message_table { - debug!("User is inactive, dropping user"; "user" => ?user); + debug!("User is inactive, dropping user"; + "ddb.message_table" => ddb.message_table(), + "message_table" => message_table, + "user" => ?user); drop_user(user.uaid, ddb, metrics).await?; return Err(ApiErrorKind::NoSubscription.into()); } diff --git a/autoendpoint/src/headers/vapid.rs b/autoendpoint/src/headers/vapid.rs index 844f3c4e2..b7d101dcf 100644 --- a/autoendpoint/src/headers/vapid.rs +++ b/autoendpoint/src/headers/vapid.rs @@ -85,8 +85,8 @@ pub enum VapidError { InvalidVapid(String), #[error("Missing VAPID public key")] MissingKey, - #[error("Invalid VAPID public key")] - InvalidKey, + #[error("Invalid VAPID public key: {0}")] + InvalidKey(String), #[error("Invalid VAPID audience")] InvalidAudience, #[error("Invalid VAPID expiry")] diff --git a/autoendpoint/src/main.rs b/autoendpoint/src/main.rs index f7bb04e1c..480ed0408 100644 --- a/autoendpoint/src/main.rs +++ b/autoendpoint/src/main.rs @@ -41,6 +41,7 @@ async fn main() -> Result<(), Box> { .and_then(|d| d.deserialize()) .unwrap_or_else(|e| e.exit()); let settings = settings::Settings::with_env_and_config_file(&args.flag_config)?; + let host_port = format!("{}:{}", &settings.host, &settings.port); logging::init_logging(!settings.human_logs).expect("Logging failed to initialize"); debug!("Starting up..."); @@ -51,7 +52,7 @@ async fn main() -> Result<(), Box> { let server = server::Server::with_settings(settings) .await .expect("Could not start server"); - info!("Server started"); + info!("Server started: {}", host_port); server.await?; // Shutdown diff --git a/autoendpoint/src/middleware/sentry.rs b/autoendpoint/src/middleware/sentry.rs index 702139ea8..a014a412f 100644 --- a/autoendpoint/src/middleware/sentry.rs +++ b/autoendpoint/src/middleware/sentry.rs @@ -35,6 +35,11 @@ pub fn sentry_middleware( if let Some(api_err) = error.as_error::() { // if it's not reportable, and we have access to the metrics, record it as a metric. if !api_err.kind.is_sentry_event() { + // The error (e.g. VapidErrorKind::InvalidKey(String)) might be too cardinal, + // but we may need that information to debug a production issue. We can + // add an info here, temporarily turn on info level debugging on a given server, + // capture it, and then turn it off before we run out of money. + info!("Sending error to metrics: {:?}", api_err.kind); if let Some(state) = state { match state .metrics diff --git a/autoendpoint/src/settings.rs b/autoendpoint/src/settings.rs index 8288abcbd..9c5e3308c 100644 --- a/autoendpoint/src/settings.rs +++ b/autoendpoint/src/settings.rs @@ -45,7 +45,11 @@ impl Default for Settings { port: 8000, router_table_name: "router".to_string(), message_table_name: "message".to_string(), - max_data_bytes: 4096, + /// max data is a bit hard to figure out, due to encryption. Using something + /// like pywebpush, if you encode a block of 4096 bytes, you'll get a + /// 4216 byte data block. Since we're going to be receiving this, we have to + /// presume base64 encoding, so we can bump things up to 5624 bytes max. + max_data_bytes: 5624, crypto_keys: format!("[{}]", Fernet::generate_key()), auth_keys: r#"["AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB="]"#.to_string(), human_logs: false, @@ -116,7 +120,10 @@ impl Settings { pub fn make_fernet(&self) -> MultiFernet { let keys = &self.crypto_keys.replace('"', "").replace(' ', ""); let fernets = Self::read_list_from_str(keys, "Invalid AUTOEND_CRYPTO_KEYS") - .map(|key| Fernet::new(key).expect("Invalid AUTOEND_CRYPTO_KEYS")) + .map(|key| { + debug!("Fernet keys: {:?}", &key); + Fernet::new(key).expect("Invalid AUTOEND_CRYPTO_KEYS") + }) .collect(); MultiFernet::new(fernets) } diff --git a/autopush-common/src/db/commands.rs b/autopush-common/src/db/commands.rs index 3cbc48cef..132b516a8 100644 --- a/autopush-common/src/db/commands.rs +++ b/autopush-common/src/db/commands.rs @@ -228,7 +228,6 @@ pub fn register_user( user: &DynamoDbUser, router_table: &str, ) -> impl Future { - trace!("### Registering User..."); let item = match serde_dynamodb::to_hashmap(user) { Ok(item) => item, Err(e) => return future::err(e).chain_err(|| "Failed to serialize item"), @@ -241,7 +240,7 @@ pub fn register_user( retry_if( move || { - debug!("Registering user: {:?}", item); + debug!("### Registering user into {}: {:?}", router_table, item); ddb.put_item(PutItemInput { item: item.clone(), table_name: router_table.clone(), diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index b80a0921c..6b7b591bf 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -76,6 +76,10 @@ impl DynamoStorage { router_table_name: &str, metrics: Arc, ) -> Result { + debug!( + "Checking tables: {} & {}", + &message_table_name, &router_table_name + ); let ddb = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") { DynamoDbClient::new_with( HttpClient::new().chain_err(|| "TLS initialization error")?, diff --git a/autopush/src/client.rs b/autopush/src/client.rs index 8835f4cf2..7c568fcc3 100644 --- a/autopush/src/client.rs +++ b/autopush/src/client.rs @@ -842,7 +842,7 @@ where .. } = **send; if !smessages.is_empty() { - trace!("Sending {} {:#?}", smessages.len(), smessages); + trace!("🚟 Sending {} msgs: {:#?}", smessages.len(), smessages); let item = smessages.remove(0); let ret = data .ws @@ -919,6 +919,8 @@ where r#await: &'a mut RentToOwn<'a, AwaitInput>, ) -> Poll, Error> { trace!("State: AwaitInput"); + // The following is a blocking call. No action is taken until we either get a + // websocket data packet or there's an incoming notification. let input = try_ready!(r#await.data.input_or_notif()); let AwaitInput { data } = r#await.take(); let webpush_rc = data.webpush.clone(); @@ -1082,14 +1084,14 @@ where // Clients shouldn't ping > than once per minute or we // disconnect them if sec_since_epoch() - webpush.last_ping >= 45 { - debug!("Got a ping, sending pong"); + trace!("🏓 Got a ping, sending pong"); webpush.last_ping = sec_since_epoch(); transition!(Send { smessages: vec![ServerMessage::Ping], data, }) } else { - debug!("Got a ping too quickly, disconnecting"); + trace!("🏓 Got a ping too quickly, disconnecting"); Err(ErrorKind::ExcessivePing.into()) } } diff --git a/autopush/src/megaphone.rs b/autopush/src/megaphone.rs index 0a1792958..8e0050687 100644 --- a/autopush/src/megaphone.rs +++ b/autopush/src/megaphone.rs @@ -162,6 +162,7 @@ impl BroadcastChangeTracker { /// Note: If the broadcast already exists, it will be updated instead. pub fn add_broadcast(&mut self, broadcast: Broadcast) -> u32 { if let Ok(change_count) = self.update_broadcast(broadcast.clone()) { + trace!("📢 returning change count {}", &change_count); return change_count; } self.change_count += 1; @@ -180,6 +181,8 @@ impl BroadcastChangeTracker { /// /// Returns an error if the `broadcast` was never initialized/added. pub fn update_broadcast(&mut self, broadcast: Broadcast) -> Result { + let b_id = broadcast.broadcast_id.clone(); + let old_count = self.change_count; let key = self .broadcast_registry .lookup_key(&broadcast.broadcast_id) @@ -191,9 +194,11 @@ impl BroadcastChangeTracker { } *ver = broadcast.version; } else { + trace!("📢 Not found: {}", &b_id); return Err("Broadcast not found".into()); } + trace!("📢 New version of {}", &b_id); // Check to see if this broadcast has been updated since initialization let bcast_index = self .broadcast_list @@ -209,15 +214,20 @@ impl BroadcastChangeTracker { .next(); self.change_count += 1; if let Some(bcast_index) = bcast_index { + trace!("📢 {} index: {}", &b_id, &bcast_index); let mut bcast = self.broadcast_list.remove(bcast_index); bcast.change_count = self.change_count; self.broadcast_list.push(bcast); } else { + trace!("📢 adding broadcast list for {}", &b_id); self.broadcast_list.push(BroadcastRevision { change_count: self.change_count, broadcast: key, }) } + if old_count != self.change_count { + trace!("📢 New Change available"); + } Ok(self.change_count) } diff --git a/autopush/src/server/mod.rs b/autopush/src/server/mod.rs index 49e84c6c6..96d33e909 100644 --- a/autopush/src/server/mod.rs +++ b/autopush/src/server/mod.rs @@ -161,6 +161,7 @@ impl ServerOptions { return Err("Invalid AUTOPUSH_CRYPTO_KEY".into()); } let crypto_key = &crypto_key[1..crypto_key.len() - 1]; + debug!("Fernet keys: {:?}", &crypto_key); let fernets: Vec = crypto_key .split(',') .map(|s| s.trim().to_string()) @@ -303,6 +304,7 @@ impl Server { metrics, }); let addr = SocketAddr::from(([0, 0, 0, 0], srv.opts.port)); + debug!("{:?}", &addr); let ws_listener = TcpListener::bind(&addr, &srv.handle)?; let handle = core.handle(); @@ -375,6 +377,7 @@ impl Server { // the internal state machine. Box::new( ws.and_then(move |ws| { + trace!("🏓 starting ping manager"); PingManager::new(&srv2, ws, uarx) .chain_err(|| "failed to make ping handler") }) @@ -414,7 +417,7 @@ impl Server { ) .expect("Unable to start megaphone updater"); core.handle().spawn(fut.then(|res| { - debug!("megaphone result: {:?}", res.map(drop)); + trace!("📢 megaphone result: {:?}", res.map(drop)); Ok(()) })); } @@ -431,7 +434,7 @@ impl Server { &self, desired_broadcasts: &[Broadcast], ) -> (BroadcastSubs, HashMap) { - debug!("Initialized broadcasts"); + trace!("📢Initialized broadcasts"); let bc = self.broadcaster.borrow(); let BroadcastSubsInit(broadcast_subs, broadcasts) = bc.broadcast_delta(desired_broadcasts); let mut response = Broadcast::vec_into_hashmap(broadcasts); @@ -447,6 +450,7 @@ impl Server { /// Calculate whether there's new broadcast versions to go out pub fn broadcast_delta(&self, broadcast_subs: &mut BroadcastSubs) -> Option> { + trace!("📢 Checking broadcast_delta"); self.broadcaster.borrow().change_count_delta(broadcast_subs) } @@ -524,7 +528,7 @@ impl Future for MegaphoneUpdater { let new_state = match self.state { MegaphoneState::Waiting => { try_ready!(self.timeout.poll()); - debug!("Sending megaphone API request"); + trace!("📢Sending megaphone API request"); let fut = self .client .get(&self.api_url) @@ -539,15 +543,17 @@ impl Future for MegaphoneUpdater { let at = Instant::now() + self.poll_interval; match response.poll() { Ok(Async::Ready(MegaphoneAPIResponse { broadcasts })) => { - debug!("Fetched broadcasts: {:?}", broadcasts); + trace!("📢Fetched broadcasts: {:?}", broadcasts); let mut broadcaster = self.srv.broadcaster.borrow_mut(); for srv in Broadcast::from_hashmap(broadcasts) { - broadcaster.add_broadcast(srv); + let vv = broadcaster.add_broadcast(srv); + trace!("📢 add_broadcast = {}", vv); + // TODO: Notify that Ping required? } } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(error) => { - error!("Failed to get response, queue again {:?}", error); + error!("📢Failed to get response, queue again {:?}", error); capture_message( &format!("Failed to get response, queue again {:?}", error), sentry::Level::Error, @@ -603,6 +609,7 @@ impl PingManager { // an `Rc` object. This'll allow us to share it between the ping/pong // management and message shuffling. let socket = RcObject::new(WebpushSocket::new(socket)); + trace!("🏓Ping interval {:?}", &srv.opts.auto_ping_interval); Ok(PingManager { timeout: Timeout::new(srv.opts.auto_ping_interval, &srv.handle)?, waiting: WaitingFor::SendPing, @@ -620,6 +627,7 @@ impl Future for PingManager { fn poll(&mut self) -> Poll<(), Error> { let mut socket = self.socket.borrow_mut(); loop { + trace!("🏓 PingManager Poll loop"); if socket.ws_ping { // Don't check if we already have a delta to broadcast if socket.broadcast_delta.is_none() { @@ -632,8 +640,10 @@ impl Future for PingManager { } if socket.send_ws_ping()?.is_ready() { + trace!("🏓 Time to ping"); // If we just sent a broadcast, reset the ping interval and clear the delta if socket.broadcast_delta.is_some() { + trace!("📢 Pending"); let at = Instant::now() + self.srv.opts.auto_ping_interval; self.timeout.reset(at); socket.broadcast_delta = None; @@ -650,21 +660,29 @@ impl Future for PingManager { debug_assert!(!socket.ws_ping); match self.waiting { WaitingFor::SendPing => { + trace!( + "🏓Checking pong timeout:{} pong recv'd:{}", + socket.ws_pong_timeout, + socket.ws_pong_received + ); debug_assert!(!socket.ws_pong_timeout); debug_assert!(!socket.ws_pong_received); match self.timeout.poll()? { Async::Ready(()) => { - debug!("scheduling a ws ping to get sent"); + trace!("🏓scheduling a ws ping to get sent"); socket.ws_ping = true; } - Async::NotReady => break, + Async::NotReady => { + trace!("🏓not ready yet"); + break; + } } } WaitingFor::Pong => { if socket.ws_pong_received { // If we received a pong, then switch us back to waiting // to send out a ping - debug!("ws pong received, going back to sending a ping"); + trace!("🏓ws pong received, going back to sending a ping"); debug_assert!(!socket.ws_pong_timeout); let at = Instant::now() + self.srv.opts.auto_ping_interval; self.timeout.reset(at); @@ -674,7 +692,7 @@ impl Future for PingManager { // If our socket is waiting to deliver a pong timeout, // then no need to keep checking the timer and we can // keep going - debug!("waiting for socket to see ws pong timed out"); + trace!("🏓waiting for socket to see ws pong timed out"); break; } else if self.timeout.poll()?.is_ready() { // We may not actually be reading messages from the @@ -683,7 +701,7 @@ impl Future for PingManager { // error here wait for the stream to return `NotReady` // when looking for messages, as then we're extra sure // that no pong was received after this timeout elapsed. - debug!("waited too long for a ws pong"); + trace!("🏓waited too long for a ws pong"); socket.ws_pong_timeout = true; } else { break; @@ -753,28 +771,32 @@ impl WebpushSocket { T: Sink, Error: From, { + trace!("🏓 checking ping"); if self.ws_ping { + trace!("🏓 Ping present"); let msg = if let Some(broadcasts) = self.broadcast_delta.clone() { - debug!("sending a broadcast delta"); + trace!("🏓sending a broadcast delta"); let server_msg = ServerMessage::Broadcast { broadcasts: Broadcast::vec_into_hashmap(broadcasts), }; let s = server_msg.to_json().chain_err(|| "failed to serialize")?; Message::Text(s) } else { - debug!("sending a ws ping"); + trace!("🏓sending a ws ping"); Message::Ping(Vec::new()) }; match self.inner.start_send(msg)? { AsyncSink::Ready => { - debug!("ws ping sent"); + trace!("🏓ws ping sent"); self.ws_ping = false; } AsyncSink::NotReady(_) => { - debug!("ws ping not ready to be sent"); + trace!("🏓ws ping not ready to be sent"); return Ok(Async::NotReady); } } + } else { + trace!("🏓No Ping"); } Ok(Async::Ready(())) } @@ -805,7 +827,7 @@ where }; match msg { Message::Text(ref s) => { - trace!("text message {}", s); + trace!("🚟 text message {}", s); let msg = s .parse() .chain_err(|| ErrorKind::InvalidClientMessage(s.to_owned()))?; diff --git a/autopush/src/settings.rs b/autopush/src/settings.rs index 6cec57eb3..4ac931ac6 100644 --- a/autopush/src/settings.rs +++ b/autopush/src/settings.rs @@ -216,15 +216,21 @@ mod tests { use std::env; let port = format!("{}__PORT", ENV_PREFIX).to_uppercase(); let msg_limit = format!("{}__MSG_LIMIT", ENV_PREFIX).to_uppercase(); + let fernet = format!("{}__CRYPTO_KEY", ENV_PREFIX).to_uppercase(); let v1 = env::var(&port); let v2 = env::var(&msg_limit); env::set_var(&port, "9123"); env::set_var(&msg_limit, "123"); + env::set_var(&fernet, "[mqCGb8D-N7mqx6iWJov9wm70Us6kA9veeXdb8QUuzLQ=]"); let settings = Settings::with_env_and_config_files(&Vec::new()).unwrap(); assert_eq!(settings.endpoint_hostname, "localhost".to_owned()); assert_eq!(&settings.port, &9123); assert_eq!(&settings.msg_limit, &123); + assert_eq!( + &settings.crypto_key, + "[mqCGb8D-N7mqx6iWJov9wm70Us6kA9veeXdb8QUuzLQ=]" + ); // reset (just in case) if let Ok(p) = v1 { @@ -239,5 +245,6 @@ mod tests { } else { env::remove_var(&msg_limit); } + env::remove_var(&fernet); } } diff --git a/tests/test_integration_all_rust.py b/tests/test_integration_all_rust.py index c860ffeee..219d971ac 100644 --- a/tests/test_integration_all_rust.py +++ b/tests/test_integration_all_rust.py @@ -2,12 +2,14 @@ Rust Connection and Endpoint Node Integration Tests """ +from base64 import urlsafe_b64encode import copy import json import logging import os import random import signal +import string import socket import subprocess @@ -70,6 +72,7 @@ CN_QUEUES = [] EP_QUEUES = [] STRICT_LOG_COUNTS = True +RUST_LOG = "autoendpoint=debug,autopush_rs=debug,autopush_common=debug,error" def get_free_port(): @@ -97,7 +100,7 @@ def get_free_port(): router_tablename=ROUTER_TABLE, message_tablename=MESSAGE_TABLE, crypto_key="[{}]".format(CRYPTO_KEY), - auto_ping_interval=60.0, + auto_ping_interval=30.0, auto_ping_timeout=10.0, close_handshake_timeout=5, max_connections=5000, @@ -246,7 +249,7 @@ def delete_notification(self, channel, message=None, status=204): def send_notification(self, channel=None, version=None, data=None, use_header=True, status=None, ttl=200, timeout=0.2, vapid=None, endpoint=None, - topic=None): + topic=None, headers=None): if not channel: channel = random.choice(self.channels.keys()) @@ -284,6 +287,7 @@ def send_notification(self, channel=None, version=None, data=None, status = status or 201 log.debug("%s body: %s", method, body) + log.debug(" headers: %s", headers) http.request(method, url.path.encode("utf-8"), body, headers) resp = http.getresponse() log.debug("%s Response (%s): %s", method, resp.status, resp.read()) @@ -332,7 +336,8 @@ def get_broadcast(self, timeout=1): # pragma: nocover result = json.loads(d) assert result.get("messageType") == "broadcast" return result - except Exception: # pragma: nocover + except Exception as ex: # pragma: nocover + log.error("Error: {}".format(ex)) return None finally: self.ws.settimeout(orig_timeout) @@ -587,7 +592,6 @@ def setup_endpoint_server(): global EP_SERVER # Set up environment - os.environ["RUST_LOG"] = "trace" # NOTE: # due to a change in Config, autoendpoint uses a double # underscore as a separator (e.g. "AUTOEND__FCM__MIN_TTL" == @@ -609,7 +613,7 @@ def setup_endpoint_server(): def setup_module(): global CN_SERVER, CN_QUEUES, CN_MP_SERVER, MOCK_SERVER_THREAD, \ - STRICT_LOG_COUNTS + STRICT_LOG_COUNTS, RUST_LOG if "SKIP_INTEGRATION" in os.environ: # pragma: nocover raise SkipTest("Skipping integration tests") @@ -624,6 +628,7 @@ def setup_module(): setup_mock_server() + os.environ["RUST_LOG"] = RUST_LOG connection_binary = get_rust_binary_path("autopush_rs") setup_connection_server(connection_binary) setup_megaphone_server(connection_binary) @@ -1208,6 +1213,26 @@ def test_empty_message_with_crypto_headers(self): yield self.shut_down(client) + @inlineCallbacks + def test_big_message(self): + """Test that we accept a large message. + + Using pywebpush I encoded a 4096 block + of random data into a 4216b block. B64 encoding that produced a block that was + 5624 bytes long. We'll skip the binary bit for a 4216 block of "text" we then + b64 encode to send. + """ + import base64; + client = yield self.quick_register() + data = base64.urlsafe_b64encode( + ''.join(random.choice(string.ascii_letters+string.digits+string.punctuation) + for _ in xrange(0, 4216)) + ) + result = yield client.send_notification(data=data) + dd = result.get("data") + dh = base64.b64decode(dd + "==="[:len(dd) % 4]) + assert dh == data + # Need to dig into this test a bit more. I'm not sure it's structured correctly # since we resolved a bug about returning 202 v. 201, and it's using a dependent # library to do the Client calls. In short, this test will fail in `send_notification()`