Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove execution for connection manager #52

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions examples/usage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSaveChangesDsl, ConnectionError, OptionalExtension,
PoolError,
};
use diesel::{pg::PgConnection, prelude::*};

Expand Down Expand Up @@ -29,15 +28,15 @@ pub struct UserUpdate<'a> {
#[derive(thiserror::Error, Debug)]
enum MyError {
#[error("DB error")]
Db(#[from] PoolError),
Db(#[from] ConnectionError),

#[error("Custom transaction error")]
Other,
}

impl From<diesel::result::Error> for MyError {
fn from(error: diesel::result::Error) -> Self {
MyError::Db(PoolError::Connection(ConnectionError::Query(error)))
MyError::Db(ConnectionError::Query(error))
}
}

Expand All @@ -47,11 +46,12 @@ async fn main() {

let manager = async_bb8_diesel::ConnectionManager::<PgConnection>::new("localhost:1234");
let pool = bb8::Pool::builder().build(manager).await.unwrap();
let conn = pool.get().await.unwrap();

// Insert by values
let _ = diesel::insert_into(dsl::users)
.values((dsl::id.eq(0), dsl::name.eq("Jim")))
.execute_async(&pool)
.execute_async(&*conn)
.await
.unwrap();

Expand All @@ -61,34 +61,34 @@ async fn main() {
id: 0,
name: "Jim".to_string(),
})
.execute_async(&pool)
.execute_async(&*conn)
.await
.unwrap();

// Load
let _ = dsl::users.get_result_async::<User>(&pool).await.unwrap();
let _ = dsl::users.get_result_async::<User>(&*conn).await.unwrap();

// Update
let _ = diesel::update(dsl::users)
.filter(dsl::id.eq(0))
.set(dsl::name.eq("Jim, But Different"))
.execute_async(&pool)
.execute_async(&*conn)
.await
.unwrap();

// Update via save_changes
let update = &UserUpdate { id: 0, name: "Jim" };
let _ = update.save_changes_async::<User>(&pool).await.unwrap();
let _ = update.save_changes_async::<User>(&*conn).await.unwrap();

// Delete
let _ = diesel::delete(dsl::users)
.filter(dsl::id.eq(0))
.execute_async(&pool)
.execute_async(&*conn)
.await
.unwrap();

// Transaction with multiple operations
pool.transaction(|conn| {
conn.transaction(|conn| {
diesel::insert_into(dsl::users)
.values((dsl::id.eq(0), dsl::name.eq("Jim")))
.execute(conn)
Expand All @@ -97,35 +97,34 @@ async fn main() {
.values((dsl::id.eq(1), dsl::name.eq("Another Jim")))
.execute(conn)
.unwrap();
Ok::<(), PoolError>(())
Ok::<(), ConnectionError>(())
})
.await
.unwrap();

// Transaction returning custom error types.
let _: MyError = pool
let _: MyError = conn
.transaction(|_| {
return Err::<(), MyError>(MyError::Other {});
})
.await
.unwrap_err();

// Asynchronous transaction.
pool.transaction_async(|conn| async move {
conn.transaction_async(|conn| async move {
diesel::update(dsl::users)
.filter(dsl::id.eq(0))
.set(dsl::name.eq("Let's change the name again"))
.execute_async(&conn)
.await
.map_err(|e| PoolError::Connection(e))
})
.await
.unwrap();

// Access the result via OptionalExtension
assert!(dsl::users
.filter(dsl::id.eq(12345))
.first_async::<User>(&pool)
.first_async::<User>(&*conn)
.await
.optional()
.unwrap()
Expand Down
52 changes: 2 additions & 50 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! An async-safe connection pool for Diesel.

use crate::{Connection, ConnectionError, PoolError, PoolResult};
use crate::{Connection, ConnectionError};
use async_trait::async_trait;
use diesel::r2d2::{self, ManageConnection, R2D2Connection};
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::task;
use std::sync::{Arc, Mutex};

/// A connection manager which implements [`bb8::ManageConnection`] to
/// integrate with bb8.
Expand All @@ -28,19 +27,11 @@ use tokio::task;
/// let mgr = async_bb8_diesel::ConnectionManager::<PgConnection>::new("localhost:1234");
/// let pool = bb8::Pool::builder().build(mgr).await.unwrap();
///
/// // You can acquire connections to the pool manually...
/// diesel::insert_into(dsl::users)
/// .values(dsl::id.eq(1337))
/// .execute_async(&*pool.get().await.unwrap())
/// .await
/// .unwrap();
///
/// // ... Or just issue them to the pool directly.
/// diesel::insert_into(dsl::users)
/// .values(dsl::id.eq(1337))
/// .execute_async(&pool)
/// .await
/// .unwrap();
/// }
/// ```
#[derive(Clone)]
Expand Down Expand Up @@ -99,42 +90,3 @@ where
false
}
}

#[async_trait]
impl<Conn> crate::AsyncSimpleConnection<Conn, PoolError> for bb8::Pool<ConnectionManager<Conn>>
where
Conn: 'static + R2D2Connection,
{
#[inline]
async fn batch_execute_async(&self, query: &str) -> PoolResult<()> {
let self_ = self.clone();
let query = query.to_string();
let conn = self_.get_owned().await.map_err(PoolError::from)?;
task::spawn_blocking(move || conn.inner().batch_execute(&query))
.await
.unwrap() // Propagate panics
.map_err(PoolError::from)
}
}

#[async_trait]
impl<Conn> crate::AsyncConnection<Conn, PoolError> for bb8::Pool<ConnectionManager<Conn>>
where
Conn: 'static + R2D2Connection,
bb8::Pool<ConnectionManager<Conn>>: crate::AsyncSimpleConnection<Conn, PoolError>,
{
type OwnedConnection = bb8::PooledConnection<'static, ConnectionManager<Conn>>;

async fn get_owned_connection(&self) -> Result<Self::OwnedConnection, PoolError> {
let self_ = self.clone();
Ok(self_.get_owned().await.map_err(PoolError::from)?)
}

fn as_sync_conn(owned: &Self::OwnedConnection) -> MutexGuard<'_, Conn> {
owned.inner()
}

fn as_async_conn(owned: &Self::OwnedConnection) -> &crate::connection::Connection<Conn> {
&*owned
}
}
10 changes: 5 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ pub type PoolResult<R> = Result<R, PoolError>;

/// Async variant of [diesel::prelude::OptionalExtension].
pub trait OptionalExtension<T> {
fn optional(self) -> Result<Option<T>, PoolError>;
fn optional(self) -> Result<Option<T>, ConnectionError>;
}

impl<T> OptionalExtension<T> for PoolResult<T> {
fn optional(self) -> Result<Option<T>, PoolError> {
impl<T> OptionalExtension<T> for Result<T, ConnectionError> {
fn optional(self) -> Result<Option<T>, ConnectionError> {
let self_as_query_result: diesel::QueryResult<T> = match self {
Ok(value) => Ok(value),
Err(PoolError::Connection(ConnectionError::Query(error_kind))) => Err(error_kind),
Err(ConnectionError::Query(error_kind)) => Err(error_kind),
Err(e) => return Err(e),
};

self_as_query_result
.optional()
.map_err(|e| PoolError::Connection(ConnectionError::Query(e)))
.map_err(|e| ConnectionError::Query(e))
}
}

Expand Down