Add function for automatic transaction retry #58

Merged (11 commits) on Dec 6, 2023
8 changes: 8 additions & 0 deletions .github/workflows/rust.yml
@@ -23,6 +23,14 @@ jobs:
- name: Check style
run: cargo fmt -- --check

check-without-cockroach:
runs-on: ubuntu-latest
steps:
# actions/checkout@v2
- uses: actions/checkout@72f2cec99f417b1a1c5e2e88945068983b7965f9
- name: Cargo check
run: cargo check --no-default-features

build-and-test:
runs-on: ${{ matrix.os }}
strategy:
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -8,6 +8,11 @@ license = "MIT"
repository = "https://github.com/oxidecomputer/async-bb8-diesel"
keywords = ["diesel", "r2d2", "pool", "tokio", "async"]

[features]
# Enables CockroachDB-specific functions.
cockroach = []
default = [ "cockroach" ]

[dependencies]
bb8 = "0.8"
async-trait = "0.1.73"
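For context, a downstream crate that wants the plain async wrappers without the CockroachDB-specific retry helpers could opt out of the default feature. The version number below is illustrative, not part of this PR:

```toml
[dependencies]
async-bb8-diesel = { version = "0.1", default-features = false }
```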
210 changes: 184 additions & 26 deletions src/async_traits.rs
@@ -3,7 +3,10 @@
use crate::connection::Connection as SingleConnection;
use async_trait::async_trait;
use diesel::{
connection::{Connection as DieselConnection, SimpleConnection, TransactionManager},
connection::{
Connection as DieselConnection, SimpleConnection, TransactionManager,
TransactionManagerStatus,
},
dsl::Limit,
query_dsl::{
methods::{ExecuteDsl, LimitDsl, LoadQuery},
@@ -25,21 +28,32 @@ where
async fn batch_execute_async(&self, query: &str) -> Result<(), DieselError>;
}

#[cfg(feature = "cockroach")]
fn retryable_error(err: &DieselError) -> bool {
use diesel::result::DatabaseErrorKind::SerializationFailure;
match err {
DieselError::DatabaseError(SerializationFailure, boxed_error_information) => {
Collaborator

It looks like SerializationFailure is defined to be synonymous with code 40001, which is the condition that the CockroachDB docs say to look for. I think we shouldn't have to inspect the message? (Not sure we shouldn't. But I'm not sure what we should do if we find a different message here.)

Collaborator Author

I'm happy to just ignore the error message, and rely on the error code. Not sure what to do if these are inconsistent -- I think that's more a question for "how much do we trust our database", because them being out-of-sync seems like undocumented territory.

return boxed_error_information
.message()
.starts_with("restart transaction");
}
_ => false,
}
}
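For reference, a minimal sketch of the "rely on the error code only" variant discussed in the thread above. The function name is hypothetical and this is not the code being merged:

```rust
// Hypothetical alternative: treat any SerializationFailure (SQLSTATE 40001) as
// retryable, without inspecting the error message text at all.
fn retryable_error_code_only(err: &diesel::result::Error) -> bool {
    use diesel::result::{DatabaseErrorKind, Error};
    matches!(
        err,
        Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)
    )
}
```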

/// An async variant of [`diesel::connection::Connection`].
#[async_trait]
pub trait AsyncConnection<Conn>: AsyncSimpleConnection<Conn>
where
Conn: 'static + DieselConnection,
Self: Send,
Self: Send + Sized + 'static,
{
type OwnedConnection: Sync + Send + 'static;

#[doc(hidden)]
async fn get_owned_connection(&self) -> Self::OwnedConnection;
fn get_owned_connection(&self) -> Self;
#[doc(hidden)]
fn as_sync_conn(owned: &Self::OwnedConnection) -> MutexGuard<'_, Conn>;
fn as_sync_conn(&self) -> MutexGuard<'_, Conn>;
#[doc(hidden)]
fn as_async_conn(owned: &Self::OwnedConnection) -> &SingleConnection<Conn>;
fn as_async_conn(&self) -> &SingleConnection<Conn>;

/// Runs the function `f` in a context where blocking is safe.
async fn run<R, E, Func>(&self, f: Func) -> Result<R, E>
@@ -48,40 +62,183 @@ where
E: Send + 'static,
Func: FnOnce(&mut Conn) -> Result<R, E> + Send + 'static,
{
let connection = self.get_owned_connection().await;
Self::run_with_connection(connection, f).await
let connection = self.get_owned_connection();
connection.run_with_connection(f).await
}

#[doc(hidden)]
async fn run_with_connection<R, E, Func>(
connection: Self::OwnedConnection,
f: Func,
) -> Result<R, E>
async fn run_with_connection<R, E, Func>(self, f: Func) -> Result<R, E>
where
R: Send + 'static,
E: Send + 'static,
Func: FnOnce(&mut Conn) -> Result<R, E> + Send + 'static,
{
spawn_blocking(move || f(&mut *Self::as_sync_conn(&connection)))
spawn_blocking(move || f(&mut *self.as_sync_conn()))
.await
.unwrap() // Propagate panics
}

#[doc(hidden)]
async fn run_with_shared_connection<R, E, Func>(
connection: Arc<Self::OwnedConnection>,
f: Func,
) -> Result<R, E>
async fn run_with_shared_connection<R, E, Func>(self: &Arc<Self>, f: Func) -> Result<R, E>
where
R: Send + 'static,
E: Send + 'static,
Func: FnOnce(&mut Conn) -> Result<R, E> + Send + 'static,
{
spawn_blocking(move || f(&mut *Self::as_sync_conn(&connection)))
let conn = self.clone();
spawn_blocking(move || f(&mut *conn.as_sync_conn()))
.await
.unwrap() // Propagate panics
}

#[doc(hidden)]
async fn transaction_depth(&self) -> Result<u32, DieselError> {
let conn = self.get_owned_connection();

Self::run_with_connection(conn, |conn| {
match Conn::TransactionManager::transaction_manager_status_mut(&mut *conn) {
TransactionManagerStatus::Valid(status) => {
Ok(status.transaction_depth().map(|d| d.into()).unwrap_or(0))
}
TransactionManagerStatus::InError => Err(DieselError::BrokenTransactionManager),
}
})
.await
}

// Diesel's "begin_transaction" chooses whether to issue "BEGIN" or a
// "SAVEPOINT" depending on the transaction depth.
//
// This method is a wrapper around that call, with validation that
// we're actually issuing the BEGIN statement here.
#[doc(hidden)]
async fn start_transaction(self: &Arc<Self>) -> Result<(), DieselError> {
if self.transaction_depth().await? != 0 {
return Err(DieselError::AlreadyInTransaction);
}
self.run_with_shared_connection(|conn| Conn::TransactionManager::begin_transaction(conn))
.await?;
Ok(())
}

// Diesel's "begin_transaction" chooses whether to issue "BEGIN" or a
// "SAVEPOINT" depending on the transaction depth.
//
// This method is a wrapper around that call, with validation that
// we're actually issuing our first SAVEPOINT here.
#[doc(hidden)]
async fn add_retry_savepoint(self: &Arc<Self>) -> Result<(), DieselError> {
match self.transaction_depth().await? {
0 => return Err(DieselError::NotInTransaction),
1 => (),
_ => return Err(DieselError::AlreadyInTransaction),
};

self.run_with_shared_connection(|conn| Conn::TransactionManager::begin_transaction(conn))
.await?;
Ok(())
}

#[doc(hidden)]
async fn commit_transaction(self: &Arc<Self>) -> Result<(), DieselError> {
self.run_with_shared_connection(|conn| Conn::TransactionManager::commit_transaction(conn))
.await?;
Ok(())
}

#[doc(hidden)]
async fn rollback_transaction(self: &Arc<Self>) -> Result<(), DieselError> {
self.run_with_shared_connection(|conn| {
Conn::TransactionManager::rollback_transaction(conn)
})
.await?;
Ok(())
}

/// Issues a function `f` as a transaction.
///
/// If it fails, asynchronously calls `retry` to decide whether to retry.
Collaborator

It seems like we want to document here that this must not be used from within a transaction? Or is there some way to ensure that at compile-time?

Collaborator Author

I'm not sure how to ensure this at compile-time -- whether or not we're in a transaction is information attached to each connection, but it does not change the underlying "type" of the connection. If we wanted to embed this info here, I think we'd need to:

  1. Create a new "connection" type that prevents directly accessing all transaction-based methods
  2. Intercept all transaction methods, consuming self, and creating a new connection type (indicating we're in a transaction)
  3. On completion of the transaction, consume self again and reduce the transaction level

That said, we do check for it at run-time, and I have a test that catches this case. I'll add documentation indicating that a runtime error would be returned here.
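For illustration only, a bare-bones version of the typestate idea sketched above. The types and methods are hypothetical and not part of this PR; they only show how "in a transaction" could become part of the connection's type:

```rust
// Transaction state encoded in the type: `begin` consumes an idle connection
// and returns one on which `begin` cannot be called again; `commit` hands the
// idle connection back. A second `begin` is then a compile error.
struct Idle<C>(C);
struct InTransaction<C>(C);

impl<C> Idle<C> {
    fn begin(self) -> InTransaction<C> {
        InTransaction(self.0)
    }
}

impl<C> InTransaction<C> {
    fn commit(self) -> Idle<C> {
        Idle(self.0)
    }
}
```

As noted above, this only helps if callers cannot bypass the wrapper and open transactions directly through Diesel.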

#[cfg(feature = "cockroach")]
async fn transaction_async_with_retry<R, Func, Fut, RetryFut, RetryFunc, 'a>(
&'a self,
f: Func,
retry: RetryFunc,
) -> Result<R, DieselError>
where
R: Send + 'static,
Fut: Future<Output = Result<R, DieselError>> + Send,
Func: Fn(SingleConnection<Conn>) -> Fut + Send + Sync,
RetryFut: Future<Output = bool> + Send,
RetryFunc: Fn() -> RetryFut + Send + Sync,
{
// Check out a connection once, and use it for the duration of the
// operation.
let conn = Arc::new(self.get_owned_connection());

// Refer to CockroachDB's guide on advanced client-side transaction
// retries for the full context here. In short, they expect a particular
// name for this savepoint, but Diesel has Opinions on savepoint names,
// so we use this session variable to indicate that any savepoint name is valid.
//
// TODO: It may be preferable to set this once per connection -- but
// that'll require more interaction with how sessions with the database
// are constructed.
Self::start_transaction(&conn).await?;
conn.run_with_shared_connection(|conn| {
conn.batch_execute("SET LOCAL force_savepoint_restart = true")
})
.await?;

loop {
// Add a SAVEPOINT to which we can later return.
Self::add_retry_savepoint(&conn).await?;

let async_conn = SingleConnection(Self::as_async_conn(&conn).0.clone());
match f(async_conn).await {
Ok(value) => {
// The user-level operation succeeded: try to commit the
// transaction by RELEASE-ing the retry savepoint.
if let Err(err) = Self::commit_transaction(&conn).await {
// We're still in the transaction, but we at least
// tried to ROLLBACK to our savepoint.
if !retryable_error(&err) || !retry().await {
// Bail: ROLLBACK the initial BEGIN statement too.
let _ = Self::rollback_transaction(&conn).await;
return Err(err);
}
// ROLLBACK happened, we want to retry.
continue;
}

Self::commit_transaction(&conn).await?;
Collaborator

Suggested change
- Self::commit_transaction(&conn).await?;
+ // Commit the top-level transaction, too.
+ Self::commit_transaction(&conn).await?;

The CockroachDB docs don't address what happens if this fails. I gather from the docs that by releasing the savepoint, we've already durably committed everything we've done, so even if the COMMIT fails, the operation has happened and I guess we could ignore the error altogether. (It also seems fine to not ignore the error. The only errors I could see here are communication-level ones -- e.g., a broken TCP connection. In those cases the caller will have to assume the operation may have happened.)

Collaborator Author

(added the suggested comment)

Yeah, from reading https://www.cockroachlabs.com/docs/v23.1/commit-transaction , it seems like COMMIT "clears the connection to allow new transactions to begin" in the context of client-side transaction retries.

This is a really weird case -- I agree that the last RELEASE is the thing documented to actually make the transaction durable. I'm truly unsure of whether or not it makes more sense to:

  1. Propagate this error back to a user, and let them figure it out (even though the transaction has probably been made durable?). This at least causes the error to be visible.
  2. Drop the error, hoping that leaves the connection in a "known defunct" state, so it can't be used for future transactions. This admittedly would let "whoever just called last" go on to their next operation, but it feels bizarre to me.

I kinda lean towards (1) -- I think this is the case for any DB operation (not just transactions). A request could be sent, processed by the DB, and the response could be lost. If the caller sees a TCP error (or any other "unhandled" error), they may or may not have completed the operation, and so need to accept that the state of the DB is "unknown" and in need of reconciliation.

return Ok(value);
}
Err(user_error) => {
// The user-level operation failed: ROLLBACK.
if let Err(first_rollback_err) = Self::rollback_transaction(&conn).await {
// If we fail while rolling back, prioritize returning
Collaborator

What state are we in at this point? It seems like we failed a ROLLBACK TO SAVEPOINT at that point. The CockroachDB docs don't say anything about errors or failures here, and I'm not sure if we're still "in" the nested transaction or what.

Might we wind up returning from this function while still in the outermost transaction?
edit: having looked at the other thread, I see that AnsiTransactionManager tries to deal with stuff like this:
https://docs.diesel.rs/master/src/diesel/connection/transaction_manager.rs.html#400-401

Is that behavior a problem for us? E.g., is it possible that we try to do the rollback to savepoint, but Diesel winds up doing that, it fails, and then it also rolls back the BEGIN and we've lost track of what depth we're at?


This is more of a general comment and I'm not sure what we can do about this: it feels like there's some critical state here (like "transaction depth" and "are we moving forward or backward") along with some guarantees made by various Diesel functions and async-bb8-functions, plus invariants maintained by the loop, etc. But I don't really understand what they all are. I don't think this is an async-bb8-diesel problem per se. But the net result is that as a reader I find it pretty hard to understand what's true at each point and have confidence in the code's correctness.

It might help [me?] to draw a state diagram. With that in hand, I wonder if an implementation could use this (maybe with something like typestates) to be more clearly, structurally correct. This sounds like a lot of work though and if we're all satisfied that this is correct as is it may not be worth it.

Collaborator Author

> What state are we in at this point? It seems like we failed a ROLLBACK TO SAVEPOINT at that point. The CockroachDB docs don't say anything about errors or failures here, and I'm not sure if we're still "in" the nested transaction or what.

It depends on how the rollback_transaction method has failed.

So, to answer your original question, "what state are we in", the answer is "it depends".

  • The transaction depth after seeing this error will always be 1 from Diesel's perspective.
  • We will always have tried to issue ROLLBACK after getting into this error case.

In some situations: the transaction manager will be "broken" and we'll expect all further Diesel operations to return: https://docs.diesel.rs/master/diesel/result/enum.Error.html#variant.BrokenTransactionManager

In other situations: the ROLLBACK error could have come from CockroachDB directly.

> Might we wind up returning from this function while still in the outermost transaction? edit: having looked at the other thread, I see that AnsiTransactionManager tries to deal with stuff like this: https://docs.diesel.rs/master/src/diesel/connection/transaction_manager.rs.html#400-401

The Diesel interface tries really hard to ensure that the "transaction depth" is consistent in the success/failure cases -- basically, regardless of whether "rollback" or "commit" succeeds or fails, there are two outcomes:

  1. The transaction "depth" has decreased by one.
  2. The connection is permanently labelled as "broken" for unrecoverable errors (if you look in the Diesel source code, this is like a call to set_in_error). Once this happens, basically all subsequent Diesel operations return a "broken transaction manager" error.

> Is that behavior a problem for us? E.g., is it possible that we try to do the rollback to savepoint, but Diesel winds up doing that, it fails, and then it also rolls back the BEGIN and we've lost track of what depth we're at?

I don't think so - I think this would violate the aforementioned principle of "only change the transaction depth by at most one level".

> This is more of a general comment and I'm not sure what we can do about this: it feels like there's some critical state here (like "transaction depth" and "are we moving forward or backward") along with some guarantees made by various Diesel functions and async-bb8-functions, plus invariants maintained by the loop, etc. But I don't really understand what they all are. I don't think this is an async-bb8-diesel problem per se. But the net result is that as a reader I find it pretty hard to understand what's true at each point and have confidence in the code's correctness.

> It might help [me?] to draw a state diagram. With that in hand, I wonder if an implementation could use this (maybe with something like typestates) to be more clearly, structurally correct. This sounds like a lot of work though and if we're all satisfied that this is correct as is it may not be worth it.

I agree that typestates could help here, but I'm not sure how much value they have in async-bb8-diesel alone -- it kinda sounds like the property you want extends into Diesel's implementation of transactions too? I have a fear of managing the "transaction depth" inside async-bb8-diesel via typestates, while a client using the API could just skip the async-bb8-diesel level and send requests directly through Diesel -- which would violate our higher-level expectations.

I started to write this up in https://docs.google.com/drawings/d/1JDU5-LsohXP7kEj_9ZFJUz64XyUXNJhZTu4J02sUYV4/edit?usp=sharing -- just trying to explain what Diesel is doing -- but go figure, it's complicated. It wasn't written as a state machine, so this is basically an expansion of code logic into state machine form -- if we want something that looks like a cleaner state machine, we'll need to re-write Diesel's TransactionManager.
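A small illustrative check (not part of this PR) of the "connection is permanently broken" case described above. After Diesel flags the transaction manager as in-error, a caller could use something like this to decide to discard the connection rather than reuse it:

```rust
// `BrokenTransactionManager` is what subsequent operations return once Diesel
// has marked the connection's transaction state as unrecoverable.
fn should_discard_connection(err: &diesel::result::Error) -> bool {
    matches!(err, diesel::result::Error::BrokenTransactionManager)
}
```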

Collaborator

I took a swing at making a state machine diagram for the CockroachDB client-side retry behavior. I intended to finish it and compare it with the code, but haven't been able to prioritize it. I figured I'd drop it here for posterity in case it's useful in the future. (View source to see the underlying mermaid source.)

```mermaid
flowchart TD
    %% node definitions
    idle["connection idle"]
    begin["Execute `BEGIN`"]
    savepoint_start["Execute `SAVEPOINT`"]
    user_stuff["(consumer executes SQL queries)"]
    savepoint_release["Execute `RELEASE SAVEPOINT`"]
    commit["Execute `COMMIT`"]
    retry_decision{"Should retry?"}
    rollback_savepoint["Issue `ROLLBACK TO SAVEPOINT`"]
    rollback_all["Issue `ROLLBACK` (whole txn)"]
    dead["connection dead"]

    %% happy path
    idle -->|consumer calls `transaction_async_with_retry`| begin
    begin -->|ok| savepoint_start
    savepoint_start -->|ok| user_stuff
    user_stuff -->|ok| savepoint_release
    savepoint_release -->|ok| commit
    commit -->|ok| idle

    %% retry path
    user_stuff --> |fail| retry_decision
    retry_decision --> |yes| rollback_savepoint
    rollback_savepoint --> |ok| user_stuff
    savepoint_release --> |fail| retry_decision

    %% failure path
    retry_decision --> |no| rollback_all
    rollback_all --> |ok| idle
    commit --> |fail| rollback_all
    rollback_savepoint --> |fail| rollback_all

    %% other failures
    begin --> |fail| idle
    savepoint_start -->|fail| rollback_all

    %% really bad path
    rollback_all --> |fail| dead
```

// the ROLLBACK error over the user errors.
return match Self::rollback_transaction(&conn).await {
Ok(()) => Err(first_rollback_err),
Err(second_rollback_err) => Err(second_rollback_err),
};
}

// ROLLBACK happened, we want to retry.
smklein marked this conversation as resolved.
Show resolved Hide resolved
if retryable_error(&user_error) && retry().await {
continue;
}

// If we aren't retrying, ROLLBACK the BEGIN statement too.
return match Self::rollback_transaction(&conn).await {
Ok(()) => Err(user_error),
Err(err) => Err(err),
};
}
}
}
}

async fn transaction_async<R, E, Func, Fut, 'a>(&'a self, f: Func) -> Result<R, E>
where
R: Send + 'static,
@@ -91,14 +248,14 @@ where
{
// Check out a connection once, and use it for the duration of the
// operation.
let conn = Arc::new(self.get_owned_connection().await);
let conn = Arc::new(self.get_owned_connection());

// This function mimics the implementation of:
// https://docs.diesel.rs/master/diesel/connection/trait.TransactionManager.html#method.transaction
//
// However, it modifies all callsites to instead issue
// known-to-be-synchronous operations from an asynchronous context.
Self::run_with_shared_connection(conn.clone(), |conn| {
conn.run_with_shared_connection(|conn| {
Conn::TransactionManager::begin_transaction(conn).map_err(E::from)
})
.await?;
@@ -118,17 +275,18 @@ where
let async_conn = SingleConnection(Self::as_async_conn(&conn).0.clone());
match f(async_conn).await {
Ok(value) => {
Self::run_with_shared_connection(conn.clone(), |conn| {
conn.run_with_shared_connection(|conn| {
Conn::TransactionManager::commit_transaction(conn).map_err(E::from)
})
.await?;
Ok(value)
}
Err(user_error) => {
match Self::run_with_shared_connection(conn.clone(), |conn| {
Conn::TransactionManager::rollback_transaction(conn).map_err(E::from)
})
.await
match conn
.run_with_shared_connection(|conn| {
Conn::TransactionManager::rollback_transaction(conn).map_err(E::from)
})
.await
{
Ok(()) => Err(user_error),
Err(err) => Err(err),
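To make the intended call pattern concrete, here is a hedged sketch of how a caller might use the new retrying helper (with the default `cockroach` feature enabled). The connection type, the table-free query, the always-retry policy, and the exact import paths are illustrative assumptions, not taken from this PR:

```rust
use async_bb8_diesel::{AsyncConnection, AsyncSimpleConnection, Connection};
use diesel::PgConnection;

// Runs an (ideally idempotent) statement inside a CockroachDB transaction,
// retrying automatically on serialization failures.
async fn bump_counter(
    conn: &Connection<PgConnection>,
) -> Result<(), diesel::result::Error> {
    conn.transaction_async_with_retry(
        // Transaction body: may be executed more than once, once per retry.
        |txn| async move {
            txn.batch_execute_async("UPDATE counters SET value = value + 1")
                .await?;
            Ok(())
        },
        // Retry policy: `true` means "retry after a retryable error".
        || async { true },
    )
    .await
}
```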
16 changes: 9 additions & 7 deletions src/connection.rs
@@ -50,17 +50,19 @@ where
Conn: 'static + R2D2Connection,
Connection<Conn>: crate::AsyncSimpleConnection<Conn>,
{
type OwnedConnection = Connection<Conn>;

async fn get_owned_connection(&self) -> Self::OwnedConnection {
fn get_owned_connection(&self) -> Self {
Connection(self.0.clone())
}

fn as_sync_conn(owned: &Self::OwnedConnection) -> MutexGuard<'_, Conn> {
owned.inner()
// Accesses the connection synchronously, protected by a mutex.
//
// Avoid calling from asynchronous contexts.
fn as_sync_conn(&self) -> MutexGuard<'_, Conn> {
self.inner()
}

fn as_async_conn(owned: &Self::OwnedConnection) -> &Connection<Conn> {
owned
// TODO: Consider removing me.
fn as_async_conn(&self) -> &Connection<Conn> {
self
}
}