Skip to content

Commit

Permalink
Merge pull request #2 from himadripal/sql-catalog-conn-pool-fix-2
Browse files Browse the repository at this point in the history
create sqlconfig, fix rest of the tests and remove todo
  • Loading branch information
JanKaul authored Apr 18, 2024
2 parents f98fde3 + aaa0181 commit b2cbc5d
Showing 1 changed file with 50 additions and 18 deletions.
68 changes: 50 additions & 18 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,21 @@ use iceberg::{
TableIdent,
};
use std::time::Duration;
use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::error::from_sqlx_error;

/// Sql catalog config
#[derive(Debug, TypedBuilder)]
pub struct SqlCatalogConfig {
url: String,
name: String,
warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}

#[derive(Debug)]
/// Sql catalog implementation.
pub struct SqlCatalog {
Expand All @@ -50,14 +61,29 @@ pub struct SqlCatalog {

impl SqlCatalog {
/// Create new sql catalog instance
pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
install_default_drivers();
let max_connections: u32 = config
.props
.get("pool.max-connections")
.map(|v| v.parse().unwrap())
.unwrap_or(10);
let idle_timeout: u64 = config
.props
.get("pool.idle-timeout")
.map(|v| v.parse().unwrap())
.unwrap_or(10);
let test_before_acquire: bool = config
.props
.get("pool.test-before-acquire")
.map(|v| v.parse().unwrap())
.unwrap_or(true);

let pool = AnyPoolOptions::new()
.max_connections(20) //configurable?
.idle_timeout(Duration::from_secs(20))
.test_before_acquire(true)
.connect(url)
.max_connections(max_connections)
.idle_timeout(Duration::from_secs(idle_timeout))
.test_before_acquire(test_before_acquire)
.connect(&config.url)
.await
.map_err(from_sqlx_error)?;

Expand Down Expand Up @@ -88,10 +114,14 @@ impl SqlCatalog {
.await
.map_err(from_sqlx_error)?;

let file_io = FileIO::from_path(&config.warehouse)?
.with_props(&config.props)
.build()?;

Ok(SqlCatalog {
name: name.to_owned(),
name: config.name.to_owned(),
connection: pool,
storage,
storage: file_io,
cache: Arc::new(DashMap::new()),
})
}
Expand Down Expand Up @@ -281,7 +311,6 @@ impl Catalog for SqlCatalog {
let name = name.clone();
let metadata_location = metadata_location.to_string();

//TODO do we need to check if table exists,then update or delete and insert, otherwise unique constraint violation occurs
sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);")
.bind(&catalog_name)
.bind(&namespace)
Expand Down Expand Up @@ -312,19 +341,18 @@ impl Catalog for SqlCatalog {
#[cfg(test)]
pub mod tests {
use iceberg::{
io::FileIOBuilder,
spec::{NestedField, PrimitiveType, Schema, Type},
Catalog, NamespaceIdent, TableCreation, TableIdent,
};
use tempfile::TempDir;

use crate::SqlCatalog;
use crate::{SqlCatalog, SqlCatalogConfig};
use sqlx::migrate::MigrateDatabase;

#[tokio::test]
async fn test_create_update_drop_table() {
let dir = TempDir::new().unwrap();
let storage = FileIOBuilder::new_fs_io().build().unwrap();
let dir = TempDir::with_prefix("sql-test").unwrap();
let warehouse_root = dir.path().to_str().unwrap();

//name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found)
let sql_lite_url = "sqlite://iceberg";
Expand All @@ -333,9 +361,13 @@ pub mod tests {
sqlx::Sqlite::create_database(sql_lite_url).await.unwrap();
}

let catalog = SqlCatalog::new(sql_lite_url, "iceberg", storage)
.await
.unwrap();
let config = SqlCatalogConfig::builder()
.url(sql_lite_url.to_string())
.name("iceberg".to_string())
.warehouse(warehouse_root.to_owned())
.build();

let catalog = SqlCatalog::new(config).await.unwrap();

let namespace = NamespaceIdent::new("test".to_owned());

Expand All @@ -352,7 +384,7 @@ pub mod tests {

let creation = TableCreation::builder()
.name("table1".to_owned())
.location(dir.path().to_str().unwrap().to_owned() + "/warehouse/table1")
.location(warehouse_root.to_owned() + "/warehouse/table1")
.schema(schema)
.build();

Expand All @@ -378,9 +410,9 @@ pub mod tests {

//load table points to a /var location - check why

// let table = catalog.load_table(&identifier).await.unwrap();
let table = catalog.load_table(&identifier).await.unwrap();

// assert!(table.metadata().location().ends_with("/warehouse/table1"))
assert!(table.metadata().location().ends_with("/warehouse/table1"));

//tear down the database and tables
sqlx::Sqlite::drop_database(sql_lite_url).await.unwrap();
Expand Down

0 comments on commit b2cbc5d

Please sign in to comment.