Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move I/O calls to blocking threadpool #1190

Merged
merged 2 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ docker_stop_spanner:
docker-compose -f docker-compose.spanner.yaml down

run:
RUST_LOG=debug RUST_BACKTRACE=full cargo run -- --config config/local.toml --features tokenserver_test_mode
RUST_LOG=debug RUST_BACKTRACE=full cargo run --features tokenserver_test_mode -- --config config/local.toml
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


run_spanner:
GOOGLE_APPLICATION_CREDENTIALS=$(PATH_TO_SYNC_SPANNER_KEYS) GRPC_DEFAULT_SSL_ROOTS_FILE_PATH=$(PATH_TO_GRPC_CERT) make run
Expand Down
13 changes: 4 additions & 9 deletions src/tokenserver/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ impl Db for MockDb {
Box::pin(future::ok(results::PostUser::default()))
}

fn allocate_user(&self, _params: params::AllocateUser) -> DbFuture<'_, results::AllocateUser> {
Box::pin(future::ok(results::AllocateUser::default()))
}

fn put_user(&self, _params: params::PutUser) -> DbFuture<'_, results::PutUser> {
Box::pin(future::ok(()))
}
Expand All @@ -78,6 +74,10 @@ impl Db for MockDb {
Box::pin(future::ok(()))
}

fn get_users(&self, _params: params::GetUsers) -> DbFuture<'_, results::GetUsers> {
Box::pin(future::ok(results::GetUsers::default()))
}

fn get_or_create_user(
&self,
_params: params::GetOrCreateUser,
Expand Down Expand Up @@ -106,11 +106,6 @@ impl Db for MockDb {
Box::pin(future::ok(results::GetUser::default()))
}

#[cfg(test)]
fn get_users(&self, _params: params::GetRawUsers) -> DbFuture<'_, results::GetRawUsers> {
Box::pin(future::ok(results::GetRawUsers::default()))
}

#[cfg(test)]
fn post_node(&self, _params: params::PostNode) -> DbFuture<'_, results::PostNode> {
Box::pin(future::ok(results::PostNode::default()))
Expand Down
491 changes: 235 additions & 256 deletions src/tokenserver/db/models.rs

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions src/tokenserver/db/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ pub struct PostService {
pub pattern: String,
}

pub struct GetUsers {
pub service_id: i32,
pub email: String,
}

#[derive(Clone, Default)]
pub struct GetOrCreateUser {
pub service_id: i32,
Expand Down Expand Up @@ -87,9 +92,6 @@ pub struct AddUserToNode {
pub node: String,
}

#[cfg(test)]
pub type GetRawUsers = String;

#[cfg(test)]
pub struct SetUserCreatedAt {
pub uid: i64,
Expand Down
8 changes: 8 additions & 0 deletions src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ impl TokenserverPool {
inner: builder.build(manager)?,
})
}

#[cfg(test)]
pub async fn get_tokenserver_db(&self) -> Result<Box<TokenserverDb>, DbError> {
let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(Box::new(TokenserverDb::new(conn)))
}
}

impl From<actix_web::error::BlockingError<DbError>> for DbError {
Expand Down
8 changes: 2 additions & 6 deletions src/tokenserver/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct GetRawUser {
pub replaced_at: Option<i64>,
}

pub type GetUsers = Vec<GetRawUser>;

#[derive(Debug, Default, PartialEq)]
pub struct AllocateUser {
pub uid: i64,
Expand Down Expand Up @@ -75,9 +77,6 @@ pub struct GetBestNode {

pub type AddUserToNode = ();

#[cfg(test)]
pub type GetRawUsers = Vec<GetRawUser>;

#[cfg(test)]
#[derive(Debug, Default, PartialEq, QueryableByName)]
pub struct GetUser {
Expand Down Expand Up @@ -141,9 +140,6 @@ pub type SetUserCreatedAt = ();
#[cfg(test)]
pub type SetUserReplacedAt = ();

#[cfg(test)]
pub type GetUsers = Vec<GetUser>;

pub type Check = bool;

#[cfg(test)]
Expand Down
20 changes: 18 additions & 2 deletions src/tokenserver/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::fmt;

use actix_web::{error::ResponseError, http::StatusCode, HttpResponse};
use actix_web::{
error::{BlockingError, ResponseError},
http::StatusCode,
HttpResponse,
};
use serde::{
ser::{SerializeMap, Serializer},
Serialize,
};

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub struct TokenserverError {
pub status: &'static str,
pub location: ErrorLocation,
Expand Down Expand Up @@ -99,6 +103,18 @@ impl TokenserverError {
}
}

impl From<BlockingError<TokenserverError>> for TokenserverError {
fn from(inner: BlockingError<TokenserverError>) -> Self {
match inner {
BlockingError::Error(e) => e,
BlockingError::Canceled => {
error!("Tokenserver threadpool operation canceled");
TokenserverError::internal_error()
}
}
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ErrorLocation {
Header,
Expand Down
49 changes: 31 additions & 18 deletions src/tokenserver/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use actix_web::{
dev::Payload,
http::StatusCode,
web::{Data, Query},
web::{self, Data, Query},
Error, FromRequest, HttpRequest,
};
use actix_web_httpauth::extractors::bearer::BearerAuth;
Expand All @@ -19,7 +19,7 @@ use regex::Regex;
use serde::Deserialize;
use sha2::Sha256;

use super::db::{self, models::Db, params, results};
use super::db::{self, models::Db, params, pool::DbPool, results};
use super::error::{ErrorLocation, TokenserverError};
use super::support::TokenData;
use super::NodeType;
Expand Down Expand Up @@ -195,11 +195,7 @@ impl FromRequest for TokenserverRequest {
};
let email = format!("{}@{}", fxa_uid, state.fxa_email_domain);
let user = {
let db = state.db_pool.get().await.map_err(|_| {
error!("⚠️ Could not acquire database connection");

TokenserverError::internal_error()
})?;
let db = <Box<dyn Db>>::extract(&req).await?;

db.get_or_create_user(params::GetOrCreateUser {
service_id,
Expand Down Expand Up @@ -258,20 +254,37 @@ impl FromRequest for Box<dyn Db> {
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
let req = req.clone();

Box::pin(async move {
<Box<dyn DbPool>>::extract(&req)
.await?
.get()
.await
.map_err(|_| {
error!("⚠️ Could not acquire database connection");

TokenserverError::internal_error().into()
})
})
}
}

impl FromRequest for Box<dyn DbPool> {
type Config = ();
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
let req = req.clone();

Box::pin(async move {
// XXX: Tokenserver state will no longer be an Option once the Tokenserver
// code is rolled out, so we will eventually be able to remove this unwrap().
let state = get_server_state(&req)?.as_ref().as_ref().unwrap();
let db = state.db_pool.get().await.map_err(|_| {
error!("⚠️ Could not acquire database connection");

TokenserverError::internal_error()
})?;

Ok(db)
Ok(state.db_pool.clone())
})
}
}
Expand Down Expand Up @@ -303,20 +316,20 @@ impl FromRequest for TokenData {
let auth = BearerAuth::extract(&req)
.await
.map_err(|_| TokenserverError::invalid_credentials("Unsupported"))?;

// XXX: The Tokenserver state will no longer be an Option once the Tokenserver
// code is rolled out, so we will eventually be able to remove this unwrap().
let state = get_server_state(&req)?.as_ref().as_ref().unwrap();
let oauth_verifier = state.oauth_verifier.clone();

state
.oauth_verifier
.verify_token(auth.token())
web::block(move || oauth_verifier.verify_token(auth.token()))
.await
.map_err(TokenserverError::from)
.map_err(Into::into)
})
}
}

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
struct KeyId {
client_state: String,
keys_changed_at: i64,
Expand Down