Skip to content

Commit

Permalink
remove actix dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan Donowitz committed Apr 15, 2022
1 parent 1f6bf5c commit 59dc0e6
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 108 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 22 additions & 4 deletions syncstorage/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ pub mod spanner;
mod tests;
pub mod transaction;

pub(crate) use mysql::models::blocking_error_to_db_error;

use std::time::Duration;

use cadence::{Gauged, StatsdClient};
use futures::TryFutureExt;
use syncstorage_db_common::{
error::{DbError, DbErrorKind},
results, DbPool,
};
use tokio::{self, task, time};
use url::Url;

use crate::server::metrics::Metrics;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn spawn_pool_periodic_reporter(
.expect("Couldn't get hostname")
.into_string()
.expect("Couldn't get hostname");
actix_rt::spawn(async move {
tokio::spawn(async move {
loop {
let results::PoolState {
connections,
Expand All @@ -62,8 +62,26 @@ pub fn spawn_pool_periodic_reporter(
.gauge_with_tags("storage.pool.connections.idle", idle_connections as u64)
.with_tag("hostname", &hostname)
.send();
actix_rt::time::delay_for(interval).await;
time::delay_for(interval).await;
}
});
Ok(())
}

pub async fn run_on_blocking_threadpool<F, T>(f: F) -> Result<T, DbError>
where
F: FnOnce() -> Result<T, DbError> + Send + 'static,
T: Send + 'static,
{
task::spawn_blocking(f)
.map_err(|err| {
if err.is_cancelled() {
DbError::internal("Db threadpool operation cancelled")
} else if err.is_panic() {
DbError::internal("Db threadpool operation panicked")
} else {
DbError::internal("Db threadpool operation failed for unknown reason")
}
})
.await?
}
63 changes: 21 additions & 42 deletions syncstorage/src/db/mysql/models.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use actix_web::web::block;

use futures::future::TryFutureExt;

use std::{self, cell::RefCell, collections::HashMap, fmt, ops::Deref, sync::Arc};
Expand Down Expand Up @@ -30,6 +28,7 @@ use super::{
pool::CollectionCache,
schema::{bso, collections, user_collections},
};
use crate::db;
use crate::server::metrics::Metrics;
use crate::settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS};
use crate::web::tags::Tags;
Expand Down Expand Up @@ -73,7 +72,7 @@ struct MysqlDbSession {

#[derive(Clone, Debug)]
pub struct MysqlDb {
/// Synchronous Diesel calls are executed in actix_web::web::block to satisfy
/// Synchronous Diesel calls are executed in tokio::task::spawn_blocking to satisfy
/// the Db trait's asynchronous interface.
///
/// Arc<MysqlDbInner> provides a Clone impl utilized for safely moving to
Expand Down Expand Up @@ -978,7 +977,6 @@ impl MysqlDb {
self.session.borrow().timestamp
}
}

#[macro_export]
macro_rules! sync_db_method {
($name:ident, $sync_name:ident, $type:ident) => {
Expand All @@ -987,36 +985,22 @@ macro_rules! sync_db_method {
($name:ident, $sync_name:ident, $type:ident, $result:ty) => {
fn $name(&self, params: params::$type) -> DbFuture<'_, $result> {
let db = self.clone();
Box::pin(
block(move || db.$sync_name(params)).map_err(|err| match err {
actix_web::error::BlockingError::Error(e) => e,
actix_web::error::BlockingError::Canceled => {
DbError::internal("Db threadpool operation canceled")
}
}),
)
Box::pin(db::run_on_blocking_threadpool(move || {
db.$sync_name(params)
}))
}
};
}

pub fn blocking_error_to_db_error(err: actix_web::error::BlockingError<DbError>) -> DbError {
match err {
actix_web::error::BlockingError::Error(e) => e,
actix_web::error::BlockingError::Canceled => {
DbError::internal("Db threadpool operation canceled")
}
}
}

impl<'a> Db<'a> for MysqlDb {
fn commit(&self) -> DbFuture<'_, ()> {
let db = self.clone();
Box::pin(block(move || db.commit_sync()).map_err(blocking_error_to_db_error))
Box::pin(db::run_on_blocking_threadpool(move || db.commit_sync()))
}

fn rollback(&self) -> DbFuture<'_, ()> {
let db = self.clone();
Box::pin(block(move || db.rollback_sync()).map_err(blocking_error_to_db_error))
Box::pin(db::run_on_blocking_threadpool(move || db.rollback_sync()))
}

fn begin(&self, for_write: bool) -> DbFuture<'_, ()> {
Expand All @@ -1030,7 +1014,7 @@ impl<'a> Db<'a> for MysqlDb {

fn check(&self) -> DbFuture<'_, results::Check> {
let db = self.clone();
Box::pin(block(move || db.check_sync()).map_err(blocking_error_to_db_error))
Box::pin(db::run_on_blocking_threadpool(move || db.check_sync()))
}

sync_db_method!(lock_for_read, lock_for_read_sync, LockCollection);
Expand Down Expand Up @@ -1090,7 +1074,9 @@ impl<'a> Db<'a> for MysqlDb {

fn get_collection_id(&self, name: String) -> DbFuture<'_, i32> {
let db = self.clone();
Box::pin(block(move || db.get_collection_id(&name)).map_err(blocking_error_to_db_error))
Box::pin(db::run_on_blocking_threadpool(move || {
db.get_collection_id(&name)
}))
}

fn get_connection_info(&self) -> results::ConnectionInfo {
Expand All @@ -1099,20 +1085,16 @@ impl<'a> Db<'a> for MysqlDb {

fn create_collection(&self, name: String) -> DbFuture<'_, i32> {
let db = self.clone();
Box::pin(
block(move || db.get_or_create_collection_id(&name))
.map_err(blocking_error_to_db_error),
)
Box::pin(db::run_on_blocking_threadpool(move || {
db.get_or_create_collection_id(&name)
}))
}

fn update_collection(&self, param: params::UpdateCollection) -> DbFuture<'_, SyncTimestamp> {
let db = self.clone();
Box::pin(
block(move || {
db.update_collection(param.user_id.legacy_id as u32, param.collection_id)
})
.map_err(blocking_error_to_db_error),
)
Box::pin(db::run_on_blocking_threadpool(move || {
db.update_collection(param.user_id.legacy_id as u32, param.collection_id)
}))
}

fn timestamp(&self) -> SyncTimestamp {
Expand All @@ -1127,13 +1109,10 @@ impl<'a> Db<'a> for MysqlDb {

fn clear_coll_cache(&self) -> DbFuture<'_, ()> {
let db = self.clone();
Box::pin(
block(move || {
db.coll_cache.clear();
Ok(())
})
.map_err(blocking_error_to_db_error),
)
Box::pin(db::run_on_blocking_threadpool(move || {
db.coll_cache.clear();
Ok(())
}))
}

fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) {
Expand Down
9 changes: 3 additions & 6 deletions syncstorage/src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use actix_web::web::block;

use async_trait::async_trait;

use std::{
Expand All @@ -18,9 +16,10 @@ use diesel::{
use diesel_logger::LoggingConnection;
use syncstorage_db_common::{error::DbError, results, Db, DbPool, STD_COLLS};

use super::models::{self, MysqlDb, Result};
use super::models::{MysqlDb, Result};
#[cfg(test)]
use super::test::TestTransactionCustomizer;
use crate::db;
use crate::server::metrics::Metrics;
use crate::settings::{Quota, Settings};

Expand Down Expand Up @@ -104,9 +103,7 @@ impl MysqlDbPool {
impl DbPool for MysqlDbPool {
async fn get<'a>(&'a self) -> Result<Box<dyn Db<'a>>> {
let pool = self.clone();
let db = block(move || pool.get_sync())
.await
.map_err(models::blocking_error_to_db_error)?;
let db = db::run_on_blocking_threadpool(move || pool.get_sync()).await?;

Ok(Box::new(db) as Box<dyn Db<'a>>)
}
Expand Down
13 changes: 3 additions & 10 deletions syncstorage/src/db/spanner/manager/session.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use actix_web::web::block;
use google_cloud_rust_raw::spanner::v1::{
spanner::{CreateSessionRequest, GetSessionRequest, Session},
spanner_grpc::SpannerClient,
Expand All @@ -7,7 +6,7 @@ use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, Metada
use std::sync::Arc;
use syncstorage_db_common::error::{DbError, DbErrorKind};

use crate::db::spanner::now;
use crate::db::{self, spanner::now};
use crate::server::metrics::Metrics;

const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
Expand Down Expand Up @@ -40,7 +39,7 @@ pub async fn create_spanner_session(
emulator_host: Option<String>,
) -> Result<SpannerSession, DbError> {
let using_spanner_emulator = emulator_host.is_some();
let chan = block(move || -> Result<grpcio::Channel, grpcio::Error> {
let chan = db::run_on_blocking_threadpool(move || -> Result<grpcio::Channel, DbError> {
if let Some(spanner_emulator_address) = emulator_host {
Ok(ChannelBuilder::new(env)
.max_send_message_len(100 << 20)
Expand All @@ -60,13 +59,7 @@ pub async fn create_spanner_session(
.secure_connect(SPANNER_ADDRESS, creds))
}
})
.await
.map_err(|e| match e {
actix_web::error::BlockingError::Error(e) => e.into(),
actix_web::error::BlockingError::Canceled => {
DbError::internal("web::block Manager operation canceled")
}
})?;
.await?;
let client = SpannerClient::new(chan);

// Connect to the instance and create a Spanner session.
Expand Down
10 changes: 0 additions & 10 deletions syncstorage/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,6 @@ impl ApiError {
}
}

impl From<actix_web::error::BlockingError<ApiError>> for ApiError {
fn from(inner: actix_web::error::BlockingError<ApiError>) -> Self {
match inner {
actix_web::error::BlockingError::Error(e) => e,
actix_web::error::BlockingError::Canceled => {
ApiErrorKind::Internal("Db threadpool operation canceled".to_owned()).into()
}
}
}
}
impl Error for ApiError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.kind.source()
Expand Down
20 changes: 16 additions & 4 deletions syncstorage/src/tokenserver/auth/oauth.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use actix_web::{web, Error};
use actix_web::Error;
use async_trait::async_trait;
use futures::TryFutureExt;
use pyo3::{
prelude::{Py, PyAny, PyErr, PyModule, Python},
types::{IntoPyDict, PyString},
};
use serde::{Deserialize, Serialize};
use serde_json;
use tokenserver_common::error::TokenserverError;
use tokio::time;
use tokio::{task, time};

use super::VerifyToken;
use crate::tokenserver::settings::Settings;
Expand Down Expand Up @@ -73,7 +74,7 @@ impl VerifyToken for RemoteVerifier {
async fn verify(&self, token: String) -> Result<VerifyOutput, TokenserverError> {
let verifier = self.clone();

let fut = web::block(move || {
let fut = task::spawn_blocking(move || {
let maybe_verify_output_string = Python::with_gil(|py| {
let client = verifier.inner.as_ref(py);
// `client.verify_token(token)`
Expand Down Expand Up @@ -101,6 +102,17 @@ impl VerifyToken for RemoteVerifier {
}
None => Err(TokenserverError::invalid_credentials("Unauthorized")),
}
})
.map_err(|err| {
if err.is_cancelled() {
error!("Tokenserver threadpool operation cancelled");
} else if err.is_panic() {
error!("Tokenserver threadpool operation panicked");
} else {
error!("Tokenserver threadpool operation failed for unknown reason");
}

TokenserverError::internal_error()
});

// The PyFxA OAuth client does not offer a way to set a request timeout, so we set one here
Expand All @@ -109,6 +121,6 @@ impl VerifyToken for RemoteVerifier {
time::timeout(Duration::from_secs(self.timeout), fut)
.await
.map_err(|_| TokenserverError::resource_unavailable())?
.map_err(Into::into)
.map_err(Into::into)?
}
}
5 changes: 2 additions & 3 deletions syncstorage/src/tokenserver/db/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use actix_web::{http::StatusCode, web::block};
use actix_web::http::StatusCode;
use diesel::{
mysql::MysqlConnection,
r2d2::{ConnectionManager, PooledConnection},
Expand All @@ -8,7 +8,6 @@ use diesel::{
#[cfg(test)]
use diesel_logger::LoggingConnection;
use futures::future::LocalBoxFuture;
use futures::TryFutureExt;
use syncstorage_db_common::error::DbError;

use std::{
Expand Down Expand Up @@ -618,7 +617,7 @@ impl Db for TokenserverDb {

fn check(&self) -> DbFuture<'_, results::Check> {
let db = self.clone();
Box::pin(block(move || db.check_sync()).map_err(db::blocking_error_to_db_error))
Box::pin(db::run_on_blocking_threadpool(move || db.check_sync()))
}

#[cfg(test)]
Expand Down
11 changes: 4 additions & 7 deletions syncstorage/src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use actix_web::web::block;
use async_trait::async_trait;
use diesel::{
mysql::MysqlConnection,
Expand Down Expand Up @@ -80,9 +79,8 @@ impl TokenserverPool {
#[cfg(test)]
pub async fn get_tokenserver_db(&self) -> Result<TokenserverDb, DbError> {
let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from))
.await
.map_err(db::blocking_error_to_db_error)?;
let conn =
db::run_on_blocking_threadpool(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(TokenserverDb::new(conn, &self.metrics))
}
Expand All @@ -95,9 +93,8 @@ impl DbPool for TokenserverPool {
metrics.start_timer("tokenserver.storage.get_pool", None);

let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from))
.await
.map_err(db::blocking_error_to_db_error)?;
let conn =
db::run_on_blocking_threadpool(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(Box::new(TokenserverDb::new(conn, &self.metrics)) as Box<dyn Db>)
}
Expand Down
1 change: 0 additions & 1 deletion tokenserver-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ edition = "2021"
actix-web = "3"
serde = "1.0"
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
slog-scope = "4.3"
Loading

0 comments on commit 59dc0e6

Please sign in to comment.