diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 53a8dd967df1b..7e28550ac5c69 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -263,6 +263,7 @@ impl Catalog { }; let mut updates: Vec<_> = storage.sync_to_current_updates().await?; + assert!(!updates.is_empty(), "initial catalog snapshot is missing"); let mut txn = storage.transaction().await?; // Migrate/update durable data before we start loading the in-memory catalog. diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index 0148128756c03..a719b4940670e 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -280,12 +280,16 @@ impl> PersistHandle { /// Fetch the current upper of the catalog state. #[mz_ore::instrument] async fn current_upper(&mut self) -> Timestamp { - self.write_handle - .fetch_recent_upper() - .await - .as_option() - .cloned() - .expect("we use a totally ordered time and never finalize the shard") + match self.mode { + Mode::Writable | Mode::Readonly => self + .write_handle + .fetch_recent_upper() + .await + .as_option() + .cloned() + .expect("we use a totally ordered time and never finalize the shard"), + Mode::Savepoint => self.upper, + } } /// Appends `updates` iff the current global upper of the catalog is `self.upper`. @@ -296,6 +300,8 @@ impl> PersistHandle { &mut self, updates: Vec<(S, Diff)>, ) -> Result { + assert_eq!(self.mode, Mode::Writable); + let updates = updates.into_iter().map(|(kind, diff)| { let kind: StateUpdateKindRaw = kind.into(); ((Into::::into(kind), ()), self.upper, diff) diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index 587589981c3e1..c19d8f50f591b 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -2658,6 +2658,13 @@ where mod tests { use super::*; + use mz_ore::now::SYSTEM_TIME; + use mz_persist_client::PersistClient; + use uuid::Uuid; + + use crate::durable::{test_bootstrap_args, test_persist_backed_catalog_state}; + use crate::memory; + #[mz_ore::test] fn test_table_transaction_simple() { fn uniqueness_violation(a: &String, b: &String) -> bool { @@ -2942,4 +2949,63 @@ mod tests { let pending = table_txn.pending::, String>(); assert!(pending.is_empty()); } + + #[mz_ore::test(tokio::test)] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn test_savepoint() { + let deploy_generation = 0; + let persist_client = PersistClient::new_for_tests().await; + let organization_id = Uuid::new_v4(); + let openable_state1 = + test_persist_backed_catalog_state(persist_client.clone(), organization_id).await; + let openable_state2 = + test_persist_backed_catalog_state(persist_client, organization_id).await; + + // Initialize catalog. + let _ = openable_state1 + .open( + SYSTEM_TIME(), + &test_bootstrap_args(), + deploy_generation, + None, + ) + .await + .unwrap(); + let mut savepoint_state = openable_state2 + .open_savepoint( + SYSTEM_TIME(), + &test_bootstrap_args(), + deploy_generation, + None, + ) + .await + .unwrap(); + + let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap(); + assert!(!initial_snapshot.is_empty()); + + let db_name = "db"; + let db_owner = RoleId::User(42); + let db_privileges = Vec::new(); + let mut txn = savepoint_state.transaction().await.unwrap(); + let (db_id, db_oid) = txn + .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new()) + .unwrap(); + txn.commit_internal().await.unwrap(); + let updates = savepoint_state.sync_to_current_updates().await.unwrap(); + let update = updates.into_element(); + + assert_eq!(update.diff, StateDiff::Addition); + + let db = match update.kind { + memory::objects::StateUpdateKind::Database(db) => db, + update => panic!("unexpected update: {update:?}"), + }; + + assert_eq!(db_id, db.id); + assert_eq!(db_oid, db.oid); + assert_eq!(db_name, db.name); + assert_eq!(db_owner, db.owner_id); + assert_eq!(db_privileges, db.privileges); + } }