Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve database error classification #1317

Merged
merged 2 commits into from
May 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 145 additions & 36 deletions server/svix-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,35 @@ pub struct Error {
}

impl Error {
#[track_caller]
fn new(typ: ErrorType) -> Self {
let trace = vec![Location::caller()];
Self { trace, typ }
}

#[track_caller]
pub fn generic(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Generic(s.to_string()),
}
Self::new(ErrorType::Generic(s.to_string()))
}

#[track_caller]
pub fn database(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Database(s.to_string()),
}
Self::new(ErrorType::Database(s.to_string()))
}

#[track_caller]
pub fn conflict(e: DbErr) -> Self {
Self::new(ErrorType::Conflict(e))
}

#[track_caller]
pub fn queue(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Queue(s.to_string()),
}
Self::new(ErrorType::Queue(s.to_string()))
}

#[track_caller]
pub fn validation(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Validation(s.to_string()),
}
Self::new(ErrorType::Validation(s.to_string()))
}

#[track_caller]
Expand All @@ -71,18 +70,22 @@ impl Error {

#[track_caller]
pub fn cache(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Cache(s.to_string()),
}
Self::new(ErrorType::Cache(s.to_string()))
}

#[track_caller]
pub fn timeout(s: impl fmt::Display) -> Self {
Self {
trace: vec![Location::caller()],
typ: ErrorType::Timeout(s.to_string()),
}
Self::new(ErrorType::Timeout(s.to_string()))
}

#[track_caller]
pub fn db_timeout(s: impl fmt::Display) -> Self {
Self::new(ErrorType::DbTimeout(s.to_string()))
}

#[track_caller]
pub fn connection_timeout(e: DbErr) -> Self {
Self::new(ErrorType::ConnectionTimeout(e))
}

#[track_caller]
Expand Down Expand Up @@ -142,8 +145,16 @@ impl<T> Traceable<T> for Result<T> {

impl From<DbErr> for Error {
#[track_caller]
fn from(value: DbErr) -> Self {
Error::database(value)
fn from(err: DbErr) -> Self {
if is_timeout_error(&err) {
Error::db_timeout(err)
} else if is_conflict_err(&err) {
Error::conflict(err)
} else if is_connection_timeout_error(&err) {
Error::connection_timeout(err)
} else {
Error::database(err)
}
}
}

Expand Down Expand Up @@ -206,7 +217,7 @@ impl From<lapin::Error> for Error {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum ErrorType {
/// A generic error
Generic(String),
Expand All @@ -222,6 +233,12 @@ pub enum ErrorType {
Cache(String),
/// Timeout error
Timeout(String),
/// Database timeout error
DbTimeout(String),
/// Connection timeout error
ConnectionTimeout(DbErr),
/// Conflict error
Conflict(DbErr),
}

impl fmt::Display for ErrorType {
Expand All @@ -234,6 +251,9 @@ impl fmt::Display for ErrorType {
Self::Http(s) => s.fmt(f),
Self::Cache(s) => s.fmt(f),
Self::Timeout(s) => s.fmt(f),
Self::DbTimeout(s) => s.fmt(f),
Self::ConnectionTimeout(s) => s.fmt(f),
Self::Conflict(s) => s.fmt(f),
}
}
}
Expand Down Expand Up @@ -415,15 +435,104 @@ impl From<crate::core::webhook_http_client::Error> for Error {
/// error "duplicate key value violates unique constraint". This is to be used in `map_err` calls
/// on creation/update of records
pub fn http_error_on_conflict(db_err: DbErr) -> Error {
match db_err {
DbErr::Query(RuntimeErr::SqlxError(e))
if e.as_database_error()
.and_then(|e| e.code())
.filter(|code| code == "23505")
.is_some() =>
{
HttpError::conflict(None, None).into()
if is_conflict_err(&db_err) {
HttpError::conflict(None, None).into()
} else {
Error::database(db_err)
}
}

pub fn is_conflict_err(db_err: &DbErr) -> bool {
use DbErr as E;
let rt_err = match db_err {
E::Exec(e) | E::Query(e) | E::Conn(e) => e,
// If sqlx ever extends this enum, I want a compile time error so we're forced to update this function.
// Hence we list out all the enumerations, rather than using a default match statement
E::TryIntoErr { .. }
| E::ConvertFromU64(_)
| E::UnpackInsertId
| E::UpdateGetPrimaryKey
| E::RecordNotFound(_)
| E::AttrNotSet(_)
| E::Custom(_)
| E::Type(_)
| E::Json(_)
| E::Migration(_)
| E::RecordNotInserted
| E::RecordNotUpdated
| E::ConnectionAcquire(_) => return false,
};

let sqlx_err = match rt_err {
RuntimeErr::SqlxError(e) => e,
RuntimeErr::Internal(_) => return false,
};

sqlx_err
.as_database_error()
.and_then(|e| e.code())
.filter(|code| code == "23505")
.is_some()
}

pub fn is_timeout_error(db_err: &DbErr) -> bool {
let runtime_err = match &db_err {
DbErr::Conn(e) | DbErr::Exec(e) | DbErr::Query(e) => e,
_ => return false,
};

let sqlx_err = match runtime_err {
RuntimeErr::SqlxError(e) => e,
RuntimeErr::Internal(_) => return false,
};

match sqlx_err.as_database_error() {
// STUPID - no other good way to ID statement timeouts
Some(e) => e
.message()
.contains("canceling statement due to statement timeout"),
None => false,
}
}

/// Returns true if the DbErr results from weirdness with a slow/long connection.
/// This is distinct from [is_timeout_error], which reports whether the underlying
/// query actually timed out on the pg side.
///
/// [is_connection_timeout_error] reports whether the connection to pg itself was slow
/// for some reason.
pub fn is_connection_timeout_error(db_err: &DbErr) -> bool {
use DbErr as E;
let rt_err = match db_err {
E::ConnectionAcquire(_) | E::Conn(_) => return true,
E::Exec(e) | E::Query(e) => e.to_string(),

// If sqlx ever extends this enum, I want a compile time error so we're forced to update this function.
// Hence we list out all the enumerations, rather than using a default match statement
E::TryIntoErr { .. }
| E::ConvertFromU64(_)
| E::UnpackInsertId
| E::UpdateGetPrimaryKey
| E::RecordNotFound(_)
| E::AttrNotSet(_)
| E::Custom(_)
| E::Type(_)
| E::Json(_)
| E::Migration(_)
| E::RecordNotInserted
| E::RecordNotUpdated => return false,
};

const ERRORS: [&str; 3] = [
"Connection pool timed out",
"Connection reset by peer",
"unexpected end of file",
];
for e in ERRORS {
if rt_err.contains(e) {
return true;
}
e => Error::database(e),
}

false
}
Loading