feat: use Actix to spawn blocking threads (#1370)
ethowitz authored Aug 9, 2022
1 parent ebf425f commit 1b1261f
Showing 11 changed files with 115 additions and 79 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -30,6 +30,7 @@ target
.#*
service-account.json
.sentryclirc
.envrc

config/local.toml
tools/tokenserver/loadtests/*.pem
19 changes: 6 additions & 13 deletions syncstorage/src/db/mod.rs
@@ -9,13 +9,13 @@ pub mod transaction;

use std::time::Duration;

use actix_web::{error::BlockingError, web};
use cadence::{Gauged, StatsdClient};
use futures::TryFutureExt;
use syncstorage_db_common::{
error::{DbError, DbErrorKind},
results, DbPool, GetPoolState, PoolState,
};
use tokio::{self, task, time};
use tokio::{self, time};
use url::Url;

use crate::server::metrics::Metrics;
@@ -74,15 +74,8 @@ where
F: FnOnce() -> Result<T, DbError> + Send + 'static,
T: Send + 'static,
{
task::spawn_blocking(f)
.map_err(|err| {
if err.is_cancelled() {
DbError::internal("Db threadpool operation cancelled")
} else if err.is_panic() {
DbError::internal("Db threadpool operation panicked")
} else {
DbError::internal("Db threadpool operation failed for unknown reason")
}
})
.await?
web::block(f).await.map_err(|e| match e {
BlockingError::Error(e) => e,
BlockingError::Canceled => DbError::internal("Db threadpool operation canceled"),
})
}
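The heart of the commit is this swap from `tokio::task::spawn_blocking` to `actix_web::web::block`. As a rough standalone sketch of the same wrapper pattern (assuming actix-web 3.x, where `web::block` takes a closure returning `Result<T, E>` and resolves to `Result<T, BlockingError<E>>`; `AppError` is a hypothetical stand-in for the crate's `DbError`):

```rust
use actix_web::{error::BlockingError, web};

#[derive(Debug)]
struct AppError(String);

/// Run a synchronous, potentially blocking closure on Actix's dedicated
/// blocking threadpool (sized via the ACTIX_THREADPOOL env var) instead of
/// Tokio's, then flatten the result back into the caller's error type.
async fn run_on_blocking_threadpool<F, T>(f: F) -> Result<T, AppError>
where
    F: FnOnce() -> Result<T, AppError> + Send + 'static,
    T: Send + 'static,
{
    web::block(f).await.map_err(|e| match e {
        // The closure ran and returned an error: pass it through unchanged.
        BlockingError::Error(e) => e,
        // The threadpool shut down before the closure could run.
        BlockingError::Canceled => AppError("blocking operation canceled".to_owned()),
    })
}
```

Callers await `run_on_blocking_threadpool(move || some_synchronous_query())` from async handlers, keeping blocking Diesel queries off the async worker threads.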
2 changes: 1 addition & 1 deletion syncstorage/src/db/mysql/pool.rs
@@ -64,7 +64,7 @@ impl MysqlDbPool {
pub fn new_without_migrations(settings: &Settings, metrics: &Metrics) -> Result<Self> {
let manager = ConnectionManager::<MysqlConnection>::new(settings.database_url.clone());
let builder = Pool::builder()
.max_size(settings.database_pool_max_size.unwrap_or(10))
.max_size(settings.database_pool_max_size)
.connection_timeout(Duration::from_secs(
settings.database_pool_connection_timeout.unwrap_or(30) as u64,
))
2 changes: 1 addition & 1 deletion syncstorage/src/db/spanner/pool.rs
@@ -47,7 +47,7 @@ impl SpannerDbPool {
}

pub async fn new_without_migrations(settings: &Settings, metrics: &Metrics) -> Result<Self> {
let max_size = settings.database_pool_max_size.unwrap_or(10) as usize;
let max_size = settings.database_pool_max_size as usize;
let wait = settings
.database_pool_connection_timeout
.map(|seconds| Duration::from_secs(seconds as u64));
4 changes: 2 additions & 2 deletions syncstorage/src/server/test.rs
@@ -59,7 +59,7 @@ fn get_test_settings() -> Settings {
.expect("Could not get pool_size in get_test_settings");
settings.port = port;
settings.host = host;
settings.database_pool_max_size = Some(pool_size + 1);
settings.database_pool_max_size = pool_size + 1;
settings
}

@@ -726,7 +726,7 @@ async fn lbheartbeat_max_pool_size_check() {
use actix_web::web::Buf;

let mut settings = get_test_settings();
settings.database_pool_max_size = Some(10);
settings.database_pool_max_size = 10;

let mut app = init_app!(settings).await;

78 changes: 61 additions & 17 deletions syncstorage/src/settings.rs
@@ -1,5 +1,8 @@
//! Application settings objects and initialization
use std::{cmp::min, env};
use std::{
cmp::min,
env::{self, VarError},
};

use actix_cors::Cors;
use actix_web::http::header::{AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
@@ -53,7 +56,7 @@ pub struct Quota {
/// Optionally we can permanently fail the check after a set time period,
/// indicating that this instance should be evicted and replaced.
pub struct Deadman {
pub max_size: Option<u32>,
pub max_size: u32,
pub previous_count: usize,
pub clock_start: Option<time::Instant>,
pub expiry: Option<time::Instant>,
@@ -83,7 +86,7 @@ pub struct Settings {
pub port: u16,
pub host: String,
pub database_url: String,
pub database_pool_max_size: Option<u32>,
pub database_pool_max_size: u32,
// NOTE: Not supported by deadpool!
pub database_pool_min_idle: Option<u32>,
/// Pool timeout when waiting for a slot to become available, in seconds
@@ -144,7 +147,7 @@ impl Default for Settings {
port: DEFAULT_PORT,
host: "127.0.0.1".to_string(),
database_url: "mysql://[email protected]/syncstorage".to_string(),
database_pool_max_size: None,
database_pool_max_size: 10,
database_pool_min_idle: None,
database_pool_connection_lifespan: None,
database_pool_connection_max_idle: None,
@@ -203,6 +206,7 @@ impl Settings {
s.set_default("human_logs", false)?;
#[cfg(test)]
s.set_default("database_pool_connection_timeout", Some(30))?;
s.set_default("database_pool_max_size", 10)?;
// Max lifespan a connection should have.
s.set_default::<Option<String>>("database_connection_lifespan", None)?;
// Max time a connection should be idle before dropping.
@@ -245,6 +249,7 @@ impl Settings {
"tokenserver.database_url",
"mysql://[email protected]/tokenserver",
)?;
s.set_default("tokenserver.database_pool_max_size", 10)?;
s.set_default("tokenserver.enabled", false)?;
s.set_default(
"tokenserver.fxa_browserid_audience",
@@ -331,18 +336,7 @@ impl Settings {
ms.limits.max_total_bytes =
min(ms.limits.max_total_bytes, MAX_SPANNER_LOAD_SIZE as u32);
return Ok(ms);
}

if !s.uses_spanner() {
if let Some(database_pool_max_size) = s.database_pool_max_size {
// Db backends w/ blocking calls block via
// actix-threadpool: grow its size to accommodate the
// full number of connections
let default = num_cpus::get() * 5;
if (database_pool_max_size as usize) > default {
env::set_var("ACTIX_THREADPOOL", database_pool_max_size.to_string());
}
}
} else {
// No quotas for stand alone servers
s.limits.max_quota_limit = 0;
s.enable_quota = false;
@@ -354,6 +348,56 @@
if s.enforce_quota {
s.enable_quota = true
}

if matches!(env::var("ACTIX_THREADPOOL"), Err(VarError::NotPresent)) {
// Db backends w/ blocking calls block via
// actix-threadpool: grow its size to accommodate the
// full number of connections
let total_db_pool_size = {
let syncstorage_pool_max_size = if s.uses_spanner() || s.disable_syncstorage
{
0
} else {
s.database_pool_max_size
};

let tokenserver_pool_max_size = if s.tokenserver.enabled {
s.tokenserver.database_pool_max_size
} else {
0
};

syncstorage_pool_max_size + tokenserver_pool_max_size
};

let fxa_threads = if s.tokenserver.enabled
&& s.tokenserver.fxa_oauth_primary_jwk.is_none()
&& s.tokenserver.fxa_oauth_secondary_jwk.is_none()
{
s.tokenserver
.additional_blocking_threads_for_fxa_requests
.ok_or_else(|| {
println!(
"If the Tokenserver OAuth JWK is not cached, additional blocking \
threads must be used to handle the requests to FxA."
);

let setting_name =
"tokenserver.additional_blocking_threads_for_fxa_requests";
ConfigError::NotFound(String::from(setting_name))
})?
} else {
0
};

env::set_var(
"ACTIX_THREADPOOL",
((total_db_pool_size + fxa_threads) as usize)
.max(num_cpus::get() * 5)
.to_string(),
);
}

s
}
Err(e) => match e {
@@ -545,7 +589,7 @@ pub fn test_settings() -> Settings {
.expect("Could not get Settings in get_test_settings");
settings.debug = true;
settings.port = 8000;
settings.database_pool_max_size = Some(1);
settings.database_pool_max_size = 1;
settings.database_use_test_transactions = true;
settings.database_pool_connection_max_idle = Some(300);
settings.database_pool_connection_lifespan = Some(300);
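A standalone sketch of the sizing rule the new settings code applies above (not the commit's exact code; example values are illustrative): the blocking threadpool must hold at least `num_cpus::get() * 5` threads, Actix's own default, and at least one thread per potential database connection plus any extra threads reserved for blocking FxA requests.

```rust
use std::env;

/// Compute the blocking-threadpool size from the individual pool settings.
fn actix_threadpool_size(
    syncstorage_pool_max_size: u32,
    tokenserver_pool_max_size: u32,
    fxa_threads: u32,
) -> usize {
    let total_db_pool_size = (syncstorage_pool_max_size + tokenserver_pool_max_size) as usize;
    // Never shrink the pool below Actix's default of five threads per core.
    (total_db_pool_size + fxa_threads as usize).max(num_cpus::get() * 5)
}

fn main() {
    // Only set the variable when the operator has not provided one (the commit
    // matches `VarError::NotPresent` specifically).
    if env::var("ACTIX_THREADPOOL").is_err() {
        // Illustrative values: a 10-connection syncstorage pool, a
        // 10-connection Tokenserver pool, and no extra FxA threads.
        env::set_var(
            "ACTIX_THREADPOOL",
            actix_threadpool_size(10, 10, 0).to_string(),
        );
    }
}
```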
30 changes: 12 additions & 18 deletions syncstorage/src/tokenserver/auth/oauth.rs
@@ -1,13 +1,13 @@
use actix_web::{error::BlockingError, web};
use async_trait::async_trait;
use futures::TryFutureExt;
use pyo3::{
prelude::{Py, PyAny, PyErr, PyModule, Python},
types::{IntoPyDict, PyString},
};
use serde::{Deserialize, Serialize};
use serde_json;
use tokenserver_common::error::TokenserverError;
use tokio::{task, time};
use tokio::time;

use super::VerifyToken;
use crate::tokenserver::settings::{Jwk, Settings};
@@ -154,21 +154,9 @@ impl VerifyToken for Verifier {

// If the JWK is not cached, PyFxA will make a request to FxA to retrieve it, blocking
// this thread. To improve performance, we make the request on a thread in a threadpool
// specifically used for blocking operations.
let fut = task::spawn_blocking(move || verify_inner(&verifier)).map_err(|err| {
let context = if err.is_cancelled() {
"Tokenserver threadpool operation cancelled"
} else if err.is_panic() {
"Tokenserver threadpool operation panicked"
} else {
"Tokenserver threadpool operation failed for unknown reason"
};

TokenserverError {
context: context.to_owned(),
..TokenserverError::internal_error()
}
});
// specifically used for blocking operations. The JWK should _always_ be cached in
// production to maximize performance.
let fut = web::block(move || verify_inner(&verifier)); //.map_err(|err| {

// The PyFxA OAuth client does not offer a way to set a request timeout, so we set one here
// by timing out the future if the verification process blocks this thread for longer
@@ -179,7 +167,13 @@ impl VerifyToken for Verifier {
context: "OAuth verification timeout".to_owned(),
..TokenserverError::resource_unavailable()
})?
.map_err(|_| TokenserverError::resource_unavailable())?
.map_err(|e| match e {
BlockingError::Error(_) => TokenserverError::resource_unavailable(),
BlockingError::Canceled => TokenserverError {
context: "Tokenserver threadpool operation failed".to_owned(),
..TokenserverError::internal_error()
},
})
}
}
}
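The pattern in this hunk, running the synchronous PyFxA verification via `web::block` and bounding it with a timeout because the client offers no request timeout of its own, reduces to roughly the following sketch (assuming actix-web 3.x and tokio's `time::timeout`; `verify_blocking` and `VerifyError` are hypothetical stand-ins for `verify_inner` and `TokenserverError`):

```rust
use std::time::Duration;

use actix_web::{error::BlockingError, web};
use tokio::time;

#[derive(Debug)]
struct VerifyError(String);

/// Stand-in for the synchronous (potentially network-bound) PyFxA call.
fn verify_blocking(token: &str) -> Result<String, VerifyError> {
    Ok(format!("fxa-uid-for-{}", token))
}

async fn verify_with_timeout(token: String, timeout_secs: u64) -> Result<String, VerifyError> {
    // Run the blocking verification on Actix's blocking threadpool.
    let fut = web::block(move || verify_blocking(&token));

    // Bound the whole operation, since the underlying client cannot time out
    // its own requests.
    time::timeout(Duration::from_secs(timeout_secs), fut)
        .await
        .map_err(|_| VerifyError("OAuth verification timeout".to_owned()))?
        .map_err(|e| match e {
            BlockingError::Error(e) => e,
            BlockingError::Canceled => VerifyError("threadpool operation canceled".to_owned()),
        })
}
```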
2 changes: 1 addition & 1 deletion syncstorage/src/tokenserver/db/pool.rs
@@ -52,7 +52,7 @@ impl TokenserverPool {

let manager = ConnectionManager::<MysqlConnection>::new(settings.database_url.clone());
let builder = Pool::builder()
.max_size(settings.database_pool_max_size.unwrap_or(10))
.max_size(settings.database_pool_max_size)
.connection_timeout(Duration::from_secs(
settings.database_pool_connection_timeout.unwrap_or(30) as u64,
))
11 changes: 9 additions & 2 deletions syncstorage/src/tokenserver/settings.rs
@@ -7,7 +7,7 @@ pub struct Settings {
/// The URL of the Tokenserver MySQL database.
pub database_url: String,
/// The max size of the database connection pool.
pub database_pool_max_size: Option<u32>,
pub database_pool_max_size: u32,
// NOTE: Not supported by deadpool!
/// The minimum number of database connections to be maintained at any given time.
pub database_pool_min_idle: Option<u32>,
@@ -57,6 +57,12 @@ pub struct Settings {
pub run_migrations: bool,
/// The database ID of the Spanner node.
pub spanner_node_id: Option<i32>,
/// The number of additional blocking threads to add to the blocking threadpool to handle
/// OAuth verification requests to FxA. This value is added to the `ACTIX_THREADPOOL` env var.
/// Note that this setting only applies if the OAuth public JWK is not cached, since OAuth
/// verifications do not require requests to FXA if the JWK is set on Tokenserver. The server
/// will return an error at startup if the JWK is not cached and this setting is `None`.
pub additional_blocking_threads_for_fxa_requests: Option<u32>,
}

#[derive(Clone, Debug, Deserialize)]
@@ -75,7 +81,7 @@ impl Default for Settings {
fn default() -> Settings {
Settings {
database_url: "mysql://[email protected]/tokenserver_rs".to_owned(),
database_pool_max_size: None,
database_pool_max_size: 10,
database_pool_min_idle: None,
database_pool_connection_timeout: Some(30),
enabled: false,
@@ -95,6 +101,7 @@
statsd_label: "syncstorage.tokenserver".to_owned(),
run_migrations: cfg!(test),
spanner_node_id: None,
additional_blocking_threads_for_fxa_requests: None,
}
}
}
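A simplified sketch of the startup rule that new doc comment describes (a hypothetical free function; the real check lives in the syncstorage `Settings` loading shown earlier): extra blocking threads are only required when Tokenserver is enabled and no OAuth JWK is cached, because only then does each verification block on a request to FxA.

```rust
/// Decide how many extra blocking threads to reserve for FxA requests.
fn required_fxa_threads(
    tokenserver_enabled: bool,
    jwk_cached: bool,
    additional_blocking_threads_for_fxa_requests: Option<u32>,
) -> Result<u32, String> {
    if tokenserver_enabled && !jwk_cached {
        // With no cached JWK, every verification blocks on a request to FxA,
        // so the operator must size the extra capacity explicitly.
        additional_blocking_threads_for_fxa_requests.ok_or_else(|| {
            "If the Tokenserver OAuth JWK is not cached, additional blocking threads \
             must be used to handle the requests to FxA."
                .to_owned()
        })
    } else {
        // Verification is purely local; no extra threads are needed.
        Ok(0)
    }
}
```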
1 change: 0 additions & 1 deletion syncstorage/src/web/auth.rs
@@ -512,7 +512,6 @@ mod tests {
port: 0,
host: "127.0.0.1".to_string(),
database_url: "".to_string(),
database_pool_max_size: None,
database_use_test_transactions: false,
limits: Default::default(),
master_secret: Secrets::new("Ted Koppel is a robot").unwrap(),
44 changes: 21 additions & 23 deletions syncstorage/src/web/handlers.rs
@@ -671,32 +671,30 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, ApiError> {
let active = db_state.connections - db_state.idle_connections;
let mut status_code = StatusCode::OK;

if let Some(max_size) = deadman.max_size {
if active >= max_size && db_state.idle_connections == 0 {
if deadman.clock_start.is_none() {
deadman.clock_start = Some(time::Instant::now());
}
status_code = StatusCode::INTERNAL_SERVER_ERROR;
} else if deadman.clock_start.is_some() {
deadman.clock_start = None
}
deadman.previous_count = db_state.idle_connections as usize;
{
*deadarc.write().await = deadman;
if active >= deadman.max_size && db_state.idle_connections == 0 {
if deadman.clock_start.is_none() {
deadman.clock_start = Some(time::Instant::now());
}
resp.insert("active_connections".to_string(), Value::from(active));
status_code = StatusCode::INTERNAL_SERVER_ERROR;
} else if deadman.clock_start.is_some() {
deadman.clock_start = None
}
deadman.previous_count = db_state.idle_connections as usize;
{
*deadarc.write().await = deadman;
}
resp.insert("active_connections".to_string(), Value::from(active));
resp.insert(
"idle_connections".to_string(),
Value::from(db_state.idle_connections),
);
if let Some(clock) = deadman.clock_start {
let duration: time::Duration = time::Instant::now() - clock;
resp.insert(
"idle_connections".to_string(),
Value::from(db_state.idle_connections),
"duration_ms".to_string(),
Value::from(duration.whole_milliseconds()),
);
if let Some(clock) = deadman.clock_start {
let duration: time::Duration = time::Instant::now() - clock;
resp.insert(
"duration_ms".to_string(),
Value::from(duration.whole_milliseconds()),
);
};
}
};

Ok(HttpResponseBuilder::new(status_code).json(json!(resp)))
}
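With `max_size` now a plain `u32`, the saturation check in the lbheartbeat handler runs unconditionally. A reduced sketch of the rule (plain structs standing in for the handler's pool-state and `Deadman` types):

```rust
struct PoolState {
    connections: u32,
    idle_connections: u32,
}

struct Deadman {
    max_size: u32,
}

/// The lbheartbeat endpoint reports unhealthy (HTTP 500) only while every
/// connection in the pool is checked out and none are idle.
fn pool_saturated(deadman: &Deadman, db_state: &PoolState) -> bool {
    let active = db_state.connections - db_state.idle_connections;
    active >= deadman.max_size && db_state.idle_connections == 0
}
```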