Skip to content

Commit

Permalink
Improve database error classification (#1317)
Browse files Browse the repository at this point in the history
## Motivation

Currently, turning `sea_orm::DbErr` into `svix_server::Error` and
turning that error into a response gets us an internal server error. For
many kinds of database errors, we can actually return a more useful
status code (and an error message).

## Solution

Assign more meaningful HTTP status codes and error messages to some DB
errors.
  • Loading branch information
svix-jplatte authored May 16, 2024
2 parents 97be4e9 + 4a213e0 commit f096f73
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 36 deletions.
181 changes: 145 additions & 36 deletions server/svix-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,35 @@ pub struct Error {
}

impl Error {
#[track_caller]
fn new(typ: ErrorType) -> Self {
let trace = vec![Location::caller()];
Self { trace, typ }
}

#[track_caller]
pub fn generic(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Generic(s.to_string()),
}
Self::new(ErrorType::Generic(s.to_string()))
}

#[track_caller]
pub fn database(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Database(s.to_string()),
}
Self::new(ErrorType::Database(s.to_string()))
}

#[track_caller]
pub fn conflict(e: DbErr) -> Self {
Self::new(ErrorType::Conflict(e))
}

#[track_caller]
pub fn queue(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Queue(s.to_string()),
}
Self::new(ErrorType::Queue(s.to_string()))
}

#[track_caller]
pub fn validation(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Validation(s.to_string()),
}
Self::new(ErrorType::Validation(s.to_string()))
}

#[track_caller]
Expand All @@ -71,18 +70,22 @@ impl Error {

#[track_caller]
pub fn cache(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Cache(s.to_string()),
}
Self::new(ErrorType::Cache(s.to_string()))
}

#[track_caller]
pub fn timeout(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Timeout(s.to_string()),
}
Self::new(ErrorType::Timeout(s.to_string()))
}

#[track_caller]
pub fn db_timeout(s: impl fmt::Display) -> Self {
Self::new(ErrorType::DbTimeout(s.to_string()))
}

#[track_caller]
pub fn connection_timeout(e: DbErr) -> Self {
Self::new(ErrorType::ConnectionTimeout(e))
}

#[track_caller]
Expand Down Expand Up @@ -142,8 +145,16 @@ impl<T> Traceable<T> for Result<T> {

impl From<DbErr> for Error {
#[track_caller]
fn from(value: DbErr) -> Self {
Error::database(value)
fn from(err: DbErr) -> Self {
if is_timeout_error(&err) {
Error::db_timeout(err)
} else if is_conflict_err(&err) {
Error::conflict(err)
} else if is_connection_timeout_error(&err) {
Error::connection_timeout(err)
} else {
Error::database(err)
}
}
}

Expand Down Expand Up @@ -206,7 +217,7 @@ impl From<lapin::Error> for Error {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum ErrorType {
/// A generic error
Generic(String),
Expand All @@ -222,6 +233,12 @@ pub enum ErrorType {
Cache(String),
/// Timeout error
Timeout(String),
/// Database timeout error
DbTimeout(String),
/// Connection timeout error
ConnectionTimeout(DbErr),
/// Conflict error
Conflict(DbErr),
}

impl fmt::Display for ErrorType {
Expand All @@ -234,6 +251,9 @@ impl fmt::Display for ErrorType {
Self::Http(s) => s.fmt(f),
Self::Cache(s) => s.fmt(f),
Self::Timeout(s) => s.fmt(f),
Self::DbTimeout(s) => s.fmt(f),
Self::ConnectionTimeout(s) => s.fmt(f),
Self::Conflict(s) => s.fmt(f),
}
}
}
Expand Down Expand Up @@ -415,15 +435,104 @@ impl From<crate::core::webhook_http_client::Error> for Error {
/// error "duplicate key value violates unique constraint". This is to be used in `map_err` calls
/// on creation/update of records
pub fn http_error_on_conflict(db_err: DbErr) -> Error {
match db_err {
DbErr::Query(RuntimeErr::SqlxError(e))
if e.as_database_error()
.and_then(|e| e.code())
.filter(|code| code == "23505")
.is_some() =>
{
HttpError::conflict(None, None).into()
if is_conflict_err(&db_err) {
HttpError::conflict(None, None).into()
} else {
Error::database(db_err)
}
}

pub fn is_conflict_err(db_err: &DbErr) -> bool {
use DbErr as E;
let rt_err = match db_err {
E::Exec(e) | E::Query(e) | E::Conn(e) => e,
// If sqlx ever extends this enum, I want a compile time error so we're forced to update this function.
// Hence we list out all the enumerations, rather than using a default match statement
E::TryIntoErr { .. }
| E::ConvertFromU64(_)
| E::UnpackInsertId
| E::UpdateGetPrimaryKey
| E::RecordNotFound(_)
| E::AttrNotSet(_)
| E::Custom(_)
| E::Type(_)
| E::Json(_)
| E::Migration(_)
| E::RecordNotInserted
| E::RecordNotUpdated
| E::ConnectionAcquire(_) => return false,
};

let sqlx_err = match rt_err {
RuntimeErr::SqlxError(e) => e,
RuntimeErr::Internal(_) => return false,
};

sqlx_err
.as_database_error()
.and_then(|e| e.code())
.filter(|code| code == "23505")
.is_some()
}

pub fn is_timeout_error(db_err: &DbErr) -> bool {
let runtime_err = match &db_err {
DbErr::Conn(e) | DbErr::Exec(e) | DbErr::Query(e) => e,
_ => return false,
};

let sqlx_err = match runtime_err {
RuntimeErr::SqlxError(e) => e,
RuntimeErr::Internal(_) => return false,
};

match sqlx_err.as_database_error() {
// STUPID - no other good way to ID statement timeouts
Some(e) => e
.message()
.contains("canceling statement due to statement timeout"),
None => false,
}
}

/// Returns true if the DbErr results from weirdness with a slow/long connection.
/// This is distinct from [is_timeout_error], which reports whether the underlying
/// query actually timed out on the pg side.
///
/// [is_connection_timeout_error] reports whether the connection to pg itself was slow
/// for some reason.
pub fn is_connection_timeout_error(db_err: &DbErr) -> bool {
use DbErr as E;
let rt_err = match db_err {
E::ConnectionAcquire(_) | E::Conn(_) => return true,
E::Exec(e) | E::Query(e) => e.to_string(),

// If sqlx ever extends this enum, I want a compile time error so we're forced to update this function.
// Hence we list out all the enumerations, rather than using a default match statement
E::TryIntoErr { .. }
| E::ConvertFromU64(_)
| E::UnpackInsertId
| E::UpdateGetPrimaryKey
| E::RecordNotFound(_)
| E::AttrNotSet(_)
| E::Custom(_)
| E::Type(_)
| E::Json(_)
| E::Migration(_)
| E::RecordNotInserted
| E::RecordNotUpdated => return false,
};

const ERRORS: [&str; 3] = [
"Connection pool timed out",
"Connection reset by peer",
"unexpected end of file",
];
for e in ERRORS {
if rt_err.contains(e) {
return true;
}
e => Error::database(e),
}

false
}
3 changes: 3 additions & 0 deletions server/svix-server/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ struct PendingDispatch {
created_at: DateTimeUtc,
}

// Clippy fails to compute the first variant's size, stating it as
// "at least 0 bytes". They're actually very similar in size.
#[allow(clippy::large_enum_variant)]
enum CompletedDispatch {
Failed(FailedDispatch),
Successful(SuccessfulDispatch),
Expand Down

0 comments on commit f096f73

Please sign in to comment.