diff --git a/Cargo.toml b/Cargo.toml index c2aec2c77..d4944d627 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ keywords = ["async", "orm", "mysql", "postgres", "sqlite"] rust-version = "1.65" [package.metadata.docs.rs] -features = ["default", "sqlx-all", "mock", "runtime-async-std-native-tls", "postgres-array", "sea-orm-internal"] +features = ["default", "sqlx-all", "mock", "proxy", "runtime-async-std-native-tls", "postgres-array", "sea-orm-internal"] rustdoc-args = ["--cfg", "docsrs"] [lib] @@ -76,6 +76,7 @@ default = [ ] macros = ["sea-orm-macros/derive"] mock = [] +proxy = ["serde_json"] with-json = ["serde_json", "sea-query/with-json", "chrono?/serde", "time?/serde", "uuid?/serde", "sea-query-binder?/with-json", "sqlx?/json"] with-chrono = ["chrono", "sea-query/with-chrono", "sea-query-binder?/with-chrono", "sqlx?/chrono"] with-rust_decimal = ["rust_decimal", "sea-query/with-rust_decimal", "sea-query-binder?/with-rust_decimal", "sqlx?/rust_decimal"] diff --git a/examples/proxy_gluesql_example/Cargo.toml b/examples/proxy_gluesql_example/Cargo.toml new file mode 100644 index 000000000..bc495135a --- /dev/null +++ b/examples/proxy_gluesql_example/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "sea-orm-proxy-gluesql-example" +version = "0.1.0" +authors = ["Langyo "] +edition = "2021" +publish = false + +[workspace] + +[dependencies] +async-std = { version = "1.12", features = ["attributes", "tokio1"] } +serde_json = { version = "1" } +serde = { version = "1" } +futures = { version = "0.3" } +async-stream = { version = "0.3" } +futures-util = { version = "0.3" } + +sqlparser = "0.40" +sea-orm = { path = "../../", features = [ + "proxy", + "debug-print", +] } +gluesql = { version = "0.15", default-features = false, features = [ + "memory-storage", +] } + +[dev-dependencies] +smol = { version = "1.2" } +smol-potat = { version = "1.1" } diff --git a/examples/proxy_gluesql_example/README.md b/examples/proxy_gluesql_example/README.md new file mode 100644 
index 000000000..2269c471e --- /dev/null +++ b/examples/proxy_gluesql_example/README.md @@ -0,0 +1,7 @@ +# SeaORM Proxy Demo for GlueSQL + +Run this demo for [GlueSQL](https://gluesql.org/) with the following command: + +```bash +cargo run +``` diff --git a/examples/proxy_gluesql_example/src/entity/mod.rs b/examples/proxy_gluesql_example/src/entity/mod.rs new file mode 100644 index 000000000..e8b6291ac --- /dev/null +++ b/examples/proxy_gluesql_example/src/entity/mod.rs @@ -0,0 +1 @@ +pub mod post; diff --git a/examples/proxy_gluesql_example/src/entity/post.rs b/examples/proxy_gluesql_example/src/entity/post.rs new file mode 100644 index 000000000..868846046 --- /dev/null +++ b/examples/proxy_gluesql_example/src/entity/post.rs @@ -0,0 +1,17 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)] +#[sea_orm(table_name = "posts")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i64, + + pub title: String, + pub text: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/examples/proxy_gluesql_example/src/main.rs b/examples/proxy_gluesql_example/src/main.rs new file mode 100644 index 000000000..17c942d71 --- /dev/null +++ b/examples/proxy_gluesql_example/src/main.rs @@ -0,0 +1,190 @@ +//! Proxy connection example. 
+ +#![deny(missing_docs)] + +mod entity; + +use std::{ + collections::BTreeMap, + sync::{Arc, Mutex}, +}; + +use gluesql::{memory_storage::MemoryStorage, prelude::Glue}; +use sea_orm::{ + ActiveValue::Set, Database, DbBackend, DbErr, EntityTrait, ProxyDatabaseTrait, ProxyExecResult, + ProxyRow, Statement, +}; + +use entity::post::{ActiveModel, Entity}; + +struct ProxyDb { + mem: Mutex>, +} + +impl std::fmt::Debug for ProxyDb { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ProxyDb").finish() + } +} + +impl ProxyDatabaseTrait for ProxyDb { + fn query(&self, statement: Statement) -> Result, DbErr> { + println!("SQL query: {:?}", statement); + let sql = statement.sql.clone(); + + let mut ret: Vec = vec![]; + async_std::task::block_on(async { + for payload in self.mem.lock().unwrap().execute(sql).await.unwrap().iter() { + match payload { + gluesql::prelude::Payload::Select { labels, rows } => { + for row in rows.iter() { + let mut map = BTreeMap::new(); + for (label, column) in labels.iter().zip(row.iter()) { + map.insert( + label.to_owned(), + match column { + gluesql::prelude::Value::I64(val) => { + sea_orm::Value::BigInt(Some(*val)) + } + gluesql::prelude::Value::Str(val) => { + sea_orm::Value::String(Some(Box::new(val.to_owned()))) + } + _ => unreachable!("Unsupported value: {:?}", column), + }, + ); + } + ret.push(map.into()); + } + } + _ => unreachable!("Unsupported payload: {:?}", payload), + } + } + }); + + Ok(ret) + } + + fn execute(&self, statement: Statement) -> Result { + let sql = if let Some(values) = statement.values { + // Replace all the '?' with the statement values + use sqlparser::ast::{Expr, Value}; + use sqlparser::dialect::GenericDialect; + use sqlparser::parser::Parser; + + let dialect = GenericDialect {}; + let mut ast = Parser::parse_sql(&dialect, statement.sql.as_str()).unwrap(); + match &mut ast[0] { + sqlparser::ast::Statement::Insert { + columns, source, .. 
+ } => { + for item in columns.iter_mut() { + item.quote_style = Some('"'); + } + + if let Some(obj) = source { + match &mut *obj.body { + sqlparser::ast::SetExpr::Values(obj) => { + for (mut item, val) in obj.rows[0].iter_mut().zip(values.0.iter()) { + match &mut item { + Expr::Value(item) => { + *item = match val { + sea_orm::Value::String(val) => { + Value::SingleQuotedString(match val { + Some(val) => val.to_string(), + None => "".to_string(), + }) + } + sea_orm::Value::BigInt(val) => Value::Number( + val.unwrap_or(0).to_string(), + false, + ), + _ => todo!(), + }; + } + _ => todo!(), + } + } + } + _ => todo!(), + } + } + } + _ => todo!(), + } + + let statement = &ast[0]; + statement.to_string() + } else { + statement.sql + }; + + println!("SQL execute: {}", sql); + async_std::task::block_on(async { + self.mem.lock().unwrap().execute(sql).await.unwrap(); + }); + + Ok(ProxyExecResult { + last_insert_id: 1, + rows_affected: 1, + }) + } +} + +#[async_std::main] +async fn main() { + let mem = MemoryStorage::default(); + let mut glue = Glue::new(mem); + + glue.execute( + r#" + CREATE TABLE IF NOT EXISTS posts ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + text TEXT NOT NULL + ) + "#, + ) + .await + .unwrap(); + + let db = Database::connect_proxy( + DbBackend::Sqlite, + Arc::new(Mutex::new(Box::new(ProxyDb { + mem: Mutex::new(glue), + }))), + ) + .await + .unwrap(); + + println!("Initialized"); + + let data = ActiveModel { + id: Set(11), + title: Set("Homo".to_owned()), + text: Set("いいよ、来いよ".to_owned()), + }; + Entity::insert(data).exec(&db).await.unwrap(); + let data = ActiveModel { + id: Set(45), + title: Set("Homo".to_owned()), + text: Set("そうだよ".to_owned()), + }; + Entity::insert(data).exec(&db).await.unwrap(); + let data = ActiveModel { + id: Set(14), + title: Set("Homo".to_owned()), + text: Set("悔い改めて".to_owned()), + }; + Entity::insert(data).exec(&db).await.unwrap(); + + let list = Entity::find().all(&db).await.unwrap().to_vec(); + println!("Result: 
{:?}", list); +} + +#[cfg(test)] +mod tests { + #[smol_potat::test] + async fn try_run() { + crate::main() + } +} diff --git a/examples/proxy_surrealdb_example/Cargo.toml b/examples/proxy_surrealdb_example/Cargo.toml new file mode 100644 index 000000000..4b16af515 --- /dev/null +++ b/examples/proxy_surrealdb_example/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "sea-orm-proxy-surrealdb-example" +version = "0.1.0" +authors = ["Langyo "] +edition = "2021" +publish = false + +[workspace] + +[dependencies] +async-std = { version = "1.12", features = ["attributes", "tokio1"] } +serde_json = { version = "1" } +serde = { version = "1" } +futures = { version = "0.3" } +async-stream = { version = "0.3" } +futures-util = { version = "0.3" } + +sqlparser = "0.40" +sea-orm = { path = "../../", features = [ + "proxy", + "debug-print", +] } +surrealdb = { version = "1", features = ["kv-mem"] } + +[dev-dependencies] +smol = { version = "1.2" } +smol-potat = { version = "1.1" } diff --git a/examples/proxy_surrealdb_example/README.md b/examples/proxy_surrealdb_example/README.md new file mode 100644 index 000000000..773850bc3 --- /dev/null +++ b/examples/proxy_surrealdb_example/README.md @@ -0,0 +1,7 @@ +# SeaORM Proxy Demo for SurrealDB + +Run this demo for [SurrealDB](https://surrealdb.com/) with the following command: + +```bash +cargo run +``` diff --git a/examples/proxy_surrealdb_example/src/entity/mod.rs b/examples/proxy_surrealdb_example/src/entity/mod.rs new file mode 100644 index 000000000..e8b6291ac --- /dev/null +++ b/examples/proxy_surrealdb_example/src/entity/mod.rs @@ -0,0 +1 @@ +pub mod post; diff --git a/examples/proxy_surrealdb_example/src/entity/post.rs b/examples/proxy_surrealdb_example/src/entity/post.rs new file mode 100644 index 000000000..1fde0c4cc --- /dev/null +++ b/examples/proxy_surrealdb_example/src/entity/post.rs @@ -0,0 +1,17 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, 
DeriveEntityModel, Deserialize, Serialize)] +#[sea_orm(table_name = "posts")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: String, + + pub title: String, + pub text: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/examples/proxy_surrealdb_example/src/main.rs b/examples/proxy_surrealdb_example/src/main.rs new file mode 100644 index 000000000..b23f33c5f --- /dev/null +++ b/examples/proxy_surrealdb_example/src/main.rs @@ -0,0 +1,271 @@ +//! Proxy connection example. + +#![deny(missing_docs)] + +mod entity; + +use std::{ + collections::BTreeMap, + sync::{Arc, Mutex}, +}; + +use sea_orm::{ + ActiveValue::Set, Database, DbBackend, DbErr, EntityTrait, ProxyDatabaseTrait, ProxyExecResult, + ProxyRow, Statement, +}; +use surrealdb::{ + engine::local::{Db, Mem}, + Surreal, +}; + +use entity::post::{ActiveModel, Entity}; + +#[derive(Debug)] +struct ProxyDb { + mem: Surreal, +} + +impl ProxyDatabaseTrait for ProxyDb { + fn query(&self, statement: Statement) -> Result, DbErr> { + println!("SQL query: {:?}", statement); + let mut ret = async_std::task::block_on(async { + // Surrealdb's grammar is not compatible with sea-orm's + // so we need to remove the extra clauses + // from "SELECT `from`.`col` FROM `from` WHERE `from`.`col` = xx" + // to "SELECT `col` FROM `from` WHERE `col` = xx" + + use sqlparser::ast::{Expr, SelectItem, SetExpr, TableFactor}; + use sqlparser::dialect::GenericDialect; + use sqlparser::parser::Parser; + + let dialect = GenericDialect {}; + let mut ast = Parser::parse_sql(&dialect, statement.sql.as_str()).unwrap(); + match &mut ast[0] { + sqlparser::ast::Statement::Query(query) => match &mut *query.body { + SetExpr::Select(body) => { + body.projection.iter_mut().for_each(|item| { + match item { + SelectItem::UnnamedExpr(expr) => { + match expr { + Expr::CompoundIdentifier(idents) => { + // Remove the head of the identifier + // e.g. 
`from`.`col` -> `col` + let ident = idents.pop().unwrap(); + *expr = Expr::Identifier(ident); + } + _ => todo!(), + } + } + _ => todo!(), + } + }); + body.from.iter_mut().for_each(|item| { + match &mut item.relation { + TableFactor::Table { name, .. } => { + // Remove the head of the identifier + // e.g. `from`.`col` -> `col` + let ident = name.0.pop().unwrap(); + name.0 = vec![ident]; + } + _ => todo!(), + } + }); + } + _ => todo!(), + }, + _ => todo!(), + }; + + let statement = &ast[0]; + let sql = statement.to_string(); + println!("SQL: {}", sql); + self.mem.query(sql).await + }) + .unwrap(); + + // Convert the result to sea-orm's format + let ret: Vec = ret.take(0).unwrap(); + println!("SQL query result: {}", serde_json::to_string(&ret).unwrap()); + let ret = ret + .iter() + .map(|row| { + let mut map = serde_json::Map::new(); + for (k, v) in row.as_object().unwrap().iter() { + if k == "id" { + // Get `tb` and `id` columns from surrealdb + // and convert them to sea-orm's `id` + let tb = v.as_object().unwrap().get("tb").unwrap().to_string(); + let id = v + .as_object() + .unwrap() + .get("id") + .unwrap() + .get("String") + .unwrap(); + + // Remove the quotes + let tb = tb.to_string().replace("\"", ""); + let id = id.to_string().replace("\"", ""); + + map.insert("id".to_owned(), format!("{}:{}", tb, id).into()); + continue; + } + + map.insert(k.to_owned(), v.to_owned()); + } + serde_json::Value::Object(map) + }) + .map(|v: serde_json::Value| { + let mut ret: BTreeMap = BTreeMap::new(); + for (k, v) in v.as_object().unwrap().iter() { + ret.insert( + k.to_owned(), + match v { + serde_json::Value::Bool(b) => { + sea_orm::Value::TinyInt(if *b { Some(1) } else { Some(0) }) + } + serde_json::Value::Number(n) => { + if n.is_i64() { + sea_orm::Value::BigInt(Some(n.as_i64().unwrap())) + } else if n.is_u64() { + sea_orm::Value::BigUnsigned(Some(n.as_u64().unwrap())) + } else if n.is_f64() { + sea_orm::Value::Double(Some(n.as_f64().unwrap())) + } else { + unreachable!() + 
} + } + serde_json::Value::String(s) => { + sea_orm::Value::String(Some(Box::new(s.to_owned()))) + } + _ => sea_orm::Value::Json(Some(Box::new(v.to_owned()))), + }, + ); + } + ProxyRow { values: ret } + }) + .collect::>(); + + Ok(ret) + } + + fn execute(&self, statement: Statement) -> Result { + async_std::task::block_on(async { + if let Some(values) = statement.values { + // Replace all the '?' with the statement values + use sqlparser::ast::{Expr, Value}; + use sqlparser::dialect::GenericDialect; + use sqlparser::parser::Parser; + + let dialect = GenericDialect {}; + let mut ast = Parser::parse_sql(&dialect, statement.sql.as_str()).unwrap(); + match &mut ast[0] { + sqlparser::ast::Statement::Insert { + table_name, + columns, + source, + .. + } => { + // Replace the table name's quote style + table_name.0[0].quote_style = Some('`'); + + // Replace all the column names' quote style + for item in columns.iter_mut() { + item.quote_style = Some('`'); + } + + // Convert the values to sea-orm's format + if let Some(obj) = source { + match &mut *obj.body { + sqlparser::ast::SetExpr::Values(obj) => { + for (mut item, val) in + obj.rows[0].iter_mut().zip(values.0.iter()) + { + match &mut item { + Expr::Value(item) => { + *item = match val { + sea_orm::Value::String(val) => { + Value::SingleQuotedString(match val { + Some(val) => val.to_string(), + None => "".to_string(), + }) + } + sea_orm::Value::BigInt(val) => Value::Number( + val.unwrap_or(0).to_string(), + false, + ), + _ => todo!(), + }; + } + _ => todo!(), + } + } + } + _ => todo!(), + } + } + } + _ => todo!(), + } + + let statement = &ast[0]; + let sql = statement.to_string(); + println!("SQL: {}", sql); + self.mem.query(sql).await + } else { + self.mem.query(statement.sql).await + } + }) + .unwrap(); + + Ok(ProxyExecResult { + last_insert_id: 1, + rows_affected: 1, + }) + } +} + +#[async_std::main] +async fn main() { + let mem = Surreal::new::(()).await.unwrap(); + mem.use_ns("test").use_db("post").await.unwrap(); 
+ + let db = Database::connect_proxy( + DbBackend::MySql, + Arc::new(Mutex::new(Box::new(ProxyDb { mem }))), + ) + .await + .unwrap(); + + println!("Initialized"); + + let data = ActiveModel { + title: Set("Homo".to_owned()), + text: Set("いいよ、来いよ".to_owned()), + ..Default::default() + }; + Entity::insert(data).exec(&db).await.unwrap(); + let data = ActiveModel { + title: Set("Homo".to_owned()), + text: Set("そうだよ".to_owned()), + ..Default::default() + }; + Entity::insert(data).exec(&db).await.unwrap(); + let data = ActiveModel { + title: Set("Homo".to_owned()), + text: Set("悔い改めて".to_owned()), + ..Default::default() + }; + Entity::insert(data).exec(&db).await.unwrap(); + + let list = Entity::find().all(&db).await.unwrap().to_vec(); + println!("Result: {:?}", list); +} + +#[cfg(test)] +mod tests { + #[smol_potat::test] + async fn try_run() { + crate::main() + } +} diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index ad2b56900..29815ec9f 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -10,7 +10,7 @@ use url::Url; #[cfg(feature = "sqlx-dep")] use sqlx::pool::PoolConnection; -#[cfg(feature = "mock")] +#[cfg(any(feature = "mock", feature = "proxy"))] use std::sync::Arc; /// Handle a database connection depending on the backend @@ -20,15 +20,23 @@ pub enum DatabaseConnection { /// Create a MYSQL database connection and pool #[cfg(feature = "sqlx-mysql")] SqlxMySqlPoolConnection(crate::SqlxMySqlPoolConnection), - /// Create a PostgreSQL database connection and pool + + /// Create a PostgreSQL database connection and pool #[cfg(feature = "sqlx-postgres")] SqlxPostgresPoolConnection(crate::SqlxPostgresPoolConnection), - /// Create a SQLite database connection and pool + + /// Create a SQLite database connection and pool #[cfg(feature = "sqlx-sqlite")] SqlxSqlitePoolConnection(crate::SqlxSqlitePoolConnection), - /// Create a Mock database connection useful for testing + + /// Create a Mock database connection 
useful for testing #[cfg(feature = "mock")] MockDatabaseConnection(Arc), + + /// Create a Proxy database connection useful for proxying + #[cfg(feature = "proxy")] + ProxyDatabaseConnection(Arc), + /// The connection to the database has been severed Disconnected, } @@ -66,7 +74,9 @@ pub(crate) enum InnerConnection { #[cfg(feature = "sqlx-sqlite")] Sqlite(PoolConnection), #[cfg(feature = "mock")] - Mock(std::sync::Arc), + Mock(Arc), + #[cfg(feature = "proxy")] + Proxy(Arc), } impl std::fmt::Debug for DatabaseConnection { @@ -83,6 +93,8 @@ impl std::fmt::Debug for DatabaseConnection { Self::SqlxSqlitePoolConnection(_) => "SqlxSqlitePoolConnection", #[cfg(feature = "mock")] Self::MockDatabaseConnection(_) => "MockDatabaseConnection", + #[cfg(feature = "proxy")] + Self::ProxyDatabaseConnection(_) => "ProxyDatabaseConnection", Self::Disconnected => "Disconnected", } ) @@ -101,6 +113,8 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(_) => DbBackend::Sqlite, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => conn.get_database_backend(), + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => conn.get_database_backend(), DatabaseConnection::Disconnected => panic!("Disconnected"), } } @@ -117,6 +131,8 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.execute(stmt).await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => conn.execute(stmt), + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => conn.execute(stmt), DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -141,6 +157,12 @@ impl ConnectionTrait for DatabaseConnection { let stmt = Statement::from_string(db_backend, sql); conn.execute(stmt) } + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => { + let db_backend = conn.get_database_backend(); + let stmt = 
Statement::from_string(db_backend, sql); + conn.execute(stmt) + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -157,6 +179,8 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_one(stmt).await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => conn.query_one(stmt), + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => conn.query_one(stmt), DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -173,6 +197,8 @@ impl ConnectionTrait for DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.query_all(stmt).await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => conn.query_all(stmt), + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => conn.query_all(stmt), DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -205,6 +231,10 @@ impl StreamTrait for DatabaseConnection { DatabaseConnection::MockDatabaseConnection(conn) => { Ok(crate::QueryStream::from((Arc::clone(conn), stmt, None))) } + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => { + Ok(crate::QueryStream::from((Arc::clone(conn), stmt, None))) + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } }) @@ -226,6 +256,10 @@ impl TransactionTrait for DatabaseConnection { DatabaseConnection::MockDatabaseConnection(conn) => { DatabaseTransaction::new_mock(Arc::clone(conn), None).await } + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => { + DatabaseTransaction::new_proxy(conn.clone(), None).await + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -253,6 +287,10 @@ impl TransactionTrait for DatabaseConnection { DatabaseConnection::MockDatabaseConnection(conn) => { DatabaseTransaction::new_mock(Arc::clone(conn), None).await } + #[cfg(feature = 
"proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => { + DatabaseTransaction::new_proxy(conn.clone(), None).await + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -289,6 +327,13 @@ impl TransactionTrait for DatabaseConnection { .map_err(TransactionError::Connection)?; transaction.run(_callback).await } + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => { + let transaction = DatabaseTransaction::new_proxy(conn.clone(), None) + .await + .map_err(TransactionError::Connection)?; + transaction.run(_callback).await + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected").into()), } } @@ -333,6 +378,13 @@ impl TransactionTrait for DatabaseConnection { .map_err(TransactionError::Connection)?; transaction.run(_callback).await } + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => { + let transaction = DatabaseTransaction::new_proxy(conn.clone(), None) + .await + .map_err(TransactionError::Connection)?; + transaction.run(_callback).await + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected").into()), } } @@ -367,6 +419,21 @@ impl DatabaseConnection { } } +#[cfg(feature = "proxy")] +impl DatabaseConnection { + /// Generate a database connection for testing the Proxy database + /// + /// # Panics + /// + /// Panics if [DbConn] is not a proxy connection. 
+ pub fn as_proxy_connection(&self) -> &crate::ProxyDatabaseConnection { + match self { + DatabaseConnection::ProxyDatabaseConnection(proxy_conn) => proxy_conn, + _ => panic!("Not proxy connection"), + } + } +} + impl DatabaseConnection { /// Sets a callback to metric this connection pub fn set_metric_callback(&mut self, _callback: F) @@ -401,6 +468,8 @@ impl DatabaseConnection { DatabaseConnection::SqlxSqlitePoolConnection(conn) => conn.ping().await, #[cfg(feature = "mock")] DatabaseConnection::MockDatabaseConnection(conn) => conn.ping(), + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(conn) => conn.ping(), DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } @@ -419,6 +488,11 @@ impl DatabaseConnection { // Nothing to cleanup, we just consume the `DatabaseConnection` Ok(()) } + #[cfg(feature = "proxy")] + DatabaseConnection::ProxyDatabaseConnection(_) => { + // Nothing to cleanup, we just consume the `DatabaseConnection` + Ok(()) + } DatabaseConnection::Disconnected => Err(conn_err("Disconnected")), } } diff --git a/src/database/mock.rs b/src/database/mock.rs index e760ceaf7..6462766f2 100644 --- a/src/database/mock.rs +++ b/src/database/mock.rs @@ -30,7 +30,8 @@ pub struct MockExecResult { /// which is just a [BTreeMap]<[String], [Value]> #[derive(Clone, Debug)] pub struct MockRow { - values: BTreeMap, + /// The values of the single row + pub values: BTreeMap, } /// A trait to get a [MockRow] from a type useful for testing in the [MockDatabase] diff --git a/src/database/mod.rs b/src/database/mod.rs index 36fe5709f..a844d1c85 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -5,6 +5,9 @@ mod db_connection; #[cfg(feature = "mock")] #[cfg_attr(docsrs, doc(cfg(feature = "mock")))] mod mock; +#[cfg(feature = "proxy")] +#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))] +mod proxy; mod statement; mod stream; mod transaction; @@ -14,6 +17,9 @@ pub use db_connection::*; #[cfg(feature = "mock")] #[cfg_attr(docsrs, 
doc(cfg(feature = "mock")))] pub use mock::*; +#[cfg(feature = "proxy")] +#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))] +pub use proxy::*; pub use statement::*; use std::borrow::Cow; pub use stream::*; @@ -79,11 +85,41 @@ impl Database { if crate::MockDatabaseConnector::accepts(&opt.url) { return crate::MockDatabaseConnector::connect(&opt.url).await; } + Err(conn_err(format!( "The connection string '{}' has no supporting driver.", opt.url ))) } + + /// Method to create a [DatabaseConnection] on a proxy database + #[cfg(feature = "proxy")] + #[instrument(level = "trace", skip(proxy_func_arc))] + pub async fn connect_proxy( + db_type: DbBackend, + proxy_func_arc: std::sync::Arc>>, + ) -> Result { + match db_type { + DbBackend::MySql => { + return crate::ProxyDatabaseConnector::connect( + DbBackend::MySql, + proxy_func_arc.to_owned(), + ); + } + DbBackend::Postgres => { + return crate::ProxyDatabaseConnector::connect( + DbBackend::Postgres, + proxy_func_arc.to_owned(), + ); + } + DbBackend::Sqlite => { + return crate::ProxyDatabaseConnector::connect( + DbBackend::Sqlite, + proxy_func_arc.to_owned(), + ); + } + } + } } impl From for ConnectOptions diff --git a/src/database/proxy.rs b/src/database/proxy.rs new file mode 100644 index 000000000..470810670 --- /dev/null +++ b/src/database/proxy.rs @@ -0,0 +1,925 @@ +use crate::{error::*, ExecResult, ExecResultHolder, QueryResult, QueryResultRow, Statement}; + +use sea_query::{Value, ValueType}; +use std::{collections::BTreeMap, fmt::Debug}; + +/// Defines the [ProxyDatabaseTrait] to save the functions +pub trait ProxyDatabaseTrait: Send + Sync + std::fmt::Debug { + /// Execute a query in the [ProxyDatabase], and return the query results + fn query(&self, statement: Statement) -> Result, DbErr>; + + /// Execute a command in the [ProxyDatabase], and report the number of rows affected + fn execute(&self, statement: Statement) -> Result; + + /// Begin a transaction in the [ProxyDatabase] + fn begin(&self) {} + + /// Commit 
a transaction in the [ProxyDatabase] + fn commit(&self) {} + + /// Rollback a transaction in the [ProxyDatabase] + fn rollback(&self) {} + + /// Ping the [ProxyDatabase], it should return an error if the database is not available + fn ping(&self) -> Result<(), DbErr> { + Ok(()) + } +} + +/// Defines the results obtained from a [ProxyDatabase] +#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] +pub struct ProxyExecResult { + /// The last inserted id on auto-increment + pub last_insert_id: u64, + /// The number of rows affected by the database operation + pub rows_affected: u64, +} + +impl ProxyExecResult { + /// Create a new [ProxyExecResult] from the last inserted id and the number of rows affected + pub fn new(last_insert_id: u64, rows_affected: u64) -> Self { + Self { + last_insert_id, + rows_affected, + } + } +} + +impl Default for ExecResultHolder { + fn default() -> Self { + Self::Proxy(ProxyExecResult::default()) + } +} + +impl From for ExecResult { + fn from(result: ProxyExecResult) -> Self { + Self { + result: ExecResultHolder::Proxy(result), + } + } +} + +impl From for ProxyExecResult { + fn from(result: ExecResult) -> Self { + match result.result { + #[cfg(feature = "sqlx-mysql")] + ExecResultHolder::SqlxMySql(result) => Self { + last_insert_id: result.last_insert_id() as u64, + rows_affected: result.rows_affected(), + }, + #[cfg(feature = "sqlx-postgres")] + ExecResultHolder::SqlxPostgres(result) => Self { + last_insert_id: 0, + rows_affected: result.rows_affected(), + }, + #[cfg(feature = "sqlx-sqlite")] + ExecResultHolder::SqlxSqlite(result) => Self { + last_insert_id: result.last_insert_rowid() as u64, + rows_affected: result.rows_affected(), + }, + #[cfg(feature = "mock")] + ExecResultHolder::Mock(result) => Self { + last_insert_id: result.last_insert_id, + rows_affected: result.rows_affected, + }, + ExecResultHolder::Proxy(result) => result, + } + } +} + +/// Defines the structure of a Row for the [ProxyDatabase] +/// which is 
just a [BTreeMap]<[String], [Value]> +#[derive(Clone, Debug)] +pub struct ProxyRow { + /// The values of the single row + pub values: BTreeMap, +} + +impl ProxyRow { + /// Create a new [ProxyRow] from a [BTreeMap]<[String], [Value]> + pub fn new(values: BTreeMap) -> Self { + Self { values } + } +} + +impl Default for ProxyRow { + fn default() -> Self { + Self { + values: BTreeMap::new(), + } + } +} + +impl From> for ProxyRow { + fn from(values: BTreeMap) -> Self { + Self { values } + } +} + +impl From for BTreeMap { + fn from(row: ProxyRow) -> Self { + row.values + } +} + +impl From for Vec<(String, Value)> { + fn from(row: ProxyRow) -> Self { + row.values.into_iter().collect() + } +} + +impl From for QueryResult { + fn from(row: ProxyRow) -> Self { + QueryResult { + row: QueryResultRow::Proxy(row), + } + } +} + +#[cfg(feature = "with-json")] +impl Into for ProxyRow { + fn into(self) -> serde_json::Value { + self.values + .into_iter() + .map(|(k, v)| (k, sea_query::sea_value_to_json_value(&v))) + .collect() + } +} + +/// Convert [QueryResult] to [ProxyRow] +pub fn from_query_result_to_proxy_row(result: &QueryResult) -> ProxyRow { + match &result.row { + #[cfg(feature = "sqlx-mysql")] + QueryResultRow::SqlxMySql(row) => from_sqlx_mysql_row_to_proxy_row(&row), + #[cfg(feature = "sqlx-postgres")] + QueryResultRow::SqlxPostgres(row) => from_sqlx_postgres_row_to_proxy_row(&row), + #[cfg(feature = "sqlx-sqlite")] + QueryResultRow::SqlxSqlite(row) => from_sqlx_sqlite_row_to_proxy_row(&row), + #[cfg(feature = "mock")] + QueryResultRow::Mock(row) => ProxyRow { + values: row.values.clone(), + }, + QueryResultRow::Proxy(row) => row.to_owned(), + } +} + +#[cfg(feature = "sqlx-mysql")] +pub(crate) fn from_sqlx_mysql_row_to_proxy_row(row: &sqlx::mysql::MySqlRow) -> ProxyRow { + // https://docs.rs/sqlx-mysql/0.7.2/src/sqlx_mysql/protocol/text/column.rs.html + // https://docs.rs/sqlx-mysql/0.7.2/sqlx_mysql/types/index.html + use sqlx::{Column, Row, TypeInfo}; + ProxyRow { + 
values: row + .columns() + .iter() + .map(|c| { + ( + c.name().to_string(), + match c.type_info().name() { + "TINYINT(1)" | "BOOLEAN" => Value::Bool(Some( + row.try_get(c.ordinal()).expect("Failed to get boolean"), + )), + "TINYINT UNSIGNED" => Value::TinyUnsigned(Some( + row.try_get(c.ordinal()) + .expect("Failed to get unsigned tiny integer"), + )), + "SMALLINT UNSIGNED" => Value::SmallUnsigned(Some( + row.try_get(c.ordinal()) + .expect("Failed to get unsigned small integer"), + )), + "INT UNSIGNED" => Value::Unsigned(Some( + row.try_get(c.ordinal()) + .expect("Failed to get unsigned integer"), + )), + "MEDIUMINT UNSIGNED" | "BIGINT UNSIGNED" => Value::BigUnsigned(Some( + row.try_get(c.ordinal()) + .expect("Failed to get unsigned big integer"), + )), + "TINYINT" => Value::TinyInt(Some( + row.try_get(c.ordinal()) + .expect("Failed to get tiny integer"), + )), + "SMALLINT" => Value::SmallInt(Some( + row.try_get(c.ordinal()) + .expect("Failed to get small integer"), + )), + "INT" => Value::Int(Some( + row.try_get(c.ordinal()).expect("Failed to get integer"), + )), + "MEDIUMINT" | "BIGINT" => Value::BigInt(Some( + row.try_get(c.ordinal()).expect("Failed to get big integer"), + )), + "FLOAT" => Value::Float(Some( + row.try_get(c.ordinal()).expect("Failed to get float"), + )), + "DOUBLE" => Value::Double(Some( + row.try_get(c.ordinal()).expect("Failed to get double"), + )), + + "BIT" | "BINARY" | "VARBINARY" | "TINYBLOB" | "BLOB" | "MEDIUMBLOB" + | "LONGBLOB" => Value::Bytes(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get bytes"), + ))), + + "CHAR" | "VARCHAR" | "TINYTEXT" | "TEXT" | "MEDIUMTEXT" | "LONGTEXT" => { + Value::String(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get string"), + ))) + } + + #[cfg(feature = "with-chrono")] + "TIMESTAMP" => Value::ChronoDateTimeUtc(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamp"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "TIMESTAMP" => 
Value::TimeDateTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamp"), + ))), + + #[cfg(feature = "with-chrono")] + "DATE" => Value::ChronoDate(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get date"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "DATE" => Value::TimeDate(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get date"), + ))), + + #[cfg(feature = "with-chrono")] + "TIME" => Value::ChronoTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get time"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "TIME" => Value::TimeTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get time"), + ))), + + #[cfg(feature = "with-chrono")] + "DATETIME" => Value::ChronoDateTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get datetime"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "DATETIME" => Value::TimeDateTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get datetime"), + ))), + + #[cfg(feature = "with-chrono")] + "YEAR" => Value::ChronoDate(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get year"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "YEAR" => Value::TimeDate(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get year"), + ))), + + "ENUM" | "SET" | "GEOMETRY" => Value::String(Some(Box::new( + row.try_get(c.ordinal()) + .expect("Failed to get serialized string"), + ))), + + #[cfg(feature = "with-bigdecimal")] + "DECIMAL" => Value::BigDecimal(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get decimal"), + ))), + #[cfg(all( + feature = "with-rust_decimal", + not(feature = "with-bigdecimal") + ))] + "DECIMAL" => Value::Decimal(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get decimal"), + ))), + + #[cfg(feature = "with-json")] + "JSON" => Value::Json(Some(Box::new( + 
row.try_get(c.ordinal()).expect("Failed to get json"), + ))), + + _ => unreachable!("Unknown column type: {}", c.type_info().name()), + }, + ) + }) + .collect(), + } +} + +#[cfg(feature = "sqlx-postgres")] +pub(crate) fn from_sqlx_postgres_row_to_proxy_row(row: &sqlx::postgres::PgRow) -> ProxyRow { + // https://docs.rs/sqlx-postgres/0.7.2/src/sqlx_postgres/type_info.rs.html + // https://docs.rs/sqlx-postgres/0.7.2/sqlx_postgres/types/index.html + use sqlx::{Column, Row, TypeInfo}; + ProxyRow { + values: row + .columns() + .iter() + .map(|c| { + ( + c.name().to_string(), + match c.type_info().name() { + "BOOL" => Value::Bool(Some( + row.try_get(c.ordinal()).expect("Failed to get boolean"), + )), + #[cfg(feature = "postgres-array")] + "BOOL[]" => Value::Array( + sea_query::ArrayType::Bool, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get boolean array") + .iter() + .map(|val| Value::Bool(Some(*val))) + .collect(), + )), + ), + + "\"CHAR\"" => Value::TinyInt(Some( + row.try_get(c.ordinal()) + .expect("Failed to get small integer"), + )), + #[cfg(feature = "postgres-array")] + "\"CHAR\"[]" => Value::Array( + sea_query::ArrayType::TinyInt, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get small integer array") + .iter() + .map(|val| Value::TinyInt(Some(*val))) + .collect(), + )), + ), + + "SMALLINT" | "SMALLSERIAL" | "INT2" => Value::SmallInt(Some( + row.try_get(c.ordinal()) + .expect("Failed to get small integer"), + )), + #[cfg(feature = "postgres-array")] + "SMALLINT[]" | "SMALLSERIAL[]" | "INT2[]" => Value::Array( + sea_query::ArrayType::SmallInt, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get small integer array") + .iter() + .map(|val| Value::SmallInt(Some(*val))) + .collect(), + )), + ), + + "INT" | "SERIAL" | "INT4" => Value::Int(Some( + row.try_get(c.ordinal()).expect("Failed to get integer"), + )), + #[cfg(feature = "postgres-array")] + "INT[]" | "SERIAL[]" | "INT4[]" => 
Value::Array( + sea_query::ArrayType::Int, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get integer array") + .iter() + .map(|val| Value::Int(Some(*val))) + .collect(), + )), + ), + + "BIGINT" | "BIGSERIAL" | "INT8" => Value::BigInt(Some( + row.try_get(c.ordinal()).expect("Failed to get big integer"), + )), + #[cfg(feature = "postgres-array")] + "BIGINT[]" | "BIGSERIAL[]" | "INT8[]" => Value::Array( + sea_query::ArrayType::BigInt, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get big integer array") + .iter() + .map(|val| Value::BigInt(Some(*val))) + .collect(), + )), + ), + + "FLOAT4" | "REAL" => Value::Float(Some( + row.try_get(c.ordinal()).expect("Failed to get float"), + )), + #[cfg(feature = "postgres-array")] + "FLOAT4[]" | "REAL[]" => Value::Array( + sea_query::ArrayType::Float, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get float array") + .iter() + .map(|val| Value::Float(Some(*val))) + .collect(), + )), + ), + + "FLOAT8" | "DOUBLE PRECISION" => Value::Double(Some( + row.try_get(c.ordinal()).expect("Failed to get double"), + )), + #[cfg(feature = "postgres-array")] + "FLOAT8[]" | "DOUBLE PRECISION[]" => Value::Array( + sea_query::ArrayType::Double, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get double array") + .iter() + .map(|val| Value::Double(Some(*val))) + .collect(), + )), + ), + + "VARCHAR" | "CHAR" | "TEXT" | "NAME" => Value::String(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get string"), + ))), + #[cfg(feature = "postgres-array")] + "VARCHAR[]" | "CHAR[]" | "TEXT[]" | "NAME[]" => Value::Array( + sea_query::ArrayType::String, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get string array") + .iter() + .map(|val| Value::String(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + "BYTEA" => Value::Bytes(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get bytes"), + ))), + #[cfg(feature = 
"postgres-array")] + "BYTEA[]" => Value::Array( + sea_query::ArrayType::Bytes, + Some(Box::new( + row.try_get::>, _>(c.ordinal()) + .expect("Failed to get bytes array") + .iter() + .map(|val| Value::Bytes(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-bigdecimal")] + "NUMERIC" => Value::BigDecimal(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get numeric"), + ))), + #[cfg(all( + feature = "with-rust_decimal", + not(feature = "with-bigdecimal") + ))] + "NUMERIC" => Value::Decimal(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get numeric"), + ))), + + #[cfg(all(feature = "with-bigdecimal", feature = "postgres-array"))] + "NUMERIC[]" => Value::Array( + sea_query::ArrayType::BigDecimal, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get numeric array") + .iter() + .map(|val| Value::BigDecimal(Some(Box::new(val.clone())))) + .collect(), + )), + ), + #[cfg(all( + feature = "with-rust_decimal", + not(feature = "with-bigdecimal"), + feature = "postgres-array" + ))] + "NUMERIC[]" => Value::Array( + sea_query::ArrayType::Decimal, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get numeric array") + .iter() + .map(|val| Value::Decimal(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + "OID" => Value::BigInt(Some( + row.try_get(c.ordinal()).expect("Failed to get oid"), + )), + #[cfg(feature = "postgres-array")] + "OID[]" => Value::Array( + sea_query::ArrayType::BigInt, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get oid array") + .iter() + .map(|val| Value::BigInt(Some(*val))) + .collect(), + )), + ), + + "JSON" | "JSONB" => Value::Json(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get json"), + ))), + #[cfg(any(feature = "json-array", feature = "postgres-array"))] + "JSON[]" | "JSONB[]" => Value::Array( + sea_query::ArrayType::Json, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get json array") + 
.iter() + .map(|val| Value::Json(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-ipnetwork")] + "INET" | "CIDR" => Value::IpNetwork(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get ip address"), + ))), + #[cfg(feature = "with-ipnetwork")] + "INET[]" | "CIDR[]" => Value::Array( + sea_query::ArrayType::IpNetwork, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get ip address array") + .iter() + .map(|val| Value::IpNetwork(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-mac_address")] + "MACADDR" | "MACADDR8" => Value::MacAddress(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get mac address"), + ))), + #[cfg(all(feature = "with-mac_address", feature = "postgres-array"))] + "MACADDR[]" | "MACADDR8[]" => Value::Array( + sea_query::ArrayType::MacAddress, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get mac address array") + .iter() + .map(|val| Value::MacAddress(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-chrono")] + "TIMESTAMP" => Value::ChronoDateTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamp"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "TIMESTAMP" => Value::TimeDateTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamp"), + ))), + + #[cfg(all(feature = "with-chrono", feature = "postgres-array"))] + "TIMESTAMP[]" => Value::Array( + sea_query::ArrayType::ChronoDateTime, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get timestamp array") + .iter() + .map(|val| Value::ChronoDateTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + #[cfg(all( + feature = "with-time", + not(feature = "with-chrono"), + feature = "postgres-array" + ))] + "TIMESTAMP[]" => Value::Array( + sea_query::ArrayType::TimeDateTime, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to 
get timestamp array") + .iter() + .map(|val| Value::TimeDateTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-chrono")] + "DATE" => Value::ChronoDate(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get date"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "DATE" => Value::TimeDate(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get date"), + ))), + + #[cfg(all(feature = "with-chrono", feature = "postgres-array"))] + "DATE[]" => Value::Array( + sea_query::ArrayType::ChronoDate, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get date array") + .iter() + .map(|val| Value::ChronoDate(Some(Box::new(val.clone())))) + .collect(), + )), + ), + #[cfg(all( + feature = "with-time", + not(feature = "with-chrono"), + feature = "postgres-array" + ))] + "DATE[]" => Value::Array( + sea_query::ArrayType::TimeDate, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get date array") + .iter() + .map(|val| Value::TimeDate(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-chrono")] + "TIME" => Value::ChronoTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get time"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "TIME" => Value::TimeTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get time"), + ))), + + #[cfg(all(feature = "with-chrono", feature = "postgres-array"))] + "TIME[]" => Value::Array( + sea_query::ArrayType::ChronoTime, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get time array") + .iter() + .map(|val| Value::ChronoTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + #[cfg(all( + feature = "with-time", + not(feature = "with-chrono"), + feature = "postgres-array" + ))] + "TIME[]" => Value::Array( + sea_query::ArrayType::TimeTime, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get time array") + 
.iter() + .map(|val| Value::TimeTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-chrono")] + "TIMESTAMPTZ" => Value::ChronoDateTimeUtc(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamptz"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "TIMESTAMPTZ" => Value::TimeDateTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamptz"), + ))), + + #[cfg(all(feature = "with-chrono", feature = "postgres-array"))] + "TIMESTAMPTZ[]" => Value::Array( + sea_query::ArrayType::ChronoDateTimeUtc, + Some(Box::new( + row.try_get::>, _>(c.ordinal()) + .expect("Failed to get timestamptz array") + .iter() + .map(|val| { + Value::ChronoDateTimeUtc(Some(Box::new(val.clone()))) + }) + .collect(), + )), + ), + #[cfg(all( + feature = "with-time", + not(feature = "with-chrono"), + feature = "postgres-array" + ))] + "TIMESTAMPTZ[]" => Value::Array( + sea_query::ArrayType::TimeDateTime, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get timestamptz array") + .iter() + .map(|val| Value::TimeDateTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-chrono")] + "TIMETZ" => Value::ChronoTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timetz"), + ))), + #[cfg(all(feature = "with-time", not(feature = "with-chrono")))] + "TIMETZ" => Value::TimeTime(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timetz"), + ))), + + #[cfg(all(feature = "with-chrono", feature = "postgres-array"))] + "TIMETZ[]" => Value::Array( + sea_query::ArrayType::ChronoTime, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get timetz array") + .iter() + .map(|val| Value::ChronoTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + #[cfg(all( + feature = "with-time", + not(feature = "with-chrono"), + feature = "postgres-array" + ))] + "TIMETZ[]" => Value::Array( + sea_query::ArrayType::TimeTime, + 
Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get timetz array") + .iter() + .map(|val| Value::TimeTime(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + #[cfg(feature = "with-uuid")] + "UUID" => Value::Uuid(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get uuid"), + ))), + + #[cfg(all(feature = "with-uuid", feature = "postgres-array"))] + "UUID[]" => Value::Array( + sea_query::ArrayType::Uuid, + Some(Box::new( + row.try_get::, _>(c.ordinal()) + .expect("Failed to get uuid array") + .iter() + .map(|val| Value::Uuid(Some(Box::new(val.clone())))) + .collect(), + )), + ), + + _ => unreachable!("Unknown column type: {}", c.type_info().name()), + }, + ) + }) + .collect(), + } +} + +#[cfg(feature = "sqlx-sqlite")] +pub(crate) fn from_sqlx_sqlite_row_to_proxy_row(row: &sqlx::sqlite::SqliteRow) -> ProxyRow { + // https://docs.rs/sqlx-sqlite/0.7.2/src/sqlx_sqlite/type_info.rs.html + // https://docs.rs/sqlx-sqlite/0.7.2/sqlx_sqlite/types/index.html + use sqlx::{Column, Row, TypeInfo}; + ProxyRow { + values: row + .columns() + .iter() + .map(|c| { + ( + c.name().to_string(), + match c.type_info().name() { + "BOOLEAN" => Value::Bool(Some( + row.try_get(c.ordinal()).expect("Failed to get boolean"), + )), + + "INTEGER" => Value::Int(Some( + row.try_get(c.ordinal()).expect("Failed to get integer"), + )), + + "BIGINT" | "INT8" => Value::BigInt(Some( + row.try_get(c.ordinal()).expect("Failed to get big integer"), + )), + + "REAL" => Value::Double(Some( + row.try_get(c.ordinal()).expect("Failed to get double"), + )), + + "TEXT" => Value::String(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get string"), + ))), + + "BLOB" => Value::Bytes(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get bytes"), + ))), + + #[cfg(feature = "with-chrono")] + "DATETIME" => Value::ChronoDateTimeUtc(Some(Box::new( + row.try_get(c.ordinal()).expect("Failed to get timestamp"), + ))), + #[cfg(all(feature = "with-time", not(feature = 
"with-chrono")))]
+                        "DATETIME" => Value::TimeDateTimeWithTimeZone(Some(Box::new(
+                            row.try_get(c.ordinal()).expect("Failed to get timestamp"),
+                        ))),
+
+                        #[cfg(feature = "with-chrono")]
+                        "DATE" => Value::ChronoDate(Some(Box::new(
+                            row.try_get(c.ordinal()).expect("Failed to get date"),
+                        ))),
+                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
+                        "DATE" => Value::TimeDate(Some(Box::new(
+                            row.try_get(c.ordinal()).expect("Failed to get date"),
+                        ))),
+
+                        #[cfg(feature = "with-chrono")]
+                        "TIME" => Value::ChronoTime(Some(Box::new(
+                            row.try_get(c.ordinal()).expect("Failed to get time"),
+                        ))),
+                        #[cfg(all(feature = "with-time", not(feature = "with-chrono")))]
+                        "TIME" => Value::TimeTime(Some(Box::new(
+                            row.try_get(c.ordinal()).expect("Failed to get time"),
+                        ))),
+
+                        _ => unreachable!("Unknown column type: {}", c.type_info().name()),
+                    },
+                )
+            })
+            .collect(),
+    }
+}
+
+impl ProxyRow {
+    /// Get a value from the [ProxyRow]
+    pub fn try_get<T, I: ColIdx>(&self, index: I) -> Result<T, DbErr>
+    where
+        T: ValueType,
+    {
+        if let Some(index) = index.as_str() {
+            T::try_from(
+                self.values
+                    .get(index)
+                    .ok_or_else(|| query_err(format!("No column for ColIdx {index:?}")))?
+                    .clone(),
+            )
+            .map_err(type_err)
+        } else if let Some(index) = index.as_usize() {
+            let (_, value) = self
+                .values
+                .iter()
+                .nth(*index)
+                .ok_or_else(|| query_err(format!("Column at index {index} not found")))?;
+            T::try_from(value.clone()).map_err(type_err)
+        } else {
+            unreachable!("Missing ColIdx implementation for ProxyRow");
+        }
+    }
+
+    /// An iterator over the keys and values of a proxy row
+    pub fn into_column_value_tuples(self) -> impl Iterator<Item = (String, Value)> {
+        self.values.into_iter()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        entity::*, tests_cfg::*, Database, DbBackend, DbErr, ProxyDatabaseTrait, ProxyExecResult,
+        ProxyRow, Statement,
+    };
+    use std::sync::{Arc, Mutex};
+
+    #[derive(Debug)]
+    struct ProxyDb {}
+
+    impl ProxyDatabaseTrait for ProxyDb {
+        fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr> {
+            println!("SQL query: {}", statement.sql);
+            Ok(vec![].into())
+        }
+
+        fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr> {
+            println!("SQL execute: {}", statement.sql);
+            Ok(ProxyExecResult {
+                last_insert_id: 1,
+                rows_affected: 1,
+            })
+        }
+    }
+
+    #[smol_potat::test]
+    async fn create_proxy_conn() {
+        let _db =
+            Database::connect_proxy(DbBackend::MySql, Arc::new(Mutex::new(Box::new(ProxyDb {}))))
+                .await
+                .unwrap();
+    }
+
+    #[smol_potat::test]
+    async fn select_rows() {
+        let db =
+            Database::connect_proxy(DbBackend::MySql, Arc::new(Mutex::new(Box::new(ProxyDb {}))))
+                .await
+                .unwrap();
+
+        let _ = cake::Entity::find().all(&db).await;
+    }
+
+    #[smol_potat::test]
+    async fn insert_one_row() {
+        let db =
+            Database::connect_proxy(DbBackend::MySql, Arc::new(Mutex::new(Box::new(ProxyDb {}))))
+                .await
+                .unwrap();
+
+        let item = cake::ActiveModel {
+            id: NotSet,
+            name: Set("Alice".to_string()),
+        };
+
+        cake::Entity::insert(item).exec(&db).await.unwrap();
+    }
+}
diff --git a/src/database/stream/query.rs b/src/database/stream/query.rs
index c38dc3b34..30642bde0 100644
--- a/src/database/stream/query.rs
+++ b/src/database/stream/query.rs
@@ -2,7 +2,7 @@ use tracing::instrument; -#[cfg(feature = "mock")] +#[cfg(any(feature = "mock", feature = "proxy"))] use std::sync::Arc; use std::{pin::Pin, task::Poll}; @@ -105,6 +105,25 @@ impl } } +#[cfg(feature = "proxy")] +impl + From<( + Arc, + Statement, + Option, + )> for QueryStream +{ + fn from( + (conn, stmt, metric_callback): ( + Arc, + Statement, + Option, + ), + ) -> Self { + QueryStream::build(stmt, InnerConnection::Proxy(conn), metric_callback) + } +} + impl std::fmt::Debug for QueryStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "QueryStream") @@ -163,6 +182,13 @@ impl QueryStream { let elapsed = _start.map(|s| s.elapsed().unwrap_or_default()); MetricStream::new(_metric_callback, stmt, elapsed, stream) } + #[cfg(feature = "proxy")] + InnerConnection::Proxy(c) => { + let _start = _metric_callback.is_some().then(std::time::SystemTime::now); + let stream = c.fetch(stmt); + let elapsed = _start.map(|s| s.elapsed().unwrap_or_default()); + MetricStream::new(_metric_callback, stmt, elapsed, stream) + } #[allow(unreachable_patterns)] _ => unreachable!(), }, diff --git a/src/database/stream/transaction.rs b/src/database/stream/transaction.rs index d64feaba4..3e9285d15 100644 --- a/src/database/stream/transaction.rs +++ b/src/database/stream/transaction.rs @@ -86,6 +86,13 @@ impl<'a> TransactionStream<'a> { let elapsed = _start.map(|s| s.elapsed().unwrap_or_default()); MetricStream::new(_metric_callback, stmt, elapsed, stream) } + #[cfg(feature = "proxy")] + InnerConnection::Proxy(c) => { + let _start = _metric_callback.is_some().then(std::time::SystemTime::now); + let stream = c.fetch(stmt); + let elapsed = _start.map(|s| s.elapsed().unwrap_or_default()); + MetricStream::new(_metric_callback, stmt, elapsed, stream) + } #[allow(unreachable_patterns)] _ => unreachable!(), }, diff --git a/src/database/transaction.rs b/src/database/transaction.rs index 14765b214..3216ffd99 100644 --- a/src/database/transaction.rs +++ 
b/src/database/transaction.rs @@ -95,6 +95,22 @@ impl DatabaseTransaction { .await } + #[cfg(feature = "proxy")] + pub(crate) async fn new_proxy( + inner: Arc, + metric_callback: Option, + ) -> Result { + let backend = inner.get_database_backend(); + Self::begin( + Arc::new(Mutex::new(InnerConnection::Proxy(inner))), + backend, + metric_callback, + None, + None, + ) + .await + } + #[instrument(level = "trace", skip(metric_callback))] async fn begin( conn: Arc>, diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 33b6c847c..5e456e687 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -1,5 +1,7 @@ #[cfg(feature = "mock")] mod mock; +#[cfg(feature = "proxy")] +mod proxy; #[cfg(feature = "sqlx-dep")] mod sqlx_common; #[cfg(feature = "sqlx-mysql")] @@ -11,6 +13,8 @@ pub(crate) mod sqlx_sqlite; #[cfg(feature = "mock")] pub use mock::*; +#[cfg(feature = "proxy")] +pub use proxy::*; #[cfg(feature = "sqlx-dep")] pub use sqlx_common::*; #[cfg(feature = "sqlx-mysql")] diff --git a/src/driver/proxy.rs b/src/driver/proxy.rs new file mode 100644 index 000000000..1ed1ab81b --- /dev/null +++ b/src/driver/proxy.rs @@ -0,0 +1,140 @@ +use crate::{ + debug_print, error::*, DatabaseConnection, DbBackend, ExecResult, ProxyDatabaseTrait, + QueryResult, Statement, +}; +use futures::Stream; +use std::{ + fmt::Debug, + pin::Pin, + sync::{Arc, Mutex}, +}; +use tracing::instrument; + +/// Defines a database driver for the [ProxyDatabase] +#[derive(Debug)] +pub struct ProxyDatabaseConnector; + +/// Defines a connection for the [ProxyDatabase] +#[derive(Debug)] +pub struct ProxyDatabaseConnection { + db_backend: DbBackend, + proxy: Arc>>, +} + +impl ProxyDatabaseConnector { + /// Check if the database URI given and the [DatabaseBackend](crate::DatabaseBackend) selected are the same + #[allow(unused_variables)] + pub fn accepts(string: &str) -> bool { + // As this is a proxy database, it accepts any URI + true + } + + /// Connect to the [ProxyDatabase] + #[allow(unused_variables)] + 
#[instrument(level = "trace")]
+    pub fn connect(
+        db_type: DbBackend,
+        func: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>,
+    ) -> Result<DatabaseConnection, DbErr> {
+        Ok(DatabaseConnection::ProxyDatabaseConnection(Arc::new(
+            ProxyDatabaseConnection::new(db_type, func),
+        )))
+    }
+}
+
+impl ProxyDatabaseConnection {
+    /// Create a connection to the [ProxyDatabase]
+    pub fn new(db_backend: DbBackend, funcs: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>) -> Self {
+        Self {
+            db_backend,
+            proxy: funcs.to_owned(),
+        }
+    }
+
+    /// Get the [DatabaseBackend](crate::DatabaseBackend) being used by the [ProxyDatabase]
+    pub fn get_database_backend(&self) -> DbBackend {
+        self.db_backend
+    }
+
+    /// Execute the SQL statement in the [ProxyDatabase]
+    #[instrument(level = "trace")]
+    pub fn execute(&self, statement: Statement) -> Result<ExecResult, DbErr> {
+        debug_print!("{}", statement);
+        Ok(self
+            .proxy
+            .lock()
+            .map_err(exec_err)?
+            .execute(statement)?
+            .into())
+    }
+
+    /// Return one [QueryResult] if the query was successful
+    #[instrument(level = "trace")]
+    pub fn query_one(&self, statement: Statement) -> Result<Option<QueryResult>, DbErr> {
+        debug_print!("{}", statement);
+        let result = self.proxy.lock().map_err(query_err)?.query(statement)?;
+
+        if let Some(first) = result.first() {
+            return Ok(Some(QueryResult {
+                row: crate::QueryResultRow::Proxy(first.to_owned()),
+            }));
+        } else {
+            return Ok(None);
+        }
+    }
+
+    /// Return all [QueryResult]s if the query was successful
+    #[instrument(level = "trace")]
+    pub fn query_all(&self, statement: Statement) -> Result<Vec<QueryResult>, DbErr> {
+        debug_print!("{}", statement);
+        let result = self.proxy.lock().map_err(query_err)?.query(statement)?;
+
+        Ok(result
+            .into_iter()
+            .map(|row| QueryResult {
+                row: crate::QueryResultRow::Proxy(row),
+            })
+            .collect())
+    }
+
+    /// Return [QueryResult]s from a multi-query operation
+    #[instrument(level = "trace")]
+    pub fn fetch(
+        &self,
+        statement: &Statement,
+    ) -> Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>> + Send>> {
+        match self.query_all(statement.clone()) {
+            Ok(v) => Box::pin(futures::stream::iter(v.into_iter().map(Ok))),
+            Err(e) =>
Box::pin(futures::stream::iter(Some(Err(e)).into_iter())), + } + } + + /// Create a statement block of SQL statements that execute together. + #[instrument(level = "trace")] + pub fn begin(&self) { + self.proxy.lock().expect("Failed to acquire mocker").begin() + } + + /// Commit a transaction atomically to the database + #[instrument(level = "trace")] + pub fn commit(&self) { + self.proxy + .lock() + .expect("Failed to acquire mocker") + .commit() + } + + /// Roll back a faulty transaction + #[instrument(level = "trace")] + pub fn rollback(&self) { + self.proxy + .lock() + .expect("Failed to acquire mocker") + .rollback() + } + + /// Checks if a connection to the database is still valid. + pub fn ping(&self) -> Result<(), DbErr> { + self.proxy.lock().map_err(query_err)?.ping() + } +} diff --git a/src/executor/execute.rs b/src/executor/execute.rs index 487ca41aa..626dd81b8 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -22,6 +22,9 @@ pub(crate) enum ExecResultHolder { /// Holds the result of executing an operation on the Mock database #[cfg(feature = "mock")] Mock(crate::MockExecResult), + /// Holds the result of executing an operation on the Proxy database + #[cfg(feature = "proxy")] + Proxy(crate::ProxyExecResult), } // ExecResult // @@ -51,6 +54,8 @@ impl ExecResult { } #[cfg(feature = "mock")] ExecResultHolder::Mock(result) => result.last_insert_id, + #[cfg(feature = "proxy")] + ExecResultHolder::Proxy(result) => result.last_insert_id, #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -67,6 +72,8 @@ impl ExecResult { ExecResultHolder::SqlxSqlite(result) => result.rows_affected(), #[cfg(feature = "mock")] ExecResultHolder::Mock(result) => result.rows_affected, + #[cfg(feature = "proxy")] + ExecResultHolder::Proxy(result) => result.rows_affected, #[allow(unreachable_patterns)] _ => unreachable!(), } diff --git a/src/executor/query.rs b/src/executor/query.rs index a7df40abf..01c3a1d0e 100644 --- a/src/executor/query.rs +++ 
b/src/executor/query.rs @@ -1,7 +1,7 @@ use crate::{error::*, SelectGetableValue, SelectorRaw, Statement}; use std::fmt; -#[cfg(feature = "mock")] +#[cfg(any(feature = "mock", feature = "proxy"))] use crate::debug_print; #[cfg(feature = "sqlx-dep")] @@ -25,6 +25,8 @@ pub(crate) enum QueryResultRow { SqlxSqlite(sqlx::sqlite::SqliteRow), #[cfg(feature = "mock")] Mock(crate::MockRow), + #[cfg(feature = "proxy")] + Proxy(crate::ProxyRow), } /// An interface to get a value from the query result @@ -127,6 +129,8 @@ impl fmt::Debug for QueryResultRow { Self::SqlxSqlite(_) => write!(f, "QueryResultRow::SqlxSqlite cannot be inspected"), #[cfg(feature = "mock")] Self::Mock(row) => write!(f, "{row:?}"), + #[cfg(feature = "proxy")] + Self::Proxy(row) => write!(f, "{row:?}"), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -271,6 +275,11 @@ macro_rules! try_getable_all { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -306,6 +315,11 @@ macro_rules! try_getable_unsigned { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -342,6 +356,11 @@ macro_rules! try_getable_mysql { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -383,6 +402,11 @@ macro_rules! 
try_getable_date_time { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -478,6 +502,12 @@ impl TryGetable for Decimal { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -525,6 +555,12 @@ impl TryGetable for BigDecimal { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -559,6 +595,12 @@ macro_rules! 
try_getable_uuid { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => row.try_get::(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), }; @@ -613,6 +655,12 @@ impl TryGetable for u32 { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -658,6 +706,12 @@ mod postgres_array { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -745,6 +799,13 @@ mod postgres_array { err_null_idx_col(idx) }) } + #[cfg(feature = "proxy")] + QueryResultRow::Proxy(row) => { + row.try_get::, _>(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }) + } #[allow(unreachable_patterns)] _ => unreachable!(), }; @@ -799,6 +860,12 @@ mod postgres_array { debug_print!("{:#?}", e.to_string()); err_null_idx_col(idx) }), + #[cfg(feature = "proxy")] + #[allow(unused_variables)] + QueryResultRow::Proxy(row) => row.try_get(idx).map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }), #[allow(unreachable_patterns)] _ => unreachable!(), } @@ -1014,6 +1081,14 @@ where err_null_idx_col(idx) }) .and_then(|json| serde_json::from_value(json).map_err(|e| json_err(e).into())), + #[cfg(feature = "proxy")] + QueryResultRow::Proxy(row) => row + .try_get::(idx) + .map_err(|e| { + debug_print!("{:#?}", e.to_string()); + err_null_idx_col(idx) + }) + 
.and_then(|json| serde_json::from_value(json).map_err(|e| json_err(e).into())), #[allow(unreachable_patterns)] _ => unreachable!(), } diff --git a/src/query/json.rs b/src/query/json.rs index 66b4dc978..da91fb3b6 100644 --- a/src/query/json.rs +++ b/src/query/json.rs @@ -214,6 +214,18 @@ impl FromQueryResult for JsonValue { } Ok(JsonValue::Object(map)) } + #[cfg(feature = "proxy")] + crate::QueryResultRow::Proxy(row) => { + for (column, value) in row.clone().into_column_value_tuples() { + let col = if !column.starts_with(pre) { + continue; + } else { + column.replacen(pre, "", 1) + }; + map.insert(col, sea_query::sea_value_to_json_value(&value)); + } + Ok(JsonValue::Object(map)) + } #[allow(unreachable_patterns)] _ => unreachable!(), }