From 93272a77341980672b4227e582e327f4c2949c5a Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Fri, 19 Jul 2024 07:33:53 -0700 Subject: [PATCH] catalog: Fix savepoint upper calculation (#28360) The savepoint catalog mode creates a logical branch of catalog state. Previously, savepoint catalogs would look at the upper of the catalog persist shard to determine the current catalog upper. Looking at the persist shard gives us the upper of the "main" catalog state branch, but not the current savepoint branch upper. Instead, we should be looking at the upper saved in memory. This commit fixes the issue by updating the fetch upper logic and adds some sanity asserts to various related spots in the code. Fixes #28326 --- src/adapter/src/catalog/open.rs | 1 + src/catalog/src/durable/persist.rs | 18 ++++--- src/catalog/src/durable/transaction.rs | 66 ++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 6 deletions(-) diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 53a8dd967df1b..7e28550ac5c69 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -263,6 +263,7 @@ impl Catalog { }; let mut updates: Vec<_> = storage.sync_to_current_updates().await?; + assert!(!updates.is_empty(), "initial catalog snapshot is missing"); let mut txn = storage.transaction().await?; // Migrate/update durable data before we start loading the in-memory catalog. diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index 0148128756c03..a719b4940670e 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -280,12 +280,16 @@ impl> PersistHandle { /// Fetch the current upper of the catalog state. 
#[mz_ore::instrument] async fn current_upper(&mut self) -> Timestamp { - self.write_handle - .fetch_recent_upper() - .await - .as_option() - .cloned() - .expect("we use a totally ordered time and never finalize the shard") + match self.mode { + Mode::Writable | Mode::Readonly => self + .write_handle + .fetch_recent_upper() + .await + .as_option() + .cloned() + .expect("we use a totally ordered time and never finalize the shard"), + Mode::Savepoint => self.upper, + } } /// Appends `updates` iff the current global upper of the catalog is `self.upper`. @@ -296,6 +300,8 @@ impl> PersistHandle { &mut self, updates: Vec<(S, Diff)>, ) -> Result { + assert_eq!(self.mode, Mode::Writable); + let updates = updates.into_iter().map(|(kind, diff)| { let kind: StateUpdateKindRaw = kind.into(); ((Into::::into(kind), ()), self.upper, diff) diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index 587589981c3e1..c19d8f50f591b 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -2658,6 +2658,13 @@ where mod tests { use super::*; + use mz_ore::now::SYSTEM_TIME; + use mz_persist_client::PersistClient; + use uuid::Uuid; + + use crate::durable::{test_bootstrap_args, test_persist_backed_catalog_state}; + use crate::memory; + #[mz_ore::test] fn test_table_transaction_simple() { fn uniqueness_violation(a: &String, b: &String) -> bool { @@ -2942,4 +2949,63 @@ mod tests { let pending = table_txn.pending::, String>(); assert!(pending.is_empty()); } + + #[mz_ore::test(tokio::test)] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn test_savepoint() { + let deploy_generation = 0; + let persist_client = PersistClient::new_for_tests().await; + let organization_id = Uuid::new_v4(); + let openable_state1 = + test_persist_backed_catalog_state(persist_client.clone(), organization_id).await; + let openable_state2 = + 
test_persist_backed_catalog_state(persist_client, organization_id).await; + + // Initialize catalog. + let _ = openable_state1 + .open( + SYSTEM_TIME(), + &test_bootstrap_args(), + deploy_generation, + None, + ) + .await + .unwrap(); + let mut savepoint_state = openable_state2 + .open_savepoint( + SYSTEM_TIME(), + &test_bootstrap_args(), + deploy_generation, + None, + ) + .await + .unwrap(); + + let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap(); + assert!(!initial_snapshot.is_empty()); + + let db_name = "db"; + let db_owner = RoleId::User(42); + let db_privileges = Vec::new(); + let mut txn = savepoint_state.transaction().await.unwrap(); + let (db_id, db_oid) = txn + .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new()) + .unwrap(); + txn.commit_internal().await.unwrap(); + let updates = savepoint_state.sync_to_current_updates().await.unwrap(); + let update = updates.into_element(); + + assert_eq!(update.diff, StateDiff::Addition); + + let db = match update.kind { + memory::objects::StateUpdateKind::Database(db) => db, + update => panic!("unexpected update: {update:?}"), + }; + + assert_eq!(db_id, db.id); + assert_eq!(db_oid, db.oid); + assert_eq!(db_name, db.name); + assert_eq!(db_owner, db.owner_id); + assert_eq!(db_privileges, db.privileges); + } }