Skip to content

Commit

Permalink
graphql: migrate graphql to use async connections (#19234)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill authored and suiwombat committed Sep 16, 2024
1 parent c6c5165 commit 99c8b7d
Show file tree
Hide file tree
Showing 24 changed files with 668 additions and 419 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ chrono.workspace = true
clap.workspace = true
const-str.workspace = true
diesel = { workspace = true, features = ["i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
diesel-async = { workspace = true, features = ["postgres"] }
either.workspace = true
fastcrypto = { workspace = true, features = ["copy_key"] }
fastcrypto-zkp.workspace = true
Expand Down
51 changes: 34 additions & 17 deletions crates/sui-graphql-rpc/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use async_graphql::dataloader::DataLoader as AGDataLoader;
use async_trait::async_trait;
use diesel::{
query_builder::{BoxedSelectStatement, FromClause, QueryFragment, QueryId},
query_dsl::{methods::LimitDsl, LoadQuery},
query_dsl::methods::LimitDsl,
QueryResult,
};

use diesel_async::{methods::LoadQuery, scoped_futures::ScopedBoxFuture};

use crate::error::Error;

/// Database Backend in use -- abstracting a specific implementation.
Expand Down Expand Up @@ -45,17 +47,21 @@ pub(crate) type Query<ST, QS, GB> =
#[async_trait]
pub(crate) trait QueryExecutor {
type Backend: diesel::backend::Backend;
type Connection: diesel::Connection;
type Connection: diesel_async::AsyncConnection;

type DbConnection<'c>: DbConnection<Connection = Self::Connection, Backend = Self::Backend>
where
Self: 'c;

/// Execute `txn` with read committed isolation. `txn` is supplied a database connection to
/// issue queries over.
async fn execute<T, U, E>(&self, txn: T) -> Result<U, Error>
async fn execute<'c, T, U, E>(&self, txn: T) -> Result<U, Error>
where
T: FnOnce(&mut Self::DbConnection<'_>) -> Result<U, E>,
T: for<'r> FnOnce(
&'r mut Self::DbConnection<'_>,
) -> ScopedBoxFuture<'static, 'r, Result<U, E>>
+ Send
+ 'c,
E: From<diesel::result::Error> + std::error::Error,
T: Send + 'static,
U: Send + 'static,
Expand All @@ -64,44 +70,55 @@ pub(crate) trait QueryExecutor {
/// Execute `txn` with repeatable reads and no phantom reads -- multiple calls to the same query
/// should produce the same results. `txn` is supplied a database connection to issue queries
/// over.
async fn execute_repeatable<T, U, E>(&self, txn: T) -> Result<U, Error>
async fn execute_repeatable<'c, T, U, E>(&self, txn: T) -> Result<U, Error>
where
T: FnOnce(&mut Self::DbConnection<'_>) -> Result<U, E>,
T: for<'r> FnOnce(
&'r mut Self::DbConnection<'_>,
) -> ScopedBoxFuture<'static, 'r, Result<U, E>>
+ Send
+ 'c,
E: From<diesel::result::Error> + std::error::Error,
T: Send + 'static,
U: Send + 'static,
E: Send + 'static;
}

#[async_trait]
pub(crate) trait DbConnection {
type Backend: diesel::backend::Backend;
type Connection: diesel::Connection<Backend = Self::Backend>;
type Connection: diesel_async::AsyncConnection<Backend = Self::Backend>;

/// Run a query that fetches a single value. `query` is a thunk that returns a query when
/// called.
fn result<Q, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<U>
async fn result<T, Q, U>(&mut self, query: T) -> QueryResult<U>
where
Q: diesel::query_builder::Query,
T: Fn() -> Q + Send,
Q: diesel::query_builder::Query + Send + 'static,
Q: LoadQuery<'static, Self::Connection, U>,
Q: QueryId + QueryFragment<Self::Backend>;
Q: QueryId + QueryFragment<Self::Backend>,
U: Send;

/// Run a query that fetches multiple values. `query` is a thunk that returns a query when
/// called.
fn results<Q, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<Vec<U>>
async fn results<T, Q, U>(&mut self, query: T) -> QueryResult<Vec<U>>
where
Q: diesel::query_builder::Query,
T: Fn() -> Q + Send,
Q: diesel::query_builder::Query + Send + 'static,
Q: LoadQuery<'static, Self::Connection, U>,
Q: QueryId + QueryFragment<Self::Backend>;
Q: QueryId + QueryFragment<Self::Backend>,
U: Send;

/// Helper to limit a query that fetches multiple values to return only its first value. `query`
/// is a thunk that returns a query when called.
fn first<Q: LimitDsl, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<U>
async fn first<T, Q: LimitDsl, U>(&mut self, query: T) -> QueryResult<U>
where
<Q as LimitDsl>::Output: diesel::query_builder::Query,
T: Fn() -> Q + Send,
<Q as LimitDsl>::Output: diesel::query_builder::Query + Send,
<Q as LimitDsl>::Output: LoadQuery<'static, Self::Connection, U>,
<Q as LimitDsl>::Output: QueryId + QueryFragment<Self::Backend>,
<Q as LimitDsl>::Output: QueryId + QueryFragment<Self::Backend> + 'static,
U: Send,
{
self.result(move || query().limit(1i64))
self.result(move || query().limit(1i64)).await
}
}

Expand Down
11 changes: 8 additions & 3 deletions crates/sui-graphql-rpc/src/data/package_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use async_graphql::dataloader::Loader;
use async_trait::async_trait;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::scoped_futures::ScopedFutureExt;
use move_core_types::account_address::AccountAddress;
use sui_indexer::models::packages::StoredPackage;
use sui_indexer::schema::packages;
Expand Down Expand Up @@ -59,9 +60,13 @@ impl Loader<PackageKey> for Db {
let ids: BTreeSet<_> = keys.iter().map(|PackageKey(id)| id.to_vec()).collect();
let stored_packages: Vec<StoredPackage> = self
.execute(move |conn| {
conn.results(move || {
dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned()))
})
async move {
conn.results(move || {
dsl::packages.filter(dsl::package_id.eq_any(ids.iter().cloned()))
})
.await
}
.scope_boxed()
})
.await
.map_err(|e| PackageResolverError::Store {
Expand Down
Loading

0 comments on commit 99c8b7d

Please sign in to comment.