diff --git a/Cargo.lock b/Cargo.lock index be772b62f0..900ce4fa1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3263,7 +3263,6 @@ dependencies = [ "actix-web", "serde 1.0.135", "serde_json", - "slog-scope", ] [[package]] diff --git a/syncstorage/src/db/mod.rs b/syncstorage/src/db/mod.rs index abe96fa23e..6263fff10d 100644 --- a/syncstorage/src/db/mod.rs +++ b/syncstorage/src/db/mod.rs @@ -7,15 +7,15 @@ pub mod spanner; mod tests; pub mod transaction; -pub(crate) use mysql::models::blocking_error_to_db_error; - use std::time::Duration; use cadence::{Gauged, StatsdClient}; +use futures::TryFutureExt; use syncstorage_db_common::{ error::{DbError, DbErrorKind}, results, DbPool, }; +use tokio::{self, task, time}; use url::Url; use crate::server::metrics::Metrics; @@ -45,7 +45,7 @@ pub fn spawn_pool_periodic_reporter( .expect("Couldn't get hostname") .into_string() .expect("Couldn't get hostname"); - actix_rt::spawn(async move { + tokio::spawn(async move { loop { let results::PoolState { connections, @@ -62,8 +62,26 @@ pub fn spawn_pool_periodic_reporter( .gauge_with_tags("storage.pool.connections.idle", idle_connections as u64) .with_tag("hostname", &hostname) .send(); - actix_rt::time::delay_for(interval).await; + time::delay_for(interval).await; } }); Ok(()) } + +pub async fn run_on_blocking_threadpool(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + task::spawn_blocking(f) + .map_err(|err| { + if err.is_cancelled() { + DbError::internal("Db threadpool operation cancelled") + } else if err.is_panic() { + DbError::internal("Db threadpool operation panicked") + } else { + DbError::internal("Db threadpool operation failed for unknown reason") + } + }) + .await? +} diff --git a/syncstorage/src/db/mysql/models.rs b/syncstorage/src/db/mysql/models.rs index 6279624b83..e842006e86 100644 --- a/syncstorage/src/db/mysql/models.rs +++ b/syncstorage/src/db/mysql/models.rs @@ -1,5 +1,3 @@ -use actix_web::web::block; - use futures::future::TryFutureExt; use std::{self, cell::RefCell, collections::HashMap, fmt, ops::Deref, sync::Arc}; @@ -30,6 +28,7 @@ use super::{ pool::CollectionCache, schema::{bso, collections, user_collections}, }; +use crate::db; use crate::server::metrics::Metrics; use crate::settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS}; use crate::web::tags::Tags; @@ -73,7 +72,7 @@ struct MysqlDbSession { #[derive(Clone, Debug)] pub struct MysqlDb { - /// Synchronous Diesel calls are executed in actix_web::web::block to satisfy + /// Synchronous Diesel calls are executed in tokio::task::spawn_blocking to satisfy /// the Db trait's asynchronous interface. /// /// Arc provides a Clone impl utilized for safely moving to @@ -978,7 +977,6 @@ impl MysqlDb { self.session.borrow().timestamp } } - #[macro_export] macro_rules! sync_db_method { ($name:ident, $sync_name:ident, $type:ident) => { @@ -987,36 +985,22 @@ macro_rules! sync_db_method { ($name:ident, $sync_name:ident, $type:ident, $result:ty) => { fn $name(&self, params: params::$type) -> DbFuture<'_, $result> { let db = self.clone(); - Box::pin( - block(move || db.$sync_name(params)).map_err(|err| match err { - actix_web::error::BlockingError::Error(e) => e, - actix_web::error::BlockingError::Canceled => { - DbError::internal("Db threadpool operation canceled") - } - }), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.$sync_name(params) + })) } }; } -pub fn blocking_error_to_db_error(err: actix_web::error::BlockingError) -> DbError { - match err { - actix_web::error::BlockingError::Error(e) => e, - actix_web::error::BlockingError::Canceled => { - DbError::internal("Db threadpool operation canceled") - } - } -} - impl<'a> Db<'a> for MysqlDb { fn commit(&self) -> DbFuture<'_, ()> { let db = self.clone(); - Box::pin(block(move || db.commit_sync()).map_err(blocking_error_to_db_error)) + Box::pin(db::run_on_blocking_threadpool(move || db.commit_sync())) } fn rollback(&self) -> DbFuture<'_, ()> { let db = self.clone(); - Box::pin(block(move || db.rollback_sync()).map_err(blocking_error_to_db_error)) + Box::pin(db::run_on_blocking_threadpool(move || db.rollback_sync())) } fn begin(&self, for_write: bool) -> DbFuture<'_, ()> { @@ -1030,7 +1014,7 @@ impl<'a> Db<'a> for MysqlDb { fn check(&self) -> DbFuture<'_, results::Check> { let db = self.clone(); - Box::pin(block(move || db.check_sync()).map_err(blocking_error_to_db_error)) + Box::pin(db::run_on_blocking_threadpool(move || db.check_sync())) } sync_db_method!(lock_for_read, lock_for_read_sync, LockCollection); @@ -1090,7 +1074,9 @@ impl<'a> Db<'a> for MysqlDb { fn get_collection_id(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); - Box::pin(block(move || db.get_collection_id(&name)).map_err(blocking_error_to_db_error)) + Box::pin(db::run_on_blocking_threadpool(move || { + db.get_collection_id(&name) + })) } fn get_connection_info(&self) -> results::ConnectionInfo { @@ -1099,20 +1085,16 @@ impl<'a> Db<'a> for MysqlDb { fn create_collection(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); - Box::pin( - block(move || db.get_or_create_collection_id(&name)) - .map_err(blocking_error_to_db_error), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.get_or_create_collection_id(&name) + })) } fn update_collection(&self, param: params::UpdateCollection) -> DbFuture<'_, SyncTimestamp> { let db = self.clone(); - Box::pin( - block(move || { - db.update_collection(param.user_id.legacy_id as u32, param.collection_id) - }) - .map_err(blocking_error_to_db_error), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.update_collection(param.user_id.legacy_id as u32, param.collection_id) + })) } fn timestamp(&self) -> SyncTimestamp { @@ -1127,13 +1109,10 @@ impl<'a> Db<'a> for MysqlDb { fn clear_coll_cache(&self) -> DbFuture<'_, ()> { let db = self.clone(); - Box::pin( - block(move || { - db.coll_cache.clear(); - Ok(()) - }) - .map_err(blocking_error_to_db_error), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.coll_cache.clear(); + Ok(()) + })) } fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) { diff --git a/syncstorage/src/db/mysql/pool.rs b/syncstorage/src/db/mysql/pool.rs index 0f11d75cc0..d0f028b64e 100644 --- a/syncstorage/src/db/mysql/pool.rs +++ b/syncstorage/src/db/mysql/pool.rs @@ -1,5 +1,3 @@ -use actix_web::web::block; - use async_trait::async_trait; use std::{ @@ -18,9 +16,10 @@ use diesel::{ use diesel_logger::LoggingConnection; use syncstorage_db_common::{error::DbError, results, Db, DbPool, STD_COLLS}; -use super::models::{self, MysqlDb, Result}; +use super::models::{MysqlDb, Result}; #[cfg(test)] use super::test::TestTransactionCustomizer; +use crate::db; use crate::server::metrics::Metrics; use crate::settings::{Quota, Settings}; @@ -104,9 +103,7 @@ impl MysqlDbPool { impl DbPool for MysqlDbPool { async fn get<'a>(&'a self) -> Result>> { let pool = self.clone(); - let db = block(move || pool.get_sync()) - .await - .map_err(models::blocking_error_to_db_error)?; + let db = db::run_on_blocking_threadpool(move || pool.get_sync()).await?; Ok(Box::new(db) as Box>) } diff --git a/syncstorage/src/db/spanner/manager/session.rs b/syncstorage/src/db/spanner/manager/session.rs index 59ce5adc11..b7767d55e1 100644 --- a/syncstorage/src/db/spanner/manager/session.rs +++ b/syncstorage/src/db/spanner/manager/session.rs @@ -1,4 +1,3 @@ -use actix_web::web::block; use google_cloud_rust_raw::spanner::v1::{ spanner::{CreateSessionRequest, GetSessionRequest, Session}, spanner_grpc::SpannerClient, @@ -7,7 +6,7 @@ use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, Metada use std::sync::Arc; use syncstorage_db_common::error::{DbError, DbErrorKind}; -use crate::db::spanner::now; +use crate::db::{self, spanner::now}; use crate::server::metrics::Metrics; const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; @@ -40,7 +39,7 @@ pub async fn create_spanner_session( emulator_host: Option, ) -> Result { let using_spanner_emulator = emulator_host.is_some(); - let chan = block(move || -> Result { + let chan = db::run_on_blocking_threadpool(move || -> Result { if let Some(spanner_emulator_address) = emulator_host { Ok(ChannelBuilder::new(env) .max_send_message_len(100 << 20) @@ -60,13 +59,7 @@ pub async fn create_spanner_session( .secure_connect(SPANNER_ADDRESS, creds)) } }) - .await - .map_err(|e| match e { - actix_web::error::BlockingError::Error(e) => e.into(), - actix_web::error::BlockingError::Canceled => { - DbError::internal("web::block Manager operation canceled") - } - })?; + .await?; let client = SpannerClient::new(chan); // Connect to the instance and create a Spanner session. diff --git a/syncstorage/src/error.rs b/syncstorage/src/error.rs index 10edf7837d..97557e11e3 100644 --- a/syncstorage/src/error.rs +++ b/syncstorage/src/error.rs @@ -150,16 +150,6 @@ impl ApiError { } } -impl From> for ApiError { - fn from(inner: actix_web::error::BlockingError) -> Self { - match inner { - actix_web::error::BlockingError::Error(e) => e, - actix_web::error::BlockingError::Canceled => { - ApiErrorKind::Internal("Db threadpool operation canceled".to_owned()).into() - } - } - } -} impl Error for ApiError { fn source(&self) -> Option<&(dyn Error + 'static)> { self.kind.source() diff --git a/syncstorage/src/tokenserver/auth/oauth.rs b/syncstorage/src/tokenserver/auth/oauth.rs index 2ad2242fa8..52d850990d 100644 --- a/syncstorage/src/tokenserver/auth/oauth.rs +++ b/syncstorage/src/tokenserver/auth/oauth.rs @@ -1,5 +1,6 @@ -use actix_web::{web, Error}; +use actix_web::Error; use async_trait::async_trait; +use futures::TryFutureExt; use pyo3::{ prelude::{Py, PyAny, PyErr, PyModule, Python}, types::{IntoPyDict, PyString}, @@ -7,7 +8,7 @@ use pyo3::{ use serde::{Deserialize, Serialize}; use serde_json; use tokenserver_common::error::TokenserverError; -use tokio::time; +use tokio::{task, time}; use super::VerifyToken; use crate::tokenserver::settings::Settings; @@ -73,7 +74,7 @@ impl VerifyToken for RemoteVerifier { async fn verify(&self, token: String) -> Result { let verifier = self.clone(); - let fut = web::block(move || { + let fut = task::spawn_blocking(move || { let maybe_verify_output_string = Python::with_gil(|py| { let client = verifier.inner.as_ref(py); // `client.verify_token(token)` @@ -101,6 +102,17 @@ impl VerifyToken for RemoteVerifier { } None => Err(TokenserverError::invalid_credentials("Unauthorized")), } + }) + .map_err(|err| { + if err.is_cancelled() { + error!("Tokenserver threadpool operation cancelled"); + } else if err.is_panic() { + error!("Tokenserver threadpool operation panicked"); + } else { + error!("Tokenserver threadpool operation failed for unknown reason"); + } + + TokenserverError::internal_error() }); // The PyFxA OAuth client does not offer a way to set a request timeout, so we set one here @@ -109,6 +121,6 @@ impl VerifyToken for RemoteVerifier { time::timeout(Duration::from_secs(self.timeout), fut) .await .map_err(|_| TokenserverError::resource_unavailable())? - .map_err(Into::into) + .map_err(Into::into)? } } diff --git a/syncstorage/src/tokenserver/db/models.rs b/syncstorage/src/tokenserver/db/models.rs index 698468b3fd..c495aa8280 100644 --- a/syncstorage/src/tokenserver/db/models.rs +++ b/syncstorage/src/tokenserver/db/models.rs @@ -1,4 +1,4 @@ -use actix_web::{http::StatusCode, web::block}; +use actix_web::http::StatusCode; use diesel::{ mysql::MysqlConnection, r2d2::{ConnectionManager, PooledConnection}, @@ -8,7 +8,6 @@ use diesel::{ #[cfg(test)] use diesel_logger::LoggingConnection; use futures::future::LocalBoxFuture; -use futures::TryFutureExt; use syncstorage_db_common::error::DbError; use std::{ @@ -618,7 +617,7 @@ impl Db for TokenserverDb { fn check(&self) -> DbFuture<'_, results::Check> { let db = self.clone(); - Box::pin(block(move || db.check_sync()).map_err(db::blocking_error_to_db_error)) + Box::pin(db::run_on_blocking_threadpool(move || db.check_sync())) } #[cfg(test)] diff --git a/syncstorage/src/tokenserver/db/pool.rs b/syncstorage/src/tokenserver/db/pool.rs index 3dcef04465..9088d7514e 100644 --- a/syncstorage/src/tokenserver/db/pool.rs +++ b/syncstorage/src/tokenserver/db/pool.rs @@ -1,4 +1,3 @@ -use actix_web::web::block; use async_trait::async_trait; use diesel::{ mysql::MysqlConnection, @@ -80,9 +79,8 @@ impl TokenserverPool { #[cfg(test)] pub async fn get_tokenserver_db(&self) -> Result { let pool = self.clone(); - let conn = block(move || pool.inner.get().map_err(DbError::from)) - .await - .map_err(db::blocking_error_to_db_error)?; + let conn = + db::run_on_blocking_threadpool(move || pool.inner.get().map_err(DbError::from)).await?; Ok(TokenserverDb::new(conn, &self.metrics)) } @@ -95,9 +93,8 @@ impl DbPool for TokenserverPool { metrics.start_timer("tokenserver.storage.get_pool", None); let pool = self.clone(); - let conn = block(move || pool.inner.get().map_err(DbError::from)) - .await - .map_err(db::blocking_error_to_db_error)?; + let conn = + db::run_on_blocking_threadpool(move || pool.inner.get().map_err(DbError::from)).await?; Ok(Box::new(TokenserverDb::new(conn, &self.metrics)) as Box) } diff --git a/tokenserver-common/Cargo.toml b/tokenserver-common/Cargo.toml index 6b07a8ed0f..c27a787adb 100644 --- a/tokenserver-common/Cargo.toml +++ b/tokenserver-common/Cargo.toml @@ -7,4 +7,3 @@ edition = "2021" actix-web = "3" serde = "1.0" serde_json = { version = "1.0", features = ["arbitrary_precision"] } -slog-scope = "4.3" diff --git a/tokenserver-common/src/error.rs b/tokenserver-common/src/error.rs index 9babde5c0f..1cc50a45ff 100644 --- a/tokenserver-common/src/error.rs +++ b/tokenserver-common/src/error.rs @@ -1,10 +1,6 @@ use std::fmt; -use actix_web::{ - error::{BlockingError, ResponseError}, - http::StatusCode, - HttpResponse, -}; +use actix_web::{http::StatusCode, HttpResponse, ResponseError}; use serde::{ ser::{SerializeMap, Serializer}, Serialize, @@ -112,18 +108,6 @@ impl TokenserverError { } } -impl From> for TokenserverError { - fn from(inner: BlockingError) -> Self { - match inner { - BlockingError::Error(e) => e, - BlockingError::Canceled => { - error!("Tokenserver threadpool operation canceled"); - TokenserverError::internal_error() - } - } - } -} - #[derive(Clone, Copy, Debug, PartialEq)] pub enum ErrorLocation { Header, diff --git a/tokenserver-common/src/lib.rs b/tokenserver-common/src/lib.rs index 71cd1a7103..a91e735174 100644 --- a/tokenserver-common/src/lib.rs +++ b/tokenserver-common/src/lib.rs @@ -1,4 +1 @@ pub mod error; - -#[macro_use] -extern crate slog_scope;