Skip to content

Commit

Permalink
catalog: Fix savepoint upper calculation (#28360)
Browse files Browse the repository at this point in the history
The savepoint catalog mode creates a logical branch of catalog state.
Previously, savepoint catalogs would look at the upper of the catalog
persist shard to determine the current catalog upper. Looking at the
persist shard gives us the upper of the "main" catalog state branch, but
not the current savepoint branch upper. Instead, we should be looking at
the upper saved in memory.

This commit fixes the issue by updating the fetch upper logic and adding
some sanity asserts to various related spots in the code.

Fixes #28326
  • Loading branch information
jkosh44 authored and ParkMyCar committed Jul 19, 2024
1 parent 3664a3b commit 93272a7
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl Catalog {
};

let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
assert!(!updates.is_empty(), "initial catalog snapshot is missing");
let mut txn = storage.transaction().await?;

// Migrate/update durable data before we start loading the in-memory catalog.
Expand Down
18 changes: 12 additions & 6 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,16 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
/// Fetch the current upper of the catalog state.
///
/// The source of the upper depends on `self.mode`:
/// - `Mode::Writable` and `Mode::Readonly` ask the persist write handle for its most recent
///   upper via `fetch_recent_upper()`.
/// - `Mode::Savepoint` returns the in-memory `self.upper` instead. Per the commit description,
///   the persist shard only reflects the upper of the "main" catalog branch, not the upper of
///   the logical savepoint branch.
///
/// # Panics
///
/// Panics in the `Writable`/`Readonly` path if `as_option()` on the fetched upper yields `None`.
#[mz_ore::instrument]
async fn current_upper(&mut self) -> Timestamp {
// NOTE(review): the next six lines look like the pre-change body (lines removed by this
// commit) that the diff view renders next to the new `match` below — confirm before treating
// them as live code.
self.write_handle
.fetch_recent_upper()
.await
.as_option()
.cloned()
.expect("we use a totally ordered time and never finalize the shard")
// Post-change body: pick the upper source based on the catalog mode.
match self.mode {
// These modes read the upper from the persist write handle.
Mode::Writable | Mode::Readonly => self
.write_handle
.fetch_recent_upper()
.await
.as_option()
.cloned()
.expect("we use a totally ordered time and never finalize the shard"),
// Savepoint mode reads the upper tracked in memory on this handle.
Mode::Savepoint => self.upper,
}
}

/// Appends `updates` iff the current global upper of the catalog is `self.upper`.
Expand All @@ -296,6 +300,8 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
&mut self,
updates: Vec<(S, Diff)>,
) -> Result<Timestamp, CatalogError> {
assert_eq!(self.mode, Mode::Writable);

let updates = updates.into_iter().map(|(kind, diff)| {
let kind: StateUpdateKindRaw = kind.into();
((Into::<SourceData>::into(kind), ()), self.upper, diff)
Expand Down
66 changes: 66 additions & 0 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2658,6 +2658,13 @@ where
mod tests {
use super::*;

use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::PersistClient;
use uuid::Uuid;

use crate::durable::{test_bootstrap_args, test_persist_backed_catalog_state};
use crate::memory;

#[mz_ore::test]
fn test_table_transaction_simple() {
fn uniqueness_violation(a: &String, b: &String) -> bool {
Expand Down Expand Up @@ -2942,4 +2949,63 @@ mod tests {
let pending = table_txn.pending::<Vec<u8>, String>();
assert!(pending.is_empty());
}

/// Exercises a savepoint catalog end to end: after a database is inserted and committed through
/// a savepoint transaction, the next sync is expected to yield a database addition whose fields
/// match what was inserted.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_savepoint() {
    let generation = 0;
    let persist = PersistClient::new_for_tests().await;
    let org_id = Uuid::new_v4();

    // Two openable handles over the same persist client and organization: the first is opened
    // normally to initialize the catalog, the second is opened as a savepoint.
    let writable_openable = test_persist_backed_catalog_state(persist.clone(), org_id).await;
    let savepoint_openable = test_persist_backed_catalog_state(persist, org_id).await;

    // Initialize catalog. The opened state is discarded right away.
    let _ = writable_openable
        .open(SYSTEM_TIME(), &test_bootstrap_args(), generation, None)
        .await
        .unwrap();

    let mut savepoint = savepoint_openable
        .open_savepoint(SYSTEM_TIME(), &test_bootstrap_args(), generation, None)
        .await
        .unwrap();

    // The savepoint must see the snapshot written during initialization.
    let snapshot = savepoint.sync_to_current_updates().await.unwrap();
    assert!(!snapshot.is_empty());

    // Insert a user database through a savepoint transaction and commit it.
    let name = "db";
    let owner = RoleId::User(42);
    let privileges = Vec::new();
    let mut txn = savepoint.transaction().await.unwrap();
    let (expected_id, expected_oid) = txn
        .insert_user_database(name, owner, privileges.clone(), &HashSet::new())
        .unwrap();
    txn.commit_internal().await.unwrap();

    // Sync again and take the resulting update.
    // NOTE(review): `into_element` presumably asserts there is exactly one update — confirm.
    let update = savepoint
        .sync_to_current_updates()
        .await
        .unwrap()
        .into_element();

    assert_eq!(update.diff, StateDiff::Addition);

    let db = match update.kind {
        memory::objects::StateUpdateKind::Database(db) => db,
        other => panic!("unexpected update: {other:?}"),
    };

    assert_eq!(expected_id, db.id);
    assert_eq!(expected_oid, db.oid);
    assert_eq!(name, db.name);
    assert_eq!(owner, db.owner_id);
    assert_eq!(privileges, db.privileges);
}
}

0 comments on commit 93272a7

Please sign in to comment.