From 082dd1f0613fc91f3ea2f02b3bcdd9ddf2b938d3 Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Tue, 23 Feb 2021 15:26:37 -0800 Subject: [PATCH] feat: kill old or excessively idled connections (#1006) * feat: kill old or excessivly idled connections * Two new optional arguments: _database_pool_connection_lifespan_ = max connection lifespan (in seconds) _database_pool_connection_max_idle_ = max idle time (in seconds) * add reporting (sorta) of connection lifespans to appropriate errors Issue: #861 --- src/db/error.rs | 3 + src/db/mock.rs | 4 ++ src/db/mod.rs | 2 + src/db/mysql/models.rs | 4 ++ src/db/results.rs | 6 ++ src/db/spanner/manager/deadpool.rs | 21 +++++-- src/db/spanner/manager/mod.rs | 2 +- src/db/spanner/manager/session.rs | 37 +++++++++++- src/db/spanner/models.rs | 21 +++++++ src/db/transaction.rs | 38 ++++++++++-- src/server/metrics.rs | 10 ++-- src/server/test.rs | 2 + src/settings.rs | 12 ++++ src/web/handlers.rs | 35 +++++++---- src/web/middleware/sentry.rs | 9 ++- src/web/tags.rs | 94 +++++++++++++++++++----------- 16 files changed, 236 insertions(+), 64 deletions(-) diff --git a/src/db/error.rs b/src/db/error.rs index cf256a8862..181f06c564 100644 --- a/src/db/error.rs +++ b/src/db/error.rs @@ -55,6 +55,9 @@ pub enum DbErrorKind { #[fail(display = "User over quota")] Quota, + + #[fail(display = "Connection expired")] + Expired, } impl DbError { diff --git a/src/db/mock.rs b/src/db/mock.rs index 8a015c45fa..ed45e5f88f 100644 --- a/src/db/mock.rs +++ b/src/db/mock.rs @@ -100,6 +100,10 @@ impl<'a> Db<'a> for MockDb { mock_db_method!(get_batch, GetBatch, Option); mock_db_method!(commit_batch, CommitBatch); + fn get_connection_info(&self) -> results::ConnectionInfo { + results::ConnectionInfo::default() + } + mock_db_method!(get_collection_id, GetCollectionId); #[cfg(test)] mock_db_method!(create_collection, CreateCollection); diff --git a/src/db/mod.rs b/src/db/mod.rs index 7b37500077..9970edf345 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -167,6 +167,8 @@ pub trait Db<'a>: Debug + 'a { fn check(&self) -> DbFuture<'_, results::Check>; + fn get_connection_info(&self) -> results::ConnectionInfo; + /// Retrieve the timestamp for an item/collection /// /// Modeled on the Python `get_resource_timestamp` function. diff --git a/src/db/mysql/models.rs b/src/db/mysql/models.rs index 5489a6de30..06f28b27fd 100644 --- a/src/db/mysql/models.rs +++ b/src/db/mysql/models.rs @@ -1091,6 +1091,10 @@ impl<'a> Db<'a> for MysqlDb { Box::pin(block(move || db.get_collection_id(&name).map_err(Into::into)).map_err(Into::into)) } + fn get_connection_info(&self) -> results::ConnectionInfo { + results::ConnectionInfo::default() + } + #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); diff --git a/src/db/results.rs b/src/db/results.rs index 7caed8a745..117c781d63 100644 --- a/src/db/results.rs +++ b/src/db/results.rs @@ -87,6 +87,12 @@ pub struct PoolState { pub idle_connections: u32, } +#[derive(Debug, Default)] +pub struct ConnectionInfo { + pub age: i64, + pub idle: i64, +} + pub type GetCollectionId = i32; #[cfg(test)] diff --git a/src/db/spanner/manager/deadpool.rs b/src/db/spanner/manager/deadpool.rs index baaaf4249d..ae7a6dc649 100644 --- a/src/db/spanner/manager/deadpool.rs +++ b/src/db/spanner/manager/deadpool.rs @@ -23,6 +23,8 @@ pub struct SpannerSessionManager { env: Arc, metrics: Metrics, test_transactions: bool, + max_lifespan: Option, + max_idle: Option, } impl fmt::Debug for SpannerSessionManager { @@ -52,6 +54,8 @@ impl SpannerSessionManager { env, metrics: metrics.clone(), test_transactions, + max_lifespan: settings.database_pool_connection_lifespan, + max_idle: settings.database_pool_connection_max_idle, }) } } @@ -59,19 +63,26 @@ impl SpannerSessionManager { #[async_trait] impl Manager for SpannerSessionManager { async fn create(&self) -> Result { - create_spanner_session( + let session = create_spanner_session( Arc::clone(&self.env), self.metrics.clone(), &self.database_name, self.test_transactions, ) - .await + .await?; + Ok(session) } async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult { - recycle_spanner_session(conn, &self.database_name) - .await - .map_err(RecycleError::Backend) + recycle_spanner_session( + conn, + &self.database_name, + &self.metrics, + self.max_lifespan, + self.max_idle, + ) + .await + .map_err(RecycleError::Backend) } } diff --git a/src/db/spanner/manager/mod.rs b/src/db/spanner/manager/mod.rs index 02d34ecfc7..b1ee09326a 100644 --- a/src/db/spanner/manager/mod.rs +++ b/src/db/spanner/manager/mod.rs @@ -1,4 +1,4 @@ -mod bb8; +// mod bb8; mod deadpool; mod session; diff --git a/src/db/spanner/manager/session.rs b/src/db/spanner/manager/session.rs index 4efb637146..26da2b6c8a 100644 --- a/src/db/spanner/manager/session.rs +++ b/src/db/spanner/manager/session.rs @@ -5,8 +5,12 @@ use googleapis_raw::spanner::v1::{ }; use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, MetadataBuilder}; use std::sync::Arc; +use std::time::SystemTime; -use crate::{db::error::DbError, server::metrics::Metrics}; +use crate::{ + db::error::{DbError, DbErrorKind}, + server::metrics::Metrics, +}; const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; @@ -63,7 +67,38 @@ pub async fn create_spanner_session( pub async fn recycle_spanner_session( conn: &mut SpannerSession, database_name: &str, + metrics: &Metrics, + max_lifetime: Option, + max_idle: Option, ) -> Result<(), DbError> { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + if let Some(max_life) = max_lifetime { + // get the current UTC seconds + if let Some(age) = conn.session.create_time.clone().into_option() { + let age = now - age.seconds as u64; + if age > max_life as u64 { + metrics.incr("db.connection.max_life"); + dbg!("### aging out", conn.session.get_name()); + return Err(DbErrorKind::Expired.into()); + } + } + } + // check how long that this has been idle... + if let Some(max_idle) = max_idle { + if let Some(idle) = conn.session.approximate_last_use_time.clone().into_option() { + // get current UTC seconds + let idle = std::cmp::max(0, now as i64 - idle.seconds); + if idle > max_idle as i64 { + metrics.incr("db.connection.max_idle"); + dbg!("### idling out", conn.session.get_name()); + return Err(DbErrorKind::Expired.into()); + } + } + } + let mut req = GetSessionRequest::new(); req.set_name(conn.session.get_name().to_owned()); if let Err(e) = conn.client.get_session_async(&req)?.await { diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index d24b876340..79ff9929b6 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -5,6 +5,7 @@ use std::{ fmt, ops::Deref, sync::Arc, + time::SystemTime, }; use futures::future::TryFutureExt; @@ -2060,6 +2061,26 @@ impl<'a> Db<'a> for SpannerDb { Box::pin(async move { db.get_collection_id_async(&name).map_err(Into::into).await }) } + fn get_connection_info(&self) -> results::ConnectionInfo { + let session = self.conn.session.clone(); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + results::ConnectionInfo { + age: session + .create_time + .into_option() + .map(|time| now - time.seconds) + .unwrap_or_default(), + idle: session + .approximate_last_use_time + .into_option() + .map(|time| now - time.seconds) + .unwrap_or_default(), + } + } + #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); diff --git a/src/db/transaction.rs b/src/db/transaction.rs index 095e854237..a294b279e5 100644 --- a/src/db/transaction.rs +++ b/src/db/transaction.rs @@ -1,3 +1,4 @@ +use crate::db::results::ConnectionInfo; use crate::db::{params, Db, DbPool}; use crate::error::{ApiError, ApiErrorKind}; use crate::server::metrics::Metrics; @@ -6,15 +7,18 @@ use crate::web::extractors::{ BsoParam, CollectionParam, HawkIdentifier, PreConditionHeader, PreConditionHeaderOpt, }; use crate::web::middleware::SyncServerRequest; +use crate::web::tags::Tags; use crate::web::X_LAST_MODIFIED; + use actix_http::http::{HeaderValue, Method, StatusCode}; -use actix_http::Error; +use actix_http::{Error, Extensions}; use actix_web::dev::{Payload, PayloadStream}; use actix_web::http::header; use actix_web::web::Data; use actix_web::{FromRequest, HttpRequest, HttpResponse}; use futures::future::LocalBoxFuture; use futures::FutureExt; +use std::cell::RefMut; use std::future::Future; #[derive(Clone)] @@ -27,6 +31,13 @@ pub struct DbTransactionPool { precondition: PreConditionHeaderOpt, } +fn set_extra(exts: &mut RefMut<'_, Extensions>, connection_info: ConnectionInfo) { + let mut tags = Tags::default(); + tags.add_extra("connection_age", &connection_info.age.to_string()); + tags.add_extra("connection_idle", &connection_info.idle.to_string()); + tags.commit(exts); +} + impl DbTransactionPool { /// Perform an action inside of a DB transaction. If the action fails, the /// transaction is rolled back. If the action succeeds, the transaction is @@ -34,6 +45,7 @@ impl DbTransactionPool { /// action has succeeded (ex. check HTTP response for internal error). async fn transaction_internal<'a, A: 'a, R, F>( &'a self, + request: HttpRequest, action: A, ) -> Result<(R, Box>), Error> where @@ -53,6 +65,8 @@ impl DbTransactionPool { // Handle lock error if let Err(e) = result { + // Update the extra info fields. + set_extra(&mut request.extensions_mut(), db.get_connection_info()); db.rollback().await?; return Err(e.into()); } @@ -75,12 +89,16 @@ impl DbTransactionPool { } /// Perform an action inside of a DB transaction. - pub async fn transaction<'a, A: 'a, R, F>(&'a self, action: A) -> Result + pub async fn transaction<'a, A: 'a, R, F>( + &'a self, + request: HttpRequest, + action: A, + ) -> Result where A: FnOnce(Box>) -> F, F: Future> + 'a, { - let (resp, db) = self.transaction_internal(action).await?; + let (resp, db) = self.transaction_internal(request, action).await?; // No further processing before commit is possible db.commit().await?; @@ -89,13 +107,20 @@ impl DbTransactionPool { /// Perform an action inside of a DB transaction. This method will rollback /// if the HTTP response is an error. - pub async fn transaction_http<'a, A: 'a, F>(&'a self, action: A) -> Result + pub async fn transaction_http<'a, A: 'a, F>( + &'a self, + request: HttpRequest, + action: A, + ) -> Result where A: FnOnce(Box>) -> F, F: Future> + 'a, { + let mreq = request.clone(); let check_precondition = move |db: Box>| { async move { + // set the extra information for all requests so we capture default err handlers. + set_extra(&mut mreq.extensions_mut(), db.get_connection_info()); let resource_ts = db .extract_resource( self.user_id.clone(), @@ -144,7 +169,10 @@ impl DbTransactionPool { } }; - let (resp, db) = self.transaction_internal(check_precondition).await?; + let (resp, db) = self + .transaction_internal(request.clone(), check_precondition) + .await?; + // match on error and return a composed HttpResponse (so we can use the tags?) // HttpResponse can contain an internal error match resp.error() { diff --git a/src/server/metrics.rs b/src/server/metrics.rs index c9de85f0e3..ce472cec1d 100644 --- a/src/server/metrics.rs +++ b/src/server/metrics.rs @@ -57,7 +57,7 @@ impl Drop for Metrics { impl From<&HttpRequest> for Metrics { fn from(req: &HttpRequest) -> Self { let exts = req.extensions(); - let def_tags = Tags::from_request_head(req.head()); + let def_tags = Tags::from(req.head()); let tags = exts.get::().unwrap_or(&def_tags); Metrics { client: match req.app_data::>() { @@ -109,7 +109,7 @@ impl Metrics { pub fn start_timer(&mut self, label: &str, tags: Option) { let mut mtags = self.tags.clone().unwrap_or_default(); if let Some(t) = tags { - mtags.extend(t.tags) + mtags.extend(t) } trace!("⌚ Starting timer... {:?}", &label; &mtags); @@ -138,7 +138,7 @@ impl Metrics { let mut tagged = client.count_with_tags(label, count); let mut mtags = self.tags.clone().unwrap_or_default(); if let Some(tags) = tags { - mtags.extend(tags.tags); + mtags.extend(tags); } for key in mtags.tags.keys().clone() { if let Some(val) = mtags.tags.get(key) { @@ -207,7 +207,7 @@ mod tests { ), ); - let tags = Tags::from_request_head(&rh); + let tags = Tags::from(&rh); let mut result = HashMap::::new(); result.insert("ua.os.ver".to_owned(), "NT 10.0".to_owned()); @@ -233,7 +233,7 @@ mod tests { header::HeaderValue::from_static("Mozilla/5.0 (curl) Gecko/20100101 curl"), ); - let tags = Tags::from_request_head(&rh); + let tags = Tags::from(&rh); assert!(!tags.tags.contains_key("ua.os.ver")); println!("{:?}", tags); } diff --git a/src/server/test.rs b/src/server/test.rs index 832b820dd9..e7a310e4df 100644 --- a/src/server/test.rs +++ b/src/server/test.rs @@ -250,7 +250,9 @@ async fn test_endpoint_with_body( .call(req) .await .expect("Could not get sresponse in test_endpoint_with_body"); + dbg!("got response", sresponse.response().status()); assert!(sresponse.response().status().is_success()); + dbg!("all good"); test::read_body(sresponse).await } diff --git a/src/settings.rs b/src/settings.rs index d14b36d27c..fef7119612 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -52,6 +52,10 @@ pub struct Settings { pub database_pool_min_idle: Option, /// Pool timeout when waiting for a slot to become available, in seconds pub database_pool_connection_timeout: Option, + /// Max age a given connection should live, in seconds + pub database_pool_connection_lifespan: Option, + /// Max time a connection should sit idle before being dropped. + pub database_pool_connection_max_idle: Option, #[cfg(test)] pub database_use_test_transactions: bool, @@ -88,6 +92,8 @@ impl Default for Settings { tokenserver_database_url: None, database_pool_max_size: None, database_pool_min_idle: None, + database_pool_connection_lifespan: None, + database_pool_connection_max_idle: None, database_pool_connection_timeout: Some(30), #[cfg(test)] database_use_test_transactions: false, @@ -119,6 +125,10 @@ impl Settings { s.set_default("human_logs", false)?; #[cfg(test)] s.set_default("database_pool_connection_timeout", Some(30))?; + // Max lifespan a connection should have. + s.set_default::>("database_connection_lifespan", None)?; + // Max time a connection should be idle before dropping. + s.set_default::>("database_connection_max_idle", None)?; s.set_default("database_use_test_transactions", false)?; s.set_default("master_secret", "")?; // Each backend does their own default process, so specifying a "universal" value @@ -356,5 +366,7 @@ pub fn test_settings() -> Settings { settings.port = 8000; settings.database_pool_max_size = Some(1); settings.database_use_test_transactions = true; + settings.database_pool_connection_max_idle = Some(300); + settings.database_pool_connection_lifespan = Some(300); settings } diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 8c2a08fe59..4b26f9b3e6 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -1,5 +1,6 @@ //! API Handlers use std::collections::HashMap; +use std::convert::Into; use actix_web::{ dev::HttpResponseBuilder, http::StatusCode, web::Data, Error, HttpRequest, HttpResponse, @@ -31,9 +32,10 @@ pub const ONE_KB: f64 = 1024.0; pub async fn get_collections( meta: MetaRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { meta.metrics.incr("request.get_collections"); let result = db.get_collection_timestamps(meta.user_id).await?; @@ -47,9 +49,10 @@ pub async fn get_collections( pub async fn get_collection_counts( meta: MetaRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { meta.metrics.incr("request.get_collection_counts"); let result = db.get_collection_counts(meta.user_id).await?; @@ -63,9 +66,10 @@ pub async fn get_collection_counts( pub async fn get_collection_usage( meta: MetaRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { meta.metrics.incr("request.get_collection_usage"); let usage: HashMap<_, _> = db .get_collection_usage(meta.user_id) @@ -84,9 +88,10 @@ pub async fn get_collection_usage( pub async fn get_quota( meta: MetaRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { meta.metrics.incr("request.get_quota"); let usage = db.get_storage_usage(meta.user_id).await?; Ok(HttpResponse::Ok().json(vec![Some(usage as f64 / ONE_KB), None])) @@ -97,9 +102,10 @@ pub async fn get_quota( pub async fn delete_all( meta: MetaRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { meta.metrics.incr("request.delete_all"); Ok(HttpResponse::Ok().json(db.delete_storage(meta.user_id).await?)) }) @@ -109,9 +115,10 @@ pub async fn delete_all( pub async fn delete_collection( coll: CollectionRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { let delete_bsos = !coll.query.ids.is_empty(); let metrics = coll.metrics.clone(); let timestamp: ApiResult = if delete_bsos { @@ -149,14 +156,16 @@ pub async fn delete_collection( Ok(resp.json(timestamp)) }) .await + .map_err(Into::into) } pub async fn get_collection( coll: CollectionRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { coll.metrics.clone().incr("request.get_collection"); let params = params::GetBsos { user_id: coll.user_id.clone(), @@ -229,9 +238,10 @@ where pub async fn post_collection( coll: CollectionPostRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { coll.metrics.clone().incr("request.post_collection"); trace!("Collection: Post"); @@ -451,9 +461,10 @@ pub async fn post_collection_batch( pub async fn delete_bso( bso_req: BsoRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { bso_req.metrics.incr("request.delete_bso"); let result = db .delete_bso(params::DeleteBso { @@ -470,9 +481,10 @@ pub async fn delete_bso( pub async fn get_bso( bso_req: BsoRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { bso_req.metrics.incr("request.get_bso"); let result = db .get_bso(params::GetBso { @@ -493,9 +505,10 @@ pub async fn get_bso( pub async fn put_bso( bso_req: BsoPutRequest, db_pool: DbTransactionPool, + request: HttpRequest, ) -> Result { db_pool - .transaction_http(|db| async move { + .transaction_http(request, |db| async move { bso_req.metrics.incr("request.put_bso"); let result = db .put_bso(params::PutBso { diff --git a/src/web/middleware/sentry.rs b/src/web/middleware/sentry.rs index 884e97d942..291a1f64ba 100644 --- a/src/web/middleware/sentry.rs +++ b/src/web/middleware/sentry.rs @@ -99,7 +99,7 @@ where } fn call(&mut self, sreq: ServiceRequest) -> Self::Future { - let mut tags = Tags::from_request_head(sreq.head()); + let mut tags = Tags::from(sreq.head()); sreq.extensions_mut().insert(tags.clone()); Box::pin(self.service.call(sreq).and_then(move |mut sresp| { @@ -111,13 +111,20 @@ where for (k, v) in t.tags.clone() { tags.tags.insert(k, v); } + for (k, v) in t.extra.clone() { + tags.extra.insert(k, v); + } }; if let Some(t) = sresp.response().extensions().get::() { trace!("Sentry: found tags in response: {:?}", &t.tags); for (k, v) in t.tags.clone() { tags.tags.insert(k, v); } + for (k, v) in t.extra.clone() { + tags.extra.insert(k, v); + } }; + //dbg!(&tags); match sresp.response().error() { None => { // Middleware errors are eaten by current versions of Actix. Errors are now added diff --git a/src/web/tags.rs b/src/web/tags.rs index 03ffe312c6..02fc77b687 100644 --- a/src/web/tags.rs +++ b/src/web/tags.rs @@ -1,5 +1,7 @@ +use core::cell::RefMut; use std::collections::{BTreeMap, HashMap}; +use actix_http::Extensions; use actix_web::{ dev::{Payload, RequestHead}, http::header::USER_AGENT, @@ -52,37 +54,19 @@ fn insert_if_not_empty(label: &str, val: &str, tags: &mut HashMap(); -// tags.insert("SomeLabel".to_owned(), "whatever".to_owned()); -// ``` -// how you get the request (or the response, and it's set of `extensions`) to whatever -// function requires it, is left as an exercise for the reader. +/// Tags are extra data to be recorded in metric and logging calls. +/// +/// If additional tags are required or desired, you will need to add them to the +/// mutable extensions, e.g. +/// ```compile_fail +/// let mut tags = Tags::default(); +/// tags.add_tag("SomeLabel", "whatever"); +/// tags.commit(&mut request.extensions_mut()); +/// ``` impl Tags { - pub fn from_request_head(req_head: &RequestHead) -> Tags { - // Return an Option<> type because the later consumers (ApiErrors) presume that - // tags are optional and wrapped by an Option<> type. - let mut tags = HashMap::new(); - let mut extra = HashMap::new(); - if let Some(ua) = req_head.headers().get(USER_AGENT) { - if let Ok(uas) = ua.to_str() { - let (ua_result, metrics_os, metrics_browser) = parse_user_agent(uas); - insert_if_not_empty("ua.os.family", metrics_os, &mut tags); - insert_if_not_empty("ua.browser.family", metrics_browser, &mut tags); - insert_if_not_empty("ua.name", ua_result.name, &mut tags); - insert_if_not_empty("ua.os.ver", &ua_result.os_version.to_owned(), &mut tags); - insert_if_not_empty("ua.browser.ver", ua_result.version, &mut tags); - extra.insert("ua".to_owned(), uas.to_string()); - } - } - tags.insert("uri.method".to_owned(), req_head.method.to_string()); - // `uri.path` causes too much cardinality for influx but keep it in - // extra for sentry - extra.insert("uri.path".to_owned(), req_head.uri.to_string()); - Tags { tags, extra } + pub fn extend(&mut self, new_tags: Self) { + self.tags.extend(new_tags.tags); + self.extra.extend(new_tags.extra); } pub fn with_tags(tags: HashMap) -> Tags { @@ -95,15 +79,23 @@ impl Tags { } } + pub fn add_extra(&mut self, key: &str, value: &str) { + if !value.is_empty() { + self.extra.insert(key.to_owned(), value.to_owned()); + } + } + + pub fn add_tag(&mut self, key: &str, value: &str) { + if !value.is_empty() { + self.tags.insert(key.to_owned(), value.to_owned()); + } + } + pub fn get(&self, label: &str) -> String { let none = "None".to_owned(); self.tags.get(label).map(String::from).unwrap_or(none) } - pub fn extend(&mut self, tags: HashMap) { - self.tags.extend(tags); - } - pub fn tag_tree(self) -> BTreeMap { let mut result = BTreeMap::new(); @@ -121,6 +113,38 @@ impl Tags { } result } + + pub fn commit(self, exts: &mut RefMut<'_, Extensions>) { + match exts.get_mut::() { + Some(t) => t.extend(self), + None => exts.insert(self), + } + } +} + +impl From<&RequestHead> for Tags { + fn from(req_head: &RequestHead) -> Self { + // Return an Option<> type because the later consumers (ApiErrors) presume that + // tags are optional and wrapped by an Option<> type. + let mut tags = HashMap::new(); + let mut extra = HashMap::new(); + if let Some(ua) = req_head.headers().get(USER_AGENT) { + if let Ok(uas) = ua.to_str() { + let (ua_result, metrics_os, metrics_browser) = parse_user_agent(uas); + insert_if_not_empty("ua.os.family", metrics_os, &mut tags); + insert_if_not_empty("ua.browser.family", metrics_browser, &mut tags); + insert_if_not_empty("ua.name", ua_result.name, &mut tags); + insert_if_not_empty("ua.os.ver", &ua_result.os_version.to_owned(), &mut tags); + insert_if_not_empty("ua.browser.ver", ua_result.version, &mut tags); + extra.insert("ua".to_owned(), uas.to_string()); + } + } + tags.insert("uri.method".to_owned(), req_head.method.to_string()); + // `uri.path` causes too much cardinality for influx but keep it in + // extra for sentry + extra.insert("uri.path".to_owned(), req_head.uri.to_string()); + Tags { tags, extra } + } } impl FromRequest for Tags { @@ -133,7 +157,7 @@ impl FromRequest for Tags { let exts = req.extensions(); match exts.get::() { Some(t) => t.clone(), - None => Tags::from_request_head(req.head()), + None => Tags::from(req.head()), } };