From 6b94673dd9869955342a46468c61160f392bdc0e Mon Sep 17 00:00:00 2001 From: Marco Napetti Date: Thu, 9 Dec 2021 15:12:41 +0100 Subject: [PATCH 1/5] First metric and tracing implementation --- Cargo.toml | 11 ++-- src/database/db_connection.rs | 8 +++ src/database/mock.rs | 6 ++ src/database/mod.rs | 2 + src/database/stream/query.rs | 92 ++++++++++++++++++++---------- src/database/stream/transaction.rs | 45 ++++++++++++--- src/database/transaction.rs | 45 ++++++++++++++- src/driver/mock.rs | 9 +++ src/driver/sqlx_mysql.rs | 41 ++++++++++++- src/driver/sqlx_postgres.rs | 41 ++++++++++++- src/driver/sqlx_sqlite.rs | 41 ++++++++++++- src/lib.rs | 2 + src/metric.rs | 28 +++++++++ src/util.rs | 2 +- 14 files changed, 317 insertions(+), 56 deletions(-) create mode 100644 src/metric.rs diff --git a/Cargo.toml b/Cargo.toml index 47f075eb8..240bd053e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ async-trait = { version = "^0.1" } chrono = { version = "^0", optional = true } futures = { version = "^0.3" } futures-util = { version = "^0.3" } -log = { version = "^0.4", optional = true } +tracing = "0.1" rust_decimal = { version = "^1", optional = true } sea-orm-macros = { version = "^0.4.1", path = "sea-orm-macros", optional = true } sea-query = { version = "^0.19.1", features = ["thread-safe"] } @@ -36,8 +36,9 @@ serde = { version = "^1.0", features = ["derive"] } serde_json = { version = "^1", optional = true } sqlx = { version = "^0.5", optional = true } uuid = { version = "0.8", features = ["serde", "v4"], optional = true } -ouroboros = "0.11" +ouroboros = "0.14" url = "^2.2" +once_cell = "1.8" [dev-dependencies] smol = { version = "^1.2" } @@ -47,12 +48,12 @@ tokio = { version = "^1.6", features = ["full"] } actix-rt = { version = "2.2.0" } maplit = { version = "^1" } rust_decimal_macros = { version = "^1" } -env_logger = { version = "^0.9" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } sea-orm = { path = ".", features = ["debug-print"] } pretty_assertions = { version = "^0.7" } [features] -debug-print = ["log"] +debug-print = [] default = [ "macros", "mock", @@ -60,6 +61,8 @@ default = [ "with-chrono", "with-rust_decimal", "with-uuid", + "runtime-tokio-rustls",#debug + "sqlx-all",#debug ] macros = ["sea-orm-macros"] mock = [] diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 99de8633f..b9249d998 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -3,6 +3,7 @@ use crate::{ StatementBuilder, TransactionError, }; use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, QueryBuilder, SqliteQueryBuilder}; +use tracing::instrument; use std::{future::Future, pin::Pin}; use url::Url; @@ -49,6 +50,7 @@ pub enum DatabaseBackend { /// The same as [DatabaseBackend] just shorter :) pub type DbBackend = DatabaseBackend; +#[derive(Debug)] pub(crate) enum InnerConnection { #[cfg(feature = "sqlx-mysql")] MySql(PoolConnection), @@ -104,6 +106,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { } } + #[instrument(level = "trace")] async fn execute(&self, stmt: Statement) -> Result { match self { #[cfg(feature = "sqlx-mysql")] @@ -118,6 +121,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { } } + #[instrument(level = "trace")] async fn query_one(&self, stmt: Statement) -> Result, DbErr> { match self { #[cfg(feature = "sqlx-mysql")] @@ -132,6 +136,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { } } + #[instrument(level = "trace")] async fn query_all(&self, stmt: Statement) -> Result, DbErr> { match self { #[cfg(feature = "sqlx-mysql")] @@ -146,6 +151,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { } } + #[instrument(level = "trace")] fn stream( &'a self, stmt: Statement, @@ -167,6 +173,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { }) } + #[instrument(level = "trace")] async fn begin(&self) -> Result { match self { #[cfg(feature = "sqlx-mysql")] @@ -185,6 +192,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { /// Execute the function inside a transaction. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. + #[instrument(level = "trace", skip(_callback))] async fn transaction(&self, _callback: F) -> Result> where F: for<'c> FnOnce( diff --git a/src/database/mock.rs b/src/database/mock.rs index 74ffd8bf6..b3ae74fc5 100644 --- a/src/database/mock.rs +++ b/src/database/mock.rs @@ -4,6 +4,7 @@ use crate::{ Statement, }; use sea_query::{Value, ValueType, Values}; +use tracing::instrument; use std::{collections::BTreeMap, sync::Arc}; /// Defines a Mock database suitable for testing @@ -89,6 +90,7 @@ impl MockDatabase { } impl MockDatabaseTrait for MockDatabase { + #[instrument(level = "trace")] fn execute(&mut self, counter: usize, statement: Statement) -> Result { if let Some(transaction) = &mut self.transaction { transaction.push(statement); @@ -104,6 +106,7 @@ impl MockDatabaseTrait for MockDatabase { } } + #[instrument(level = "trace")] fn query(&mut self, counter: usize, statement: Statement) -> Result, DbErr> { if let Some(transaction) = &mut self.transaction { transaction.push(statement); @@ -122,6 +125,7 @@ impl MockDatabaseTrait for MockDatabase { } } + #[instrument(level = "trace")] fn begin(&mut self) { if self.transaction.is_some() { self.transaction @@ -133,6 +137,7 @@ impl MockDatabaseTrait for MockDatabase { } } + #[instrument(level = "trace")] fn commit(&mut self) { if self.transaction.is_some() { if self.transaction.as_mut().unwrap().commit(self.db_backend) { @@ -144,6 +149,7 @@ impl MockDatabaseTrait for MockDatabase { } } + #[instrument(level = "trace")] fn rollback(&mut self) { if self.transaction.is_some() { if self.transaction.as_mut().unwrap().rollback(self.db_backend) { diff --git a/src/database/mod.rs b/src/database/mod.rs index 28bdbb78c..1a9556c7f 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -14,6 +14,7 @@ pub use db_connection::*; pub use mock::*; pub use statement::*; pub use stream::*; +use tracing::instrument; pub use transaction::*; use crate::DbErr; @@ -42,6 +43,7 @@ pub struct ConnectOptions { impl Database { /// Method to create a [DatabaseConnection] on a database + #[instrument(level = "trace", skip(opt))] pub async fn connect(opt: C) -> Result where C: Into, diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs index c2c88d0de..144a6b83e 100644 --- a/src/database/stream/query.rs +++ b/src/database/stream/query.rs @@ -12,6 +12,8 @@ use futures::TryStreamExt; #[cfg(feature = "sqlx-dep")] use sqlx::{pool::PoolConnection, Executor}; +use tracing::instrument; + use crate::{DbErr, InnerConnection, QueryResult, Statement}; /// Creates a stream from a [QueryResult] @@ -59,41 +61,71 @@ impl std::fmt::Debug for QueryStream { } impl QueryStream { + #[instrument(level = "trace")] fn build(stmt: Statement, conn: InnerConnection) -> QueryStream { QueryStreamBuilder { stmt, conn, - stream_builder: |conn, stmt| match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) + stream_builder: |conn, stmt| { + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + let query = crate::driver::sqlx_mysql::sqlx_query(stmt); + let _start = std::time::SystemTime::now(); + let res = Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ); + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: stmt, + }; + callback(&info); + } + res + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + let query = crate::driver::sqlx_postgres::sqlx_query(stmt); + let _start = std::time::SystemTime::now(); + let res = Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ); + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: stmt, + }; + callback(&info); + } + res + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); + let _start = std::time::SystemTime::now(); + let res = Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ); + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: stmt, + }; + callback(&info); + } + res + } + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => c.fetch(stmt), } - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => c.fetch(stmt), - }, + } } .build() } diff --git a/src/database/stream/transaction.rs b/src/database/stream/transaction.rs index 77a598192..d2643f63e 100644 --- a/src/database/stream/transaction.rs +++ b/src/database/stream/transaction.rs @@ -11,6 +11,8 @@ use sqlx::Executor; use futures::lock::MutexGuard; +use tracing::instrument; + use crate::{DbErr, InnerConnection, QueryResult, Statement}; /// `TransactionStream` cannot be used in a `transaction` closure as it does not impl `Send`. @@ -31,6 +33,7 @@ impl<'a> std::fmt::Debug for TransactionStream<'a> { } impl<'a> TransactionStream<'a> { + #[instrument(level = "trace")] pub(crate) async fn build( conn: MutexGuard<'a, InnerConnection>, stmt: Statement, @@ -44,32 +47,56 @@ impl<'a> TransactionStream<'a> { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(c) => { let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - Box::pin( + let _start = std::time::SystemTime::now(); + let res = Box::pin( c.fetch(query) .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), - ) - as Pin>>> + ) as Pin>>>; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: stmt, + }; + callback(&info); + } + res } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(c) => { let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - Box::pin( + let _start = std::time::SystemTime::now(); + let res = Box::pin( c.fetch(query) .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), - ) - as Pin>>> + ) as Pin>>>; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: stmt, + }; + callback(&info); + } + res } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(c) => { let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - Box::pin( + let _start = std::time::SystemTime::now(); + let res = Box::pin( c.fetch(query) .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), - ) - as Pin>>> + ) as Pin>>>; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: stmt, + }; + callback(&info); + } + res } #[cfg(feature = "mock")] InnerConnection::Mock(c) => c.fetch(stmt), diff --git a/src/database/transaction.rs b/src/database/transaction.rs index f4a1b6787..e26067db6 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -7,6 +7,7 @@ use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err}; use futures::lock::Mutex; #[cfg(feature = "sqlx-dep")] use sqlx::{pool::PoolConnection, TransactionManager}; +use tracing::instrument; use std::{future::Future, pin::Pin, sync::Arc}; // a Transaction is just a sugar for a connection where START TRANSACTION has been executed @@ -66,6 +67,7 @@ impl DatabaseTransaction { Self::begin(Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend).await } + #[instrument(level = "trace")] async fn begin( conn: Arc>, backend: DbBackend, @@ -104,6 +106,7 @@ impl DatabaseTransaction { /// Runs a transaction to completion returning an rolling back the transaction on /// encountering an error if it fails + #[instrument(level = "trace", skip(callback))] pub(crate) async fn run(self, callback: F) -> Result> where F: for<'b> FnOnce( @@ -125,6 +128,7 @@ impl DatabaseTransaction { } /// Commit a transaction atomically + #[instrument(level = "trace")] pub async fn commit(mut self) -> Result<(), DbErr> { self.open = false; match *self.conn.lock().await { @@ -155,6 +159,7 @@ impl DatabaseTransaction { } /// rolls back a transaction in case error are encountered during the operation + #[instrument(level = "trace")] pub async fn rollback(mut self) -> Result<(), DbErr> { self.open = false; match *self.conn.lock().await { @@ -185,6 +190,7 @@ impl DatabaseTransaction { } // the rollback is queued and will be performed on next async operation, like returning the connection to the pool + #[instrument(level = "trace")] fn start_rollback(&mut self) { if self.open { if let Some(mut conn) = self.conn.try_lock() { @@ -229,6 +235,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { self.backend } + #[instrument(level = "trace")] async fn execute(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); @@ -236,17 +243,44 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(conn) => { let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - query.execute(conn).await.map(Into::into) + let _start = std::time::SystemTime::now(); + let res = query.execute(conn).await.map(Into::into); + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); + } + res } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(conn) => { let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - query.execute(conn).await.map(Into::into) + let _start = std::time::SystemTime::now(); + let res = query.execute(conn).await.map(Into::into); + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); + } + res } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(conn) => { let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - query.execute(conn).await.map(Into::into) + let _start = std::time::SystemTime::now(); + let res = query.execute(conn).await.map(Into::into); + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); + } + res } #[cfg(feature = "mock")] InnerConnection::Mock(conn) => return conn.execute(stmt), @@ -255,6 +289,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { _res.map_err(sqlx_error_to_exec_err) } + #[instrument(level = "trace")] async fn query_one(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); @@ -285,6 +320,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { } } + #[instrument(level = "trace")] async fn query_all(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); @@ -320,6 +356,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { _res.map_err(sqlx_error_to_query_err) } + #[instrument(level = "trace")] fn stream( &'a self, stmt: Statement, @@ -329,12 +366,14 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { ) } + #[instrument(level = "trace")] async fn begin(&self) -> Result { DatabaseTransaction::begin(Arc::clone(&self.conn), self.backend).await } /// Execute the function inside a transaction. /// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed. + #[instrument(level = "trace", skip(_callback))] async fn transaction(&self, _callback: F) -> Result> where F: for<'c> FnOnce( diff --git a/src/driver/mock.rs b/src/driver/mock.rs index cdded50c8..12f3cc5c4 100644 --- a/src/driver/mock.rs +++ b/src/driver/mock.rs @@ -3,6 +3,7 @@ use crate::{ Statement, Transaction, }; use futures::Stream; +use tracing::instrument; use std::{ fmt::Debug, pin::Pin, @@ -69,6 +70,7 @@ impl MockDatabaseConnector { /// Cpnnect to the [MockDatabase] #[allow(unused_variables)] + #[instrument(level = "trace")] pub async fn connect(string: &str) -> Result { macro_rules! connect_mock_db { ( $syntax: expr ) => { @@ -117,6 +119,7 @@ impl MockDatabaseConnection { } /// Execute the SQL statement in the [MockDatabase] + #[instrument(level = "trace")] pub fn execute(&self, statement: Statement) -> Result { debug_print!("{}", statement); let counter = self.execute_counter.fetch_add(1, Ordering::SeqCst); @@ -124,6 +127,7 @@ impl MockDatabaseConnection { } /// Return one [QueryResult] if the query was successful + #[instrument(level = "trace")] pub fn query_one(&self, statement: Statement) -> Result, DbErr> { debug_print!("{}", statement); let counter = self.query_counter.fetch_add(1, Ordering::SeqCst); @@ -132,6 +136,7 @@ impl MockDatabaseConnection { } /// Return all [QueryResult]s if the query was successful + #[instrument(level = "trace")] pub fn query_all(&self, statement: Statement) -> Result, DbErr> { debug_print!("{}", statement); let counter = self.query_counter.fetch_add(1, Ordering::SeqCst); @@ -139,6 +144,7 @@ impl MockDatabaseConnection { } /// Return [QueryResult]s from a multi-query operation + #[instrument(level = "trace")] pub fn fetch( &self, statement: &Statement, @@ -150,16 +156,19 @@ impl MockDatabaseConnection { } /// Create a statement block of SQL statements that execute together. + #[instrument(level = "trace")] pub fn begin(&self) { self.mocker.lock().unwrap().begin() } /// Commit a transaction atomically to the database + #[instrument(level = "trace")] pub fn commit(&self) { self.mocker.lock().unwrap().commit() } /// Roll back a faulty transaction + #[instrument(level = "trace")] pub fn rollback(&self) { self.mocker.lock().unwrap().rollback() } diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index 3f9936e4d..2b2efd688 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -7,6 +7,7 @@ use sqlx::{ sea_query::sea_query_driver_mysql!(); use sea_query_driver_mysql::bind_query; +use tracing::instrument; use crate::{ debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, @@ -32,6 +33,7 @@ impl SqlxMySqlConnector { } /// Add configuration options for the MySQL database + #[instrument(level = "trace")] pub async fn connect(options: ConnectOptions) -> Result { let mut opt = options .url @@ -59,15 +61,25 @@ impl SqlxMySqlConnector { impl SqlxMySqlPoolConnection { /// Execute a [Statement] on a MySQL backend + #[instrument(level = "trace")] pub async fn execute(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.execute(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.execute(conn).await { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), @@ -76,18 +88,28 @@ impl SqlxMySqlPoolConnection { } /// Get one result from a SQL query. Returns [Option::None] if no match was found + #[instrument(level = "trace")] pub async fn query_one(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.fetch_one(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.fetch_one(conn).await { Ok(row) => Ok(Some(row.into())), Err(err) => match err { sqlx::Error::RowNotFound => Ok(None), _ => Err(DbErr::Query(err.to_string())), }, + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -96,15 +118,25 @@ impl SqlxMySqlPoolConnection { } /// Get the results of a query returning them as a Vec<[QueryResult]> + #[instrument(level = "trace")] pub async fn query_all(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.fetch_all(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.fetch_all(conn).await { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -113,6 +145,7 @@ impl SqlxMySqlPoolConnection { } /// Stream the results of executing a SQL query + #[instrument(level = "trace")] pub async fn stream(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); @@ -126,6 +159,7 @@ impl SqlxMySqlPoolConnection { } /// Bundle a set of SQL statements that execute together. + #[instrument(level = "trace")] pub async fn begin(&self) -> Result { if let Ok(conn) = self.pool.acquire().await { DatabaseTransaction::new_mysql(conn).await @@ -137,6 +171,7 @@ impl SqlxMySqlPoolConnection { } /// Create a MySQL transaction + #[instrument(level = "trace", skip(callback))] pub async fn transaction(&self, callback: F) -> Result> where F: for<'b> FnOnce( diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index 3abebe318..121e4e72d 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -7,6 +7,7 @@ use sqlx::{ sea_query::sea_query_driver_postgres!(); use sea_query_driver_postgres::bind_query; +use tracing::instrument; use crate::{ debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, @@ -32,6 +33,7 @@ impl SqlxPostgresConnector { } /// Add configuration options for the MySQL database + #[instrument(level = "trace")] pub async fn connect(options: ConnectOptions) -> Result { let mut opt = options .url @@ -59,15 +61,25 @@ impl SqlxPostgresConnector { impl SqlxPostgresPoolConnection { /// Execute a [Statement] on a PostgreSQL backend + #[instrument(level = "trace")] pub async fn execute(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.execute(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.execute(conn).await { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), @@ -76,18 +88,28 @@ impl SqlxPostgresPoolConnection { } /// Get one result from a SQL query. Returns [Option::None] if no match was found + #[instrument(level = "trace")] pub async fn query_one(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.fetch_one(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.fetch_one(conn).await { Ok(row) => Ok(Some(row.into())), Err(err) => match err { sqlx::Error::RowNotFound => Ok(None), _ => Err(DbErr::Query(err.to_string())), }, + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -96,15 +118,25 @@ impl SqlxPostgresPoolConnection { } /// Get the results of a query returning them as a Vec<[QueryResult]> + #[instrument(level = "trace")] pub async fn query_all(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.fetch_all(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.fetch_all(conn).await { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -113,6 +145,7 @@ impl SqlxPostgresPoolConnection { } /// Stream the results of executing a SQL query + #[instrument(level = "trace")] pub async fn stream(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); @@ -126,6 +159,7 @@ impl SqlxPostgresPoolConnection { } /// Bundle a set of SQL statements that execute together. + #[instrument(level = "trace")] pub async fn begin(&self) -> Result { if let Ok(conn) = self.pool.acquire().await { DatabaseTransaction::new_postgres(conn).await @@ -137,6 +171,7 @@ impl SqlxPostgresPoolConnection { } /// Create a PostgreSQL transaction + #[instrument(level = "trace", skip(callback))] pub async fn transaction(&self, callback: F) -> Result> where F: for<'b> FnOnce( diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index 255686d2b..9cfd9a723 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -7,6 +7,7 @@ use sqlx::{ sea_query::sea_query_driver_sqlite!(); use sea_query_driver_sqlite::bind_query; +use tracing::instrument; use crate::{ debug_print, error::*, executor::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, @@ -32,6 +33,7 @@ impl SqlxSqliteConnector { } /// Add configuration options for the SQLite database + #[instrument(level = "trace")] pub async fn connect(options: ConnectOptions) -> Result { let mut options = options; let mut opt = options @@ -63,15 +65,25 @@ impl SqlxSqliteConnector { impl SqlxSqlitePoolConnection { /// Execute a [Statement] on a SQLite backend + #[instrument(level = "trace")] pub async fn execute(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.execute(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.execute(conn).await { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), @@ -80,18 +92,28 @@ impl SqlxSqlitePoolConnection { } /// Get one result from a SQL query. Returns [Option::None] if no match was found + #[instrument(level = "trace")] pub async fn query_one(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.fetch_one(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.fetch_one(conn).await { Ok(row) => Ok(Some(row.into())), Err(err) => match err { sqlx::Error::RowNotFound => Ok(None), _ => Err(DbErr::Query(err.to_string())), }, + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -100,15 +122,25 @@ impl SqlxSqlitePoolConnection { } /// Get the results of a query returning them as a Vec<[QueryResult]> + #[instrument(level = "trace")] pub async fn query_all(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - match query.fetch_all(conn).await { + let _start = std::time::SystemTime::now(); + let res = match query.fetch_all(conn).await { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), + }; + if let Some(callback) = crate::metric::get_callback() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: &stmt, + }; + callback(&info); } + res } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -117,6 +149,7 @@ impl SqlxSqlitePoolConnection { } /// Stream the results of executing a SQL query + #[instrument(level = "trace")] pub async fn stream(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); @@ -130,6 +163,7 @@ impl SqlxSqlitePoolConnection { } /// Bundle a set of SQL statements that execute together. + #[instrument(level = "trace")] pub async fn begin(&self) -> Result { if let Ok(conn) = self.pool.acquire().await { DatabaseTransaction::new_sqlite(conn).await @@ -141,6 +175,7 @@ impl SqlxSqlitePoolConnection { } /// Create a MySQL transaction + #[instrument(level = "trace", skip(callback))] pub async fn transaction(&self, callback: F) -> Result> where F: for<'b> FnOnce( diff --git a/src/lib.rs b/src/lib.rs index 469619f03..5b6787dc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -284,6 +284,8 @@ pub mod schema; #[cfg(feature = "macros")] pub mod tests_cfg; mod util; +/// Holds types and methods to perform metric collection +pub mod metric; pub use database::*; pub use driver::*; diff --git a/src/metric.rs b/src/metric.rs new file mode 100644 index 000000000..4eab0e749 --- /dev/null +++ b/src/metric.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +use once_cell::sync::OnceCell; + +type Callback = Box) + Send + Sync>; + +static METRIC: OnceCell = OnceCell::new(); + +#[derive(Debug)] +/// Query execution infos +pub struct Info<'a> { + /// Query executiuon duration + pub elapsed: Duration, + /// Query data + pub statement: &'a crate::Statement, +} + +/// Sets a new metric callback, returning it if already set +pub fn set_callback(callback: F) -> Result<(), Callback> +where + F: Fn(&Info<'_>) + Send + Sync + 'static, +{ + METRIC.set(Box::new(callback)) +} + +pub(crate) fn get_callback() -> Option<&'static Callback> { + METRIC.get() +} diff --git a/src/util.rs b/src/util.rs index 9ffa1e4c2..f86140995 100644 --- a/src/util.rs +++ b/src/util.rs @@ -15,7 +15,7 @@ #[macro_export] #[cfg(feature = "debug-print")] macro_rules! debug_print { - ($( $args:expr ),*) => { log::debug!( $( $args ),* ); } + ($( $args:expr ),*) => { tracing::debug!( $( $args ),* ); } } #[macro_export] From 8ad9781a3f8b4285719bc2b012fc9ed9c7272d47 Mon Sep 17 00:00:00 2001 From: Marco Napetti Date: Thu, 9 Dec 2021 16:20:53 +0100 Subject: [PATCH 2/5] Remove debug features --- Cargo.toml | 2 -- sea-orm-macros/src/lib.rs | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 240bd053e..7b3bcdfba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,8 +61,6 @@ default = [ "with-chrono", "with-rust_decimal", "with-uuid", - "runtime-tokio-rustls",#debug - "sqlx-all",#debug ] macros = ["sea-orm-macros"] mock = [] diff --git a/sea-orm-macros/src/lib.rs b/sea-orm-macros/src/lib.rs index 00540aa4a..7fc4b3d03 100644 --- a/sea-orm-macros/src/lib.rs +++ b/sea-orm-macros/src/lib.rs @@ -596,9 +596,8 @@ pub fn test(_: TokenStream, input: TokenStream) -> TokenStream { #[test] #(#attrs)* fn #name() #ret { - let _ = ::env_logger::builder() - .filter_level(::log::LevelFilter::Debug) - .is_test(true) + let _ = ::tracing_subscriber::fmt() + .with_max_level(::tracing::Level::DEBUG) .try_init(); crate::block_on!(async { #body }) } From 4abe8310a57e5cbf7a52e9e32b614cd0aad48fd5 Mon Sep 17 00:00:00 2001 From: Marco Napetti Date: Thu, 9 Dec 2021 19:03:46 +0100 Subject: [PATCH 3/5] Per-connection metric --- src/database/db_connection.rs | 24 +++++++++++++++--- src/database/stream/query.rs | 40 ++++++++++++++++-------------- src/database/stream/transaction.rs | 15 ++++++----- src/database/transaction.rs | 32 ++++++++++++++++++------ src/driver/sqlx_mysql.rs | 32 +++++++++++++++++------- src/driver/sqlx_postgres.rs | 32 +++++++++++++++++------- src/driver/sqlx_sqlite.rs | 32 +++++++++++++++++------- src/metric.rs | 20 ++------------- 8 files changed, 147 insertions(+), 80 deletions(-) diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index b9249d998..3a090b6f1 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -166,7 +166,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.stream(stmt).await?, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => { - crate::QueryStream::from((Arc::clone(conn), stmt)) + crate::QueryStream::from((Arc::clone(conn), stmt, None)) } DatabaseConnection::Disconnected => panic!("Disconnected"), }) @@ -184,7 +184,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.begin().await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => { - DatabaseTransaction::new_mock(Arc::clone(conn)).await + DatabaseTransaction::new_mock(Arc::clone(conn), None).await } DatabaseConnection::Disconnected => panic!("Disconnected"), } @@ -213,7 +213,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.transaction(_callback).await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => { - let transaction = DatabaseTransaction::new_mock(Arc::clone(conn)) + let transaction = DatabaseTransaction::new_mock(Arc::clone(conn), None) .await .map_err(TransactionError::Connection)?; transaction.run(_callback).await @@ -245,6 +245,24 @@ impl DatabaseConnection { } } +impl DatabaseConnection { + /// Sets a callback to metric this connection + pub fn set_metric_callback(&mut self, callback: F) + where + F: Into, + { + match self { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnection::SqlxMySqlPoolConnection(conn) => conn.set_metric_callback(callback), + #[cfg(feature = "sqlx-postgres")] + DatabaseConnection::SqlxPostgresPoolConnection(conn) => conn.set_metric_callback(callback), + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.set_metric_callback(callback), + _ => {}, + } + } +} + impl DbBackend { /// Check if the URI is the same as the specified database backend. /// Returns true if they match. diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs index 144a6b83e..2147a8dc6 100644 --- a/src/database/stream/query.rs +++ b/src/database/stream/query.rs @@ -21,36 +21,37 @@ use crate::{DbErr, InnerConnection, QueryResult, Statement}; pub struct QueryStream { stmt: Statement, conn: InnerConnection, - #[borrows(mut conn, stmt)] + metric_callback: Option, + #[borrows(mut conn, stmt, metric_callback)] #[not_covariant] stream: Pin> + 'this>>, } #[cfg(feature = "sqlx-mysql")] -impl From<(PoolConnection, Statement)> for QueryStream { - fn from((conn, stmt): (PoolConnection, Statement)) -> Self { - QueryStream::build(stmt, InnerConnection::MySql(conn)) +impl From<(PoolConnection, Statement, Option)> for QueryStream { + fn from((conn, stmt, metric_callback): (PoolConnection, Statement, Option)) -> Self { + QueryStream::build(stmt, InnerConnection::MySql(conn), metric_callback) } } #[cfg(feature = "sqlx-postgres")] -impl From<(PoolConnection, Statement)> for QueryStream { - fn from((conn, stmt): (PoolConnection, Statement)) -> Self { - QueryStream::build(stmt, InnerConnection::Postgres(conn)) +impl From<(PoolConnection, Statement, Option)> for QueryStream { + fn from((conn, stmt, metric_callback): (PoolConnection, Statement, Option)) -> Self { + QueryStream::build(stmt, InnerConnection::Postgres(conn), metric_callback) } } #[cfg(feature = "sqlx-sqlite")] -impl From<(PoolConnection, Statement)> for QueryStream { - fn from((conn, stmt): (PoolConnection, Statement)) -> Self { - QueryStream::build(stmt, InnerConnection::Sqlite(conn)) +impl From<(PoolConnection, Statement, Option)> for QueryStream { + fn from((conn, stmt, metric_callback): (PoolConnection, Statement, Option)) -> Self { + QueryStream::build(stmt, InnerConnection::Sqlite(conn), metric_callback) } } #[cfg(feature = "mock")] -impl From<(Arc, Statement)> for QueryStream { - fn from((conn, stmt): (Arc, Statement)) -> Self { - QueryStream::build(stmt, InnerConnection::Mock(conn)) +impl From<(Arc, Statement, Option)> for QueryStream { + fn from((conn, stmt, metric_callback): (Arc, Statement, Option)) -> Self { + QueryStream::build(stmt, InnerConnection::Mock(conn), metric_callback) } } @@ -61,12 +62,13 @@ impl std::fmt::Debug for QueryStream { } impl QueryStream { - #[instrument(level = "trace")] - fn build(stmt: Statement, conn: InnerConnection) -> QueryStream { + #[instrument(level = "trace", skip(metric_callback))] + fn build(stmt: Statement, conn: InnerConnection, metric_callback: Option) -> QueryStream { QueryStreamBuilder { stmt, conn, - stream_builder: |conn, stmt| { + metric_callback, + stream_builder: |conn, stmt, metric_callback| { match conn { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(c) => { @@ -77,7 +79,7 @@ impl QueryStream { .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), ); - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: stmt, @@ -95,7 +97,7 @@ impl QueryStream { .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), ); - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: stmt, @@ -113,7 +115,7 @@ impl QueryStream { .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), ); - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: stmt, diff --git a/src/database/stream/transaction.rs b/src/database/stream/transaction.rs index d2643f63e..fe1212d82 100644 --- a/src/database/stream/transaction.rs +++ b/src/database/stream/transaction.rs @@ -21,7 +21,8 @@ use crate::{DbErr, InnerConnection, QueryResult, Statement}; pub struct TransactionStream<'a> { stmt: Statement, conn: MutexGuard<'a, InnerConnection>, - #[borrows(mut conn, stmt)] + metric_callback: Option, + #[borrows(mut conn, stmt, metric_callback)] #[not_covariant] stream: Pin> + 'this>>, } @@ -33,15 +34,17 @@ impl<'a> std::fmt::Debug for TransactionStream<'a> { } impl<'a> TransactionStream<'a> { - #[instrument(level = "trace")] + #[instrument(level = "trace", skip(metric_callback))] pub(crate) async fn build( conn: MutexGuard<'a, InnerConnection>, stmt: Statement, + metric_callback: Option, ) -> TransactionStream<'a> { TransactionStreamAsyncBuilder { stmt, conn, - stream_builder: |conn, stmt| { + metric_callback, + stream_builder: |conn, stmt, metric_callback| { Box::pin(async move { match conn.deref_mut() { #[cfg(feature = "sqlx-mysql")] @@ -53,7 +56,7 @@ impl<'a> TransactionStream<'a> { .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), ) as Pin>>>; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: stmt, @@ -71,7 +74,7 @@ impl<'a> TransactionStream<'a> { .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), ) as Pin>>>; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: stmt, @@ -89,7 +92,7 @@ impl<'a> TransactionStream<'a> { .map_ok(Into::into) .map_err(crate::sqlx_error_to_query_err), ) as Pin>>>; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: stmt, diff --git a/src/database/transaction.rs b/src/database/transaction.rs index e26067db6..e6dbf83ae 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -17,6 +17,7 @@ pub struct DatabaseTransaction { conn: Arc>, backend: DbBackend, open: bool, + metric_callback: Option, } impl std::fmt::Debug for DatabaseTransaction { @@ -29,10 +30,12 @@ impl DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] pub(crate) async fn new_mysql( inner: PoolConnection, + metric_callback: Option, ) -> Result { Self::begin( Arc::new(Mutex::new(InnerConnection::MySql(inner))), DbBackend::MySql, + metric_callback, ) .await } @@ -40,10 +43,12 @@ impl DatabaseTransaction { #[cfg(feature = "sqlx-postgres")] pub(crate) async fn new_postgres( inner: PoolConnection, + metric_callback: Option, ) -> Result { Self::begin( Arc::new(Mutex::new(InnerConnection::Postgres(inner))), DbBackend::Postgres, + metric_callback, ) .await } @@ -51,10 +56,12 @@ impl DatabaseTransaction { #[cfg(feature = "sqlx-sqlite")] pub(crate) async fn new_sqlite( inner: PoolConnection, + metric_callback: Option, ) -> Result { Self::begin( Arc::new(Mutex::new(InnerConnection::Sqlite(inner))), DbBackend::Sqlite, + metric_callback, ) .await } @@ -62,20 +69,27 @@ impl DatabaseTransaction { #[cfg(feature = "mock")] pub(crate) async fn new_mock( inner: Arc, + metric_callback: Option, ) -> Result { let backend = inner.get_database_backend(); - Self::begin(Arc::new(Mutex::new(InnerConnection::Mock(inner))), backend).await + Self::begin( + Arc::new(Mutex::new(InnerConnection::Mock(inner))), + backend, + metric_callback, + ).await } - #[instrument(level = "trace")] + #[instrument(level = "trace", skip(metric_callback))] async fn begin( conn: Arc>, backend: DbBackend, + metric_callback: Option, ) -> Result { let res = DatabaseTransaction { conn, backend, open: true, + metric_callback, }; match *res.conn.lock().await { #[cfg(feature = "sqlx-mysql")] @@ -245,7 +259,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); let _start = std::time::SystemTime::now(); let res = query.execute(conn).await.map(Into::into); - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -259,7 +273,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); let _start = std::time::SystemTime::now(); let res = query.execute(conn).await.map(Into::into); - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -273,7 +287,7 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); let _start = std::time::SystemTime::now(); let res = query.execute(conn).await.map(Into::into); - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -362,13 +376,17 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { stmt: Statement, ) -> Pin> + 'a>> { Box::pin( - async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt).await) }, + async move { Ok(crate::TransactionStream::build(self.conn.lock().await, stmt, self.metric_callback.clone()).await) }, ) } #[instrument(level = "trace")] async fn begin(&self) -> Result { - DatabaseTransaction::begin(Arc::clone(&self.conn), self.backend).await + DatabaseTransaction::begin( + Arc::clone(&self.conn), + self.backend, + self.metric_callback.clone() + ).await } /// Execute the function inside a transaction. diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index 2b2efd688..7216c2264 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -21,9 +21,16 @@ use super::sqlx_common::*; pub struct SqlxMySqlConnector; /// Defines a sqlx MySQL pool -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct SqlxMySqlPoolConnection { pool: MySqlPool, + metric_callback: Option, +} + +impl std::fmt::Debug for SqlxMySqlPoolConnection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SqlxMySqlPoolConnection {{ pool: {:?} }}", self.pool) + } } impl SqlxMySqlConnector { @@ -45,7 +52,7 @@ impl SqlxMySqlConnector { } match options.pool_options().connect_with(opt).await { Ok(pool) => Ok(DatabaseConnection::SqlxMySqlPoolConnection( - SqlxMySqlPoolConnection { pool }, + SqlxMySqlPoolConnection { pool, metric_callback: None }, )), Err(e) => Err(sqlx_error_to_conn_err(e)), } @@ -55,7 +62,7 @@ impl SqlxMySqlConnector { impl SqlxMySqlConnector { /// Instantiate a sqlx pool connection to a [DatabaseConnection] pub fn from_sqlx_mysql_pool(pool: MySqlPool) -> DatabaseConnection { - DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection { pool }) + DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection { pool, metric_callback: None }) } } @@ -72,7 +79,7 @@ impl SqlxMySqlPoolConnection { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -102,7 +109,7 @@ impl SqlxMySqlPoolConnection { _ => Err(DbErr::Query(err.to_string())), }, }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -129,7 +136,7 @@ impl SqlxMySqlPoolConnection { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -150,7 +157,7 @@ impl SqlxMySqlPoolConnection { debug_print!("{}", stmt); if let Ok(conn) = self.pool.acquire().await { - Ok(QueryStream::from((conn, stmt))) + Ok(QueryStream::from((conn, stmt, self.metric_callback.clone()))) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -162,7 +169,7 @@ impl SqlxMySqlPoolConnection { #[instrument(level = "trace")] pub async fn begin(&self) -> Result { if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_mysql(conn).await + DatabaseTransaction::new_mysql(conn, self.metric_callback.clone()).await } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -182,7 +189,7 @@ impl SqlxMySqlPoolConnection { E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_mysql(conn) + let transaction = DatabaseTransaction::new_mysql(conn, self.metric_callback.clone()) .await .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await @@ -192,6 +199,13 @@ impl SqlxMySqlPoolConnection { ))) } } + + pub(crate) fn set_metric_callback(&mut self, callback: F) + where + F: Into, + { + self.metric_callback = Some(callback.into()); + } } impl From for QueryResult { diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index 121e4e72d..5f90e5152 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -21,9 +21,16 @@ use super::sqlx_common::*; pub struct SqlxPostgresConnector; /// Defines a sqlx PostgreSQL pool -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct SqlxPostgresPoolConnection { pool: PgPool, + metric_callback: Option, +} + +impl std::fmt::Debug for SqlxPostgresPoolConnection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SqlxPostgresPoolConnection {{ pool: {:?} }}", self.pool) + } } impl SqlxPostgresConnector { @@ -45,7 +52,7 @@ impl SqlxPostgresConnector { } match options.pool_options().connect_with(opt).await { Ok(pool) => Ok(DatabaseConnection::SqlxPostgresPoolConnection( - SqlxPostgresPoolConnection { pool }, + SqlxPostgresPoolConnection { pool, metric_callback: None }, )), Err(e) => Err(sqlx_error_to_conn_err(e)), } @@ -55,7 +62,7 @@ impl SqlxPostgresConnector { impl SqlxPostgresConnector { /// Instantiate a sqlx pool connection to a [DatabaseConnection] pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection { - DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection { pool }) + DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection { pool, metric_callback: None }) } } @@ -72,7 +79,7 @@ impl SqlxPostgresPoolConnection { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -102,7 +109,7 @@ impl SqlxPostgresPoolConnection { _ => Err(DbErr::Query(err.to_string())), }, }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -129,7 +136,7 @@ impl SqlxPostgresPoolConnection { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -150,7 +157,7 @@ impl SqlxPostgresPoolConnection { debug_print!("{}", stmt); if let Ok(conn) = self.pool.acquire().await { - Ok(QueryStream::from((conn, stmt))) + Ok(QueryStream::from((conn, stmt, self.metric_callback.clone()))) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -162,7 +169,7 @@ impl SqlxPostgresPoolConnection { #[instrument(level = "trace")] pub async fn begin(&self) -> Result { if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_postgres(conn).await + DatabaseTransaction::new_postgres(conn, self.metric_callback.clone()).await } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -182,7 +189,7 @@ impl SqlxPostgresPoolConnection { E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_postgres(conn) + let transaction = DatabaseTransaction::new_postgres(conn, self.metric_callback.clone()) .await .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await @@ -192,6 +199,13 @@ impl SqlxPostgresPoolConnection { ))) } } + + pub(crate) fn set_metric_callback(&mut self, callback: F) + where + F: Into, + { + self.metric_callback = Some(callback.into()); + } } impl From for QueryResult { diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index 9cfd9a723..20f4954e0 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -21,9 +21,16 @@ use super::sqlx_common::*; pub struct SqlxSqliteConnector; /// Defines a sqlx SQLite pool -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct SqlxSqlitePoolConnection { pool: SqlitePool, + metric_callback: Option, +} + +impl std::fmt::Debug for SqlxSqlitePoolConnection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SqlxSqlitePoolConnection {{ pool: {:?} }}", self.pool) + } } impl SqlxSqliteConnector { @@ -49,7 +56,7 @@ impl SqlxSqliteConnector { } match options.pool_options().connect_with(opt).await { Ok(pool) => Ok(DatabaseConnection::SqlxSqlitePoolConnection( - SqlxSqlitePoolConnection { pool }, + SqlxSqlitePoolConnection { pool, metric_callback: None }, )), Err(e) => Err(sqlx_error_to_conn_err(e)), } @@ -59,7 +66,7 @@ impl SqlxSqliteConnector { impl SqlxSqliteConnector { /// Instantiate a sqlx pool connection to a [DatabaseConnection] pub fn from_sqlx_sqlite_pool(pool: SqlitePool) -> DatabaseConnection { - DatabaseConnection::SqlxSqlitePoolConnection(SqlxSqlitePoolConnection { pool }) + DatabaseConnection::SqlxSqlitePoolConnection(SqlxSqlitePoolConnection { pool, metric_callback: None }) } } @@ -76,7 +83,7 @@ impl SqlxSqlitePoolConnection { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -106,7 +113,7 @@ impl SqlxSqlitePoolConnection { _ => Err(DbErr::Query(err.to_string())), }, }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -133,7 +140,7 @@ impl SqlxSqlitePoolConnection { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), }; - if let Some(callback) = crate::metric::get_callback() { + if let Some(callback) = self.metric_callback.as_deref() { let info = crate::metric::Info { elapsed: _start.elapsed().unwrap_or_default(), statement: &stmt, @@ -154,7 +161,7 @@ impl SqlxSqlitePoolConnection { debug_print!("{}", stmt); if let Ok(conn) = self.pool.acquire().await { - Ok(QueryStream::from((conn, stmt))) + Ok(QueryStream::from((conn, stmt, self.metric_callback.clone()))) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -166,7 +173,7 @@ impl SqlxSqlitePoolConnection { #[instrument(level = "trace")] pub async fn begin(&self) -> Result { if let Ok(conn) = self.pool.acquire().await { - DatabaseTransaction::new_sqlite(conn).await + DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone()).await } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -186,7 +193,7 @@ impl SqlxSqlitePoolConnection { E: std::error::Error + Send, { if let Ok(conn) = self.pool.acquire().await { - let transaction = DatabaseTransaction::new_sqlite(conn) + let transaction = DatabaseTransaction::new_sqlite(conn, self.metric_callback.clone()) .await .map_err(|e| TransactionError::Connection(e))?; transaction.run(callback).await @@ -196,6 +203,13 @@ impl SqlxSqlitePoolConnection { ))) } } + + pub(crate) fn set_metric_callback(&mut self, callback: F) + where + F: Into, + { + self.metric_callback = Some(callback.into()); + } } impl From for QueryResult { diff --git a/src/metric.rs b/src/metric.rs index 4eab0e749..4ea62ef23 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -1,10 +1,6 @@ -use std::time::Duration; +use std::{time::Duration, sync::Arc}; -use once_cell::sync::OnceCell; - -type Callback = Box) + Send + Sync>; - -static METRIC: OnceCell = OnceCell::new(); +pub(crate) type Callback = Arc) + Send + Sync>; #[derive(Debug)] /// Query execution infos @@ -14,15 +10,3 @@ pub struct Info<'a> { /// Query data pub statement: &'a crate::Statement, } - -/// Sets a new metric callback, returning it if already set -pub fn set_callback(callback: F) -> Result<(), Callback> -where - F: Fn(&Info<'_>) + Send + Sync + 'static, -{ - METRIC.set(Box::new(callback)) -} - -pub(crate) fn get_callback() -> Option<&'static Callback> { - METRIC.get() -} From aaf11dd2654e00a915a79488d2c4e4fd118ad601 Mon Sep 17 00:00:00 2001 From: Marco Napetti Date: Thu, 9 Dec 2021 19:17:41 +0100 Subject: [PATCH 4/5] Into doesn't works --- src/database/db_connection.rs | 2 +- src/driver/sqlx_mysql.rs | 6 +++--- src/driver/sqlx_postgres.rs | 6 +++--- src/driver/sqlx_sqlite.rs | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 3a090b6f1..24b30e9cd 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -249,7 +249,7 @@ impl DatabaseConnection { /// Sets a callback to metric this connection pub fn set_metric_callback(&mut self, callback: F) where - F: Into, + F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static, { match self { #[cfg(feature = "sqlx-mysql")] diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index 7216c2264..338b09d17 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -1,4 +1,4 @@ -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, sync::Arc}; use sqlx::{ mysql::{MySqlArguments, MySqlConnectOptions, MySqlQueryResult, MySqlRow}, @@ -202,9 +202,9 @@ impl SqlxMySqlPoolConnection { pub(crate) fn set_metric_callback(&mut self, callback: F) where - F: Into, + F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static, { - self.metric_callback = Some(callback.into()); + self.metric_callback = Some(Arc::new(callback)); } } diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index 5f90e5152..7eebe94f6 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -1,4 +1,4 @@ -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, sync::Arc}; use sqlx::{ postgres::{PgArguments, PgConnectOptions, PgQueryResult, PgRow}, @@ -202,9 +202,9 @@ impl SqlxPostgresPoolConnection { pub(crate) fn set_metric_callback(&mut self, callback: F) where - F: Into, + F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static, { - self.metric_callback = Some(callback.into()); + self.metric_callback = Some(Arc::new(callback)); } } diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index 20f4954e0..c2afbbf4e 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -1,4 +1,4 @@ -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, sync::Arc}; use sqlx::{ sqlite::{SqliteArguments, SqliteConnectOptions, SqliteQueryResult, SqliteRow}, @@ -206,9 +206,9 @@ impl SqlxSqlitePoolConnection { pub(crate) fn set_metric_callback(&mut self, callback: F) where - F: Into, + F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static, { - self.metric_callback = Some(callback.into()); + self.metric_callback = Some(Arc::new(callback)); } } From 9a342546f3a329e0561cf6566e699875fbe02eea Mon Sep 17 00:00:00 2001 From: Marco Napetti Date: Fri, 10 Dec 2021 10:09:09 +0100 Subject: [PATCH 5/5] use macro to simplify code, add failure boolean --- src/database/stream/query.rs | 63 ++++++++++-------------------- src/database/stream/transaction.rs | 63 ++++++++++-------------------- src/database/transaction.rs | 39 +++++------------- src/driver/sqlx_mysql.rs | 63 ++++++++++-------------------- src/driver/sqlx_postgres.rs | 63 ++++++++++-------------------- src/driver/sqlx_sqlite.rs | 63 ++++++++++-------------------- src/metric.rs | 43 ++++++++++++++++++++ 7 files changed, 157 insertions(+), 240 deletions(-) diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs index 2147a8dc6..f967268b2 100644 --- a/src/database/stream/query.rs +++ b/src/database/stream/query.rs @@ -73,56 +73,35 @@ impl QueryStream { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(c) => { let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - let _start = std::time::SystemTime::now(); - let res = Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ); - if let Some(callback) = metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: stmt, - }; - callback(&info); - } - res + crate::metric::metric_ok!(metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + }) } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(c) => { let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - let _start = std::time::SystemTime::now(); - let res = Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ); - if let Some(callback) = metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: stmt, - }; - callback(&info); - } - res + crate::metric::metric_ok!(metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + }) } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(c) => { let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - let _start = std::time::SystemTime::now(); - let res = Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ); - if let Some(callback) = metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: stmt, - }; - callback(&info); - } - res + crate::metric::metric_ok!(metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) + }) } #[cfg(feature = "mock")] InnerConnection::Mock(c) => c.fetch(stmt), diff --git a/src/database/stream/transaction.rs b/src/database/stream/transaction.rs index fe1212d82..1995e0b4f 100644 --- a/src/database/stream/transaction.rs +++ b/src/database/stream/transaction.rs @@ -50,56 +50,35 @@ impl<'a> TransactionStream<'a> { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(c) => { let query = crate::driver::sqlx_mysql::sqlx_query(stmt); - let _start = std::time::SystemTime::now(); - let res = Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) as Pin>>>; - if let Some(callback) = metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: stmt, - }; - callback(&info); - } - res + crate::metric::metric_ok!(metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) as Pin>>> + }) } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(c) => { let query = crate::driver::sqlx_postgres::sqlx_query(stmt); - let _start = std::time::SystemTime::now(); - let res = Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) as Pin>>>; - if let Some(callback) = metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: stmt, - }; - callback(&info); - } - res + crate::metric::metric_ok!(metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) as Pin>>> + }) } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(c) => { let query = crate::driver::sqlx_sqlite::sqlx_query(stmt); - let _start = std::time::SystemTime::now(); - let res = Box::pin( - c.fetch(query) - .map_ok(Into::into) - .map_err(crate::sqlx_error_to_query_err), - ) as Pin>>>; - if let Some(callback) = metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: stmt, - }; - callback(&info); - } - res + crate::metric::metric_ok!(metric_callback, stmt, { + Box::pin( + c.fetch(query) + .map_ok(Into::into) + .map_err(crate::sqlx_error_to_query_err), + ) as Pin>>> + }) } #[cfg(feature = "mock")] InnerConnection::Mock(c) => c.fetch(stmt), diff --git a/src/database/transaction.rs b/src/database/transaction.rs index e6dbf83ae..790483b49 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -257,44 +257,23 @@ impl<'a> ConnectionTrait<'a> for DatabaseTransaction { #[cfg(feature = "sqlx-mysql")] InnerConnection::MySql(conn) => { let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - let _start = std::time::SystemTime::now(); - let res = query.execute(conn).await.map(Into::into); - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + query.execute(conn).await.map(Into::into) + }) } #[cfg(feature = "sqlx-postgres")] InnerConnection::Postgres(conn) => { let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - let _start = std::time::SystemTime::now(); - let res = query.execute(conn).await.map(Into::into); - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + query.execute(conn).await.map(Into::into) + }) } #[cfg(feature = "sqlx-sqlite")] InnerConnection::Sqlite(conn) => { let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - let _start = std::time::SystemTime::now(); - let res = query.execute(conn).await.map(Into::into); - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + query.execute(conn).await.map(Into::into) + }) } #[cfg(feature = "mock")] InnerConnection::Mock(conn) => return conn.execute(stmt), diff --git a/src/driver/sqlx_mysql.rs b/src/driver/sqlx_mysql.rs index 338b09d17..e818a6c41 100644 --- a/src/driver/sqlx_mysql.rs +++ b/src/driver/sqlx_mysql.rs @@ -74,19 +74,12 @@ impl SqlxMySqlPoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.execute(conn).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.execute(conn).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), + } + }) } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), @@ -101,22 +94,15 @@ impl SqlxMySqlPoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.fetch_one(conn).await { - Ok(row) => Ok(Some(row.into())), - Err(err) => match err { - sqlx::Error::RowNotFound => Ok(None), - _ => Err(DbErr::Query(err.to_string())), - }, - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_one(conn).await { + Ok(row) => Ok(Some(row.into())), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + _ => Err(DbErr::Query(err.to_string())), + }, + } + }) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -131,19 +117,12 @@ impl SqlxMySqlPoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.fetch_all(conn).await { - Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), - Err(err) => Err(sqlx_error_to_query_err(err)), - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_all(conn).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(sqlx_error_to_query_err(err)), + } + }) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), diff --git a/src/driver/sqlx_postgres.rs b/src/driver/sqlx_postgres.rs index 7eebe94f6..bf72f0de0 100644 --- a/src/driver/sqlx_postgres.rs +++ b/src/driver/sqlx_postgres.rs @@ -74,19 +74,12 @@ impl SqlxPostgresPoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.execute(conn).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.execute(conn).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), + } + }) } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), @@ -101,22 +94,15 @@ impl SqlxPostgresPoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.fetch_one(conn).await { - Ok(row) => Ok(Some(row.into())), - Err(err) => match err { - sqlx::Error::RowNotFound => Ok(None), - _ => Err(DbErr::Query(err.to_string())), - }, - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_one(conn).await { + Ok(row) => Ok(Some(row.into())), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + _ => Err(DbErr::Query(err.to_string())), + }, + } + }) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -131,19 +117,12 @@ impl SqlxPostgresPoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.fetch_all(conn).await { - Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), - Err(err) => Err(sqlx_error_to_query_err(err)), - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_all(conn).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(sqlx_error_to_query_err(err)), + } + }) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), diff --git a/src/driver/sqlx_sqlite.rs b/src/driver/sqlx_sqlite.rs index c2afbbf4e..d3498dc12 100644 --- a/src/driver/sqlx_sqlite.rs +++ b/src/driver/sqlx_sqlite.rs @@ -78,19 +78,12 @@ impl SqlxSqlitePoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.execute(conn).await { - Ok(res) => Ok(res.into()), - Err(err) => Err(sqlx_error_to_exec_err(err)), - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.execute(conn).await { + Ok(res) => Ok(res.into()), + Err(err) => Err(sqlx_error_to_exec_err(err)), + } + }) } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), @@ -105,22 +98,15 @@ impl SqlxSqlitePoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.fetch_one(conn).await { - Ok(row) => Ok(Some(row.into())), - Err(err) => match err { - sqlx::Error::RowNotFound => Ok(None), - _ => Err(DbErr::Query(err.to_string())), - }, - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_one(conn).await { + Ok(row) => Ok(Some(row.into())), + Err(err) => match err { + sqlx::Error::RowNotFound => Ok(None), + _ => Err(DbErr::Query(err.to_string())), + }, + } + }) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), @@ -135,19 +121,12 @@ impl SqlxSqlitePoolConnection { let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { - let _start = std::time::SystemTime::now(); - let res = match query.fetch_all(conn).await { - Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), - Err(err) => Err(sqlx_error_to_query_err(err)), - }; - if let Some(callback) = self.metric_callback.as_deref() { - let info = crate::metric::Info { - elapsed: _start.elapsed().unwrap_or_default(), - statement: &stmt, - }; - callback(&info); - } - res + crate::metric::metric!(self.metric_callback, &stmt, { + match query.fetch_all(conn).await { + Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), + Err(err) => Err(sqlx_error_to_query_err(err)), + } + }) } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), diff --git a/src/metric.rs b/src/metric.rs index 4ea62ef23..7aa3a0a2e 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -2,6 +2,8 @@ use std::{time::Duration, sync::Arc}; pub(crate) type Callback = Arc) + Send + Sync>; +pub(crate) use inner::{metric, metric_ok}; + #[derive(Debug)] /// Query execution infos pub struct Info<'a> { @@ -9,4 +11,45 @@ pub struct Info<'a> { pub elapsed: Duration, /// Query data pub statement: &'a crate::Statement, + /// Query execution failed + pub failed: bool, +} + +mod inner { + macro_rules! metric { + ($metric_callback:expr, $stmt:expr, $code:block) => { + { + let _start = std::time::SystemTime::now(); + let res = $code; + if let Some(callback) = $metric_callback.as_deref() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: $stmt, + failed: res.is_err(), + }; + callback(&info); + } + res + } + }; + } + pub(crate) use metric; + macro_rules! metric_ok { + ($metric_callback:expr, $stmt:expr, $code:block) => { + { + let _start = std::time::SystemTime::now(); + let res = $code; + if let Some(callback) = $metric_callback.as_deref() { + let info = crate::metric::Info { + elapsed: _start.elapsed().unwrap_or_default(), + statement: $stmt, + failed: false, + }; + callback(&info); + } + res + } + }; + } + pub(crate) use metric_ok; }