Skip to content

Commit

Permalink
fix: move I/O calls to blocking threadpool (#1190)
Browse files Browse the repository at this point in the history
Closes #1188
  • Loading branch information
ethowitz authored Dec 21, 2021
1 parent 46d4a9e commit cbeebf4
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 295 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ docker_stop_spanner:
docker-compose -f docker-compose.spanner.yaml down

run:
RUST_LOG=debug RUST_BACKTRACE=full cargo run -- --config config/local.toml --features tokenserver_test_mode
RUST_LOG=debug RUST_BACKTRACE=full cargo run --features tokenserver_test_mode -- --config config/local.toml

run_spanner:
GOOGLE_APPLICATION_CREDENTIALS=$(PATH_TO_SYNC_SPANNER_KEYS) GRPC_DEFAULT_SSL_ROOTS_FILE_PATH=$(PATH_TO_GRPC_CERT) make run
Expand Down
13 changes: 4 additions & 9 deletions src/tokenserver/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ impl Db for MockDb {
Box::pin(future::ok(results::PostUser::default()))
}

fn allocate_user(&self, _params: params::AllocateUser) -> DbFuture<'_, results::AllocateUser> {
Box::pin(future::ok(results::AllocateUser::default()))
}

fn put_user(&self, _params: params::PutUser) -> DbFuture<'_, results::PutUser> {
Box::pin(future::ok(()))
}
Expand All @@ -78,6 +74,10 @@ impl Db for MockDb {
Box::pin(future::ok(()))
}

fn get_users(&self, _params: params::GetUsers) -> DbFuture<'_, results::GetUsers> {
Box::pin(future::ok(results::GetUsers::default()))
}

fn get_or_create_user(
&self,
_params: params::GetOrCreateUser,
Expand Down Expand Up @@ -106,11 +106,6 @@ impl Db for MockDb {
Box::pin(future::ok(results::GetUser::default()))
}

#[cfg(test)]
fn get_users(&self, _params: params::GetRawUsers) -> DbFuture<'_, results::GetRawUsers> {
Box::pin(future::ok(results::GetRawUsers::default()))
}

#[cfg(test)]
fn post_node(&self, _params: params::PostNode) -> DbFuture<'_, results::PostNode> {
Box::pin(future::ok(results::PostNode::default()))
Expand Down
491 changes: 235 additions & 256 deletions src/tokenserver/db/models.rs

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions src/tokenserver/db/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ pub struct PostService {
pub pattern: String,
}

pub struct GetUsers {
pub service_id: i32,
pub email: String,
}

#[derive(Clone, Default)]
pub struct GetOrCreateUser {
pub service_id: i32,
Expand Down Expand Up @@ -87,9 +92,6 @@ pub struct AddUserToNode {
pub node: String,
}

#[cfg(test)]
pub type GetRawUsers = String;

#[cfg(test)]
pub struct SetUserCreatedAt {
pub uid: i64,
Expand Down
8 changes: 8 additions & 0 deletions src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ impl TokenserverPool {
inner: builder.build(manager)?,
})
}

#[cfg(test)]
pub async fn get_tokenserver_db(&self) -> Result<Box<TokenserverDb>, DbError> {
let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(Box::new(TokenserverDb::new(conn)))
}
}

impl From<actix_web::error::BlockingError<DbError>> for DbError {
Expand Down
8 changes: 2 additions & 6 deletions src/tokenserver/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct GetRawUser {
pub replaced_at: Option<i64>,
}

pub type GetUsers = Vec<GetRawUser>;

#[derive(Debug, Default, PartialEq)]
pub struct AllocateUser {
pub uid: i64,
Expand Down Expand Up @@ -75,9 +77,6 @@ pub struct GetBestNode {

pub type AddUserToNode = ();

#[cfg(test)]
pub type GetRawUsers = Vec<GetRawUser>;

#[cfg(test)]
#[derive(Debug, Default, PartialEq, QueryableByName)]
pub struct GetUser {
Expand Down Expand Up @@ -141,9 +140,6 @@ pub type SetUserCreatedAt = ();
#[cfg(test)]
pub type SetUserReplacedAt = ();

#[cfg(test)]
pub type GetUsers = Vec<GetUser>;

pub type Check = bool;

#[cfg(test)]
Expand Down
20 changes: 18 additions & 2 deletions src/tokenserver/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::fmt;

use actix_web::{error::ResponseError, http::StatusCode, HttpResponse};
use actix_web::{
error::{BlockingError, ResponseError},
http::StatusCode,
HttpResponse,
};
use serde::{
ser::{SerializeMap, Serializer},
Serialize,
};

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub struct TokenserverError {
pub status: &'static str,
pub location: ErrorLocation,
Expand Down Expand Up @@ -99,6 +103,18 @@ impl TokenserverError {
}
}

impl From<BlockingError<TokenserverError>> for TokenserverError {
fn from(inner: BlockingError<TokenserverError>) -> Self {
match inner {
BlockingError::Error(e) => e,
BlockingError::Canceled => {
error!("Tokenserver threadpool operation canceled");
TokenserverError::internal_error()
}
}
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ErrorLocation {
Header,
Expand Down
49 changes: 31 additions & 18 deletions src/tokenserver/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use actix_web::{
dev::Payload,
http::StatusCode,
web::{Data, Query},
web::{self, Data, Query},
Error, FromRequest, HttpRequest,
};
use actix_web_httpauth::extractors::bearer::BearerAuth;
Expand All @@ -19,7 +19,7 @@ use regex::Regex;
use serde::Deserialize;
use sha2::Sha256;

use super::db::{self, models::Db, params, results};
use super::db::{self, models::Db, params, pool::DbPool, results};
use super::error::{ErrorLocation, TokenserverError};
use super::support::TokenData;
use super::NodeType;
Expand Down Expand Up @@ -195,11 +195,7 @@ impl FromRequest for TokenserverRequest {
};
let email = format!("{}@{}", fxa_uid, state.fxa_email_domain);
let user = {
let db = state.db_pool.get().await.map_err(|_| {
error!("⚠️ Could not acquire database connection");

TokenserverError::internal_error()
})?;
let db = <Box<dyn Db>>::extract(&req).await?;

db.get_or_create_user(params::GetOrCreateUser {
service_id,
Expand Down Expand Up @@ -258,20 +254,37 @@ impl FromRequest for Box<dyn Db> {
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
let req = req.clone();

Box::pin(async move {
<Box<dyn DbPool>>::extract(&req)
.await?
.get()
.await
.map_err(|_| {
error!("⚠️ Could not acquire database connection");

TokenserverError::internal_error().into()
})
})
}
}

impl FromRequest for Box<dyn DbPool> {
type Config = ();
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
let req = req.clone();

Box::pin(async move {
// XXX: Tokenserver state will no longer be an Option once the Tokenserver
// code is rolled out, so we will eventually be able to remove this unwrap().
let state = get_server_state(&req)?.as_ref().as_ref().unwrap();
let db = state.db_pool.get().await.map_err(|_| {
error!("⚠️ Could not acquire database connection");

TokenserverError::internal_error()
})?;

Ok(db)
Ok(state.db_pool.clone())
})
}
}
Expand Down Expand Up @@ -303,20 +316,20 @@ impl FromRequest for TokenData {
let auth = BearerAuth::extract(&req)
.await
.map_err(|_| TokenserverError::invalid_credentials("Unsupported"))?;

// XXX: The Tokenserver state will no longer be an Option once the Tokenserver
// code is rolled out, so we will eventually be able to remove this unwrap().
let state = get_server_state(&req)?.as_ref().as_ref().unwrap();
let oauth_verifier = state.oauth_verifier.clone();

state
.oauth_verifier
.verify_token(auth.token())
web::block(move || oauth_verifier.verify_token(auth.token()))
.await
.map_err(TokenserverError::from)
.map_err(Into::into)
})
}
}

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
struct KeyId {
client_state: String,
keys_changed_at: i64,
Expand Down

0 comments on commit cbeebf4

Please sign in to comment.