feat: __lbheartbeat__ will return 500 if the connection pool is exhausted #997

Merged: 2 commits (Feb 3, 2021)
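Context for reading the diffs: both files manipulate a `Deadman` tracker stored in server state behind an `Arc<RwLock<_>>`. A minimal sketch of the shape this PR assumes, with field names and types inferred from usage in the diffs below (the real definition lives elsewhere in the crate):

```rust
use std::time::Instant;

// Inferred sketch, not the crate's actual definition: the diffs below read
// and write exactly these fields, and copy the struct out of the lock.
#[derive(Clone, Copy, Debug, Default)]
pub struct Deadman {
    /// Pool ceiling, seeded from settings.database_pool_max_size.
    pub max_size: Option<u32>,
    /// Active-connection count observed on the previous heartbeat.
    pub previous_count: u32,
    /// First moment exhaustion was seen; cleared on recovery and used
    /// to compute the duration_ms reported by __lbheartbeat__.
    pub clock_start: Option<Instant>,
}
```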
src/server/test.rs: 61 additions & 1 deletion
@@ -76,7 +76,10 @@ async fn get_test_state(settings: &Settings) -> ServerState {
        metrics: Box::new(metrics),
        port: settings.port,
        quota_enabled: settings.enable_quota,
-        deadman: Arc::new(RwLock::new(Deadman::default())),
+        deadman: Arc::new(RwLock::new(Deadman {
+            max_size: settings.database_pool_max_size,
+            ..Default::default()
+        })),
    }
}

@@ -695,3 +698,60 @@ async fn overquota() {
    let resp = app.call(req).await.unwrap();
    assert!(resp.response().status().is_success());
}

#[actix_rt::test]
async fn lbheartbeat_check() {
    use actix_web::web::Buf;

    let mut settings = get_test_settings();
    settings.database_pool_max_size = Some(10);

    let mut app = init_app!(settings).await;

    // Test all is well.
    let lb_req = create_request(http::Method::GET, "/__lbheartbeat__", None, None).to_request();
    let sresp = app.call(lb_req).await.unwrap();
    let status = sresp.status();
    // dbg!(status, test::read_body(sresp).await);
    assert!(status.is_success());

    // Exhaust the connections.
    let mut headers: HashMap<&str, String> = HashMap::new();
    headers.insert("TEST_CONNECTIONS", "10".to_owned());
    headers.insert("TEST_IDLES", "0".to_owned());
    let req = create_request(
        http::Method::GET,
        "/__lbheartbeat__",
        Some(headers.clone()),
        None,
    )
    .to_request();
    let sresp = app.call(req).await.unwrap();
    let status = sresp.status();
    // dbg!(status, test::read_body(sresp).await);
    assert!(status == StatusCode::INTERNAL_SERVER_ERROR);

    // check duration for exhausted connections
    std::thread::sleep(std::time::Duration::from_secs(1));
    let req =
        create_request(http::Method::GET, "/__lbheartbeat__", Some(headers), None).to_request();
    let sresp = app.call(req).await.unwrap();
    let status = sresp.status();
    let body = test::read_body(sresp).await;
    let resp: HashMap<String, serde_json::value::Value> =
        serde_json::de::from_str(std::str::from_utf8(body.bytes()).unwrap()).unwrap();
    // dbg!(status, body, &resp);
    assert!(status == StatusCode::INTERNAL_SERVER_ERROR);
    assert!(resp.get("duration_ms").unwrap().as_u64().unwrap() > 1000);

    // check recovery
    let mut headers: HashMap<&str, String> = HashMap::new();
    headers.insert("TEST_CONNECTIONS", "5".to_owned());
    headers.insert("TEST_IDLES", "5".to_owned());
    let req =
        create_request(http::Method::GET, "/__lbheartbeat__", Some(headers), None).to_request();
    let sresp = app.call(req).await.unwrap();
    let status = sresp.status();
    // dbg!(status, test::read_body(sresp).await);
    assert!(status == StatusCode::OK);
}
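(The TEST_CONNECTIONS and TEST_IDLES headers above only take effect because the handler, shown next, substitutes them for the real pool state when compiled under cfg!(test).)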
src/web/handlers.rs: 35 additions & 4 deletions
@@ -1,7 +1,9 @@
//! API Handlers
use std::collections::HashMap;

-use actix_web::{http::StatusCode, web::Data, Error, HttpRequest, HttpResponse};
+use actix_web::{
+    dev::HttpResponseBuilder, http::StatusCode, web::Data, Error, HttpRequest, HttpResponse,
+};
use serde::Serialize;
use serde_json::{json, Value};
@@ -553,15 +555,44 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {

    let deadarc = state.deadman.clone();
    let mut deadman = *deadarc.read().await;
-    let db_state = state.db_pool.clone().state();
+    let db_state = if cfg!(test) {
+        use crate::db::results::PoolState;
+        use actix_web::http::header::HeaderValue;
+        use std::str::FromStr;
+
+        let test_pool = PoolState {
+            connections: u32::from_str(
+                req.headers()
+                    .get("TEST_CONNECTIONS")
+                    .unwrap_or(&HeaderValue::from_static("0"))
+                    .to_str()
+                    .unwrap_or("0"),
+            )
+            .unwrap_or_default(),
+            idle_connections: u32::from_str(
+                req.headers()
+                    .get("TEST_IDLES")
+                    .unwrap_or(&HeaderValue::from_static("0"))
+                    .to_str()
+                    .unwrap_or("0"),
+            )
+            .unwrap_or_default(),
+        };
+        // dbg!(&test_pool, deadman.max_size);
+        test_pool
+    } else {
+        state.db_pool.clone().state()
+    };

    let active = db_state.connections - db_state.idle_connections;
    let mut status_code = StatusCode::OK;

    if let Some(max_size) = deadman.max_size {
        if active >= max_size && db_state.idle_connections == 0 {
            if deadman.previous_count > 0 {
                if deadman.clock_start.is_none() {
                    deadman.clock_start = Some(time::Instant::now());
                }
                status_code = StatusCode::INTERNAL_SERVER_ERROR;
Member (comment on lines 594 to 595):

Maybe only set the 500 status in an else block? It might be safer to avoid having healthy nodes that happen to hit the max conns just once report a 500 here.

Suggested change:

-                }
-                status_code = StatusCode::INTERNAL_SERVER_ERROR;
+                } else {
+                    status_code = StatusCode::INTERNAL_SERVER_ERROR;
+                }

Member Author:

Yeah, that was the reasoning behind adding the duration_ms. I'm kinda/sorta OK with a saturated node returning a 500 early, because it can't really process more. Things reset fairly quickly once the pool desaturates, and I think it takes multiple 500 returns before a node is terminated. We might want to talk with @Micheletto to see if that matches what he wants. I'd also be fine returning the 500 only after the saturation passes some Duration.
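A minimal sketch of that duration-gated alternative, for illustration only: the function name, parameters, and the 5-second grace value are all assumptions, not part of this PR.

```rust
use std::time::{Duration, Instant};

/// Hypothetical variant: only report 500 once the pool has stayed
/// saturated for longer than `grace`. Names and values are illustrative.
fn heartbeat_status(
    active: u32,
    idle: u32,
    max_size: u32,
    clock_start: &mut Option<Instant>,
    grace: Duration,
) -> u16 {
    if active >= max_size && idle == 0 {
        // Start (or continue) the saturation clock.
        let start = clock_start.get_or_insert_with(Instant::now);
        if start.elapsed() >= grace {
            return 500; // saturated long enough to call the node unhealthy
        }
    } else {
        *clock_start = None; // recovered; reset the clock
    }
    200
}
```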

Member Author:

Based on conversations with ops, returning 500 immediately is preferred. Monitoring will only kill a node once it detects multiple 500 reports in a row.

            }
        } else if deadman.clock_start.is_some() {
            deadman.clock_start = None
        }
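The context hidden between these hunks is where the response body is presumably assembled. Given the test's duration_ms assertion, the computation is likely something like this sketch (assumed; the diff does not show those lines):

```rust
use std::time::Instant;

// Assumed shape of the duration_ms computation, derived from the test's
// assertion that duration_ms > 1000 after a one-second sleep.
fn duration_ms(clock_start: Option<Instant>) -> Option<u64> {
    clock_start.map(|start| start.elapsed().as_millis() as u64)
}
```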
@@ -583,7 +614,7 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {
        };
    }

-    Ok(HttpResponse::Ok().json(json!(resp)))
+    Ok(HttpResponseBuilder::new(status_code).json(json!(resp)))
}

// try returning an API error
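The final hunk is the payoff: the handler now picks its status at runtime instead of always answering 200. A standalone illustration of that actix-web pattern, with a stand-in handler body that is not the PR's code:

```rust
use actix_web::{dev::HttpResponseBuilder, http::StatusCode, HttpResponse};
use serde_json::json;

// Stand-in example: choose the status code at runtime while emitting the
// same JSON body either way, as the PR does with status_code.
fn respond(pool_exhausted: bool) -> HttpResponse {
    let status = if pool_exhausted {
        StatusCode::INTERNAL_SERVER_ERROR
    } else {
        StatusCode::OK
    };
    HttpResponseBuilder::new(status).json(json!({ "exhausted": pool_exhausted }))
}
```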