Skip to content

Commit

Permalink
Check database health in /health
Browse files Browse the repository at this point in the history
Also switched the `/__heartbeat__` route to use the `/health` handler instead of the `/status` handler.
  • Loading branch information
AzureMarker committed Jul 20, 2020
1 parent 7086e1a commit 9095d8b
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 14 deletions.
46 changes: 41 additions & 5 deletions autoendpoint/src/db/client.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::db::error::{DbError, DbResult};
use crate::db::retry::{
retry_policy, retryable_delete_error, retryable_getitem_error, retryable_putitem_error,
retryable_updateitem_error,
retry_policy, retryable_delete_error, retryable_describe_table_error, retryable_getitem_error,
retryable_putitem_error, retryable_updateitem_error,
};
use autopush_common::db::{DynamoDbNotification, DynamoDbUser};
use autopush_common::notification::Notification;
use autopush_common::{ddb_item, hashmap, val};
use cadence::StatsdClient;
use rusoto_core::credential::StaticProvider;
use rusoto_core::{HttpClient, Region};
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_dynamodb::{
AttributeValue, DeleteItemInput, DynamoDb, DynamoDbClient, GetItemInput, PutItemInput,
UpdateItemInput,
AttributeValue, DeleteItemInput, DescribeTableError, DescribeTableInput, DynamoDb,
DynamoDbClient, GetItemInput, PutItemInput, UpdateItemInput,
};
use std::collections::HashSet;
use std::env;
Expand Down Expand Up @@ -181,4 +181,40 @@ impl DbClient {

Ok(())
}

/// Report whether the router table exists.
///
/// Delegates to `table_exists` with a copy of `self.router_table`.
pub async fn router_table_exists(&self) -> DbResult<bool> {
    let router_table = self.router_table.clone();
    self.table_exists(router_table).await
}

/// Report whether the message table exists.
///
/// Delegates to `table_exists` with a copy of `self.message_table`.
pub async fn message_table_exists(&self) -> DbResult<bool> {
    let message_table = self.message_table.clone();
    self.table_exists(message_table).await
}

/// Determine whether the named table exists.
///
/// Issues a `DescribeTable` request through the retry policy. A
/// `ResourceNotFound` service error yields `Ok(false)`; any other error is
/// converted into a `DbError` and returned. If the response carries no table
/// description or no table status, `DbError::TableStatusUnknown` is returned.
/// Otherwise the result is `true` only for the statuses `CREATING`,
/// `UPDATING` and `ACTIVE`.
async fn table_exists(&self, table_name: String) -> DbResult<bool> {
    let request = DescribeTableInput { table_name };

    let lookup = retry_policy()
        .retry_if(
            || self.ddb.describe_table(request.clone()),
            retryable_describe_table_error(self.metrics.clone()),
        )
        .await;

    let description = match lookup {
        Ok(description) => description,
        // A missing table is a valid answer to the question, not a failure
        Err(RusotoError::Service(DescribeTableError::ResourceNotFound(_))) => return Ok(false),
        Err(other) => return Err(other.into()),
    };

    let status = match description.table.and_then(|table| table.table_status) {
        Some(status) => status,
        None => return Err(DbError::TableStatusUnknown),
    };

    match status.as_str() {
        "CREATING" | "UPDATING" | "ACTIVE" => Ok(true),
        _ => Ok(false),
    }
}
}
10 changes: 9 additions & 1 deletion autoendpoint/src/db/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use thiserror::Error;

use rusoto_core::RusotoError;
use rusoto_dynamodb::{DeleteItemError, GetItemError, PutItemError, UpdateItemError};
use rusoto_dynamodb::{
DeleteItemError, DescribeTableError, GetItemError, PutItemError, UpdateItemError,
};

pub type DbResult<T> = Result<T, DbError>;

Expand All @@ -19,6 +21,12 @@ pub enum DbError {
#[error("Database error while performing DeleteItem")]
DeleteItem(#[from] RusotoError<DeleteItemError>),

#[error("Database error while performing DescribeTable")]
DescribeTable(#[from] RusotoError<DescribeTableError>),

#[error("Error while deserializing database response: {0}")]
Deserialize(#[from] serde_dynamodb::Error),

#[error("Unable to determine table status")]
TableStatusUnknown,
}
20 changes: 19 additions & 1 deletion autoendpoint/src/db/retry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use again::RetryPolicy;
use cadence::{Counted, StatsdClient};
use rusoto_core::RusotoError;
use rusoto_dynamodb::{DeleteItemError, GetItemError, PutItemError, UpdateItemError};
use rusoto_dynamodb::{
DeleteItemError, DescribeTableError, GetItemError, PutItemError, UpdateItemError,
};
use std::time::Duration;

/// Create a retry function for the given error
Expand All @@ -28,6 +30,22 @@ retryable_error!(retryable_updateitem_error, UpdateItemError, "update_item");
retryable_error!(retryable_putitem_error, PutItemError, "put_item");
retryable_error!(retryable_delete_error, DeleteItemError, "delete_item");

/// Build the retry predicate for `DescribeTable` calls.
///
/// Only an `InternalServerError` service error is reported as retryable; each
/// such error also increments the `database.retry` counter tagged with
/// `error` = `describe_table_error`.
///
/// DescribeTableError does not have a ProvisionedThroughputExceeded variant
pub fn retryable_describe_table_error(
    metrics: StatsdClient,
) -> impl Fn(&RusotoError<DescribeTableError>) -> bool {
    move |err| {
        if let RusotoError::Service(DescribeTableError::InternalServerError(_)) = err {
            metrics
                .incr_with_tags("database.retry")
                .with_tag("error", "describe_table_error")
                .send();
            return true;
        }
        false
    }
}

/// Build an exponential retry policy
pub fn retry_policy() -> RetryPolicy {
RetryPolicy::exponential(Duration::from_millis(100))
Expand Down
2 changes: 1 addition & 1 deletion autoendpoint/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Server {
.service(web::resource("/status").route(web::get().to(status_route)))
.service(web::resource("/health").route(web::get().to(health_route)))
// Dockerflow
.service(web::resource("/__heartbeat__").route(web::get().to(status_route)))
.service(web::resource("/__heartbeat__").route(web::get().to(health_route)))
.service(web::resource("/__lbheartbeat__").route(web::get().to(lb_heartbeat_route)))
.service(web::resource("/__version__").route(web::get().to(version_route)))
})
Expand Down
36 changes: 30 additions & 6 deletions autoendpoint/src/server/routes/health.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,42 @@
//! Health and Dockerflow routes
use actix_web::web::Json;
use crate::db::error::DbResult;
use crate::server::ServerState;
use actix_web::web::{Data, Json};
use actix_web::HttpResponse;
use serde_json::json;

/// Handle the `/health` route
pub async fn health_route() -> Json<serde_json::Value> {
// TODO: Get database table health
/// Handle the `/health` and `/__heartbeat__` routes
pub async fn health_route(state: Data<ServerState>) -> Json<serde_json::Value> {
let router_health = interpret_table_health(state.ddb.router_table_exists().await);
let message_health = interpret_table_health(state.ddb.message_table_exists().await);

Json(json!({
"status": "OK"
"status": "OK",
"version": env!("CARGO_PKG_VERSION"),
"router_table": router_health,
"message_table": message_health
}))
}

/// Handle the `/status` and `/__heartbeat__` routes
/// Turn the outcome of a table health check into a JSON report.
///
/// `Ok(true)` becomes `{"status": "OK"}`. Every other outcome becomes
/// `{"status": "NOT OK", "cause": ...}`, where the cause is either the fixed
/// text for a missing table or the error's display message.
fn interpret_table_health(health: DbResult<bool>) -> serde_json::Value {
    let cause = match health {
        Ok(true) => {
            return json!({
                "status": "OK"
            })
        }
        Ok(false) => String::from("Nonexistent table"),
        Err(e) => e.to_string(),
    };

    json!({
        "status": "NOT OK",
        "cause": cause
    })
}

/// Handle the `/status` route
pub async fn status_route() -> Json<serde_json::Value> {
Json(json!({
"status": "OK",
Expand Down

0 comments on commit 9095d8b

Please sign in to comment.