Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catalog: Fix savepoint upper calculation #28360

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl Catalog {
};

let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
assert!(!updates.is_empty(), "initial catalog snapshot is missing");
let mut txn = storage.transaction().await?;

// Migrate/update durable data before we start loading the in-memory catalog.
Expand Down
18 changes: 12 additions & 6 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,16 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
/// Fetch the current upper of the catalog state.
#[mz_ore::instrument]
async fn current_upper(&mut self) -> Timestamp {
self.write_handle
.fetch_recent_upper()
.await
.as_option()
.cloned()
.expect("we use a totally ordered time and never finalize the shard")
match self.mode {
Mode::Writable | Mode::Readonly => self
.write_handle
.fetch_recent_upper()
.await
.as_option()
.cloned()
.expect("we use a totally ordered time and never finalize the shard"),
Mode::Savepoint => self.upper,
}
}

/// Appends `updates` iff the current global upper of the catalog is `self.upper`.
Expand All @@ -296,6 +300,8 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
&mut self,
updates: Vec<(S, Diff)>,
) -> Result<Timestamp, CatalogError> {
assert_eq!(self.mode, Mode::Writable);

let updates = updates.into_iter().map(|(kind, diff)| {
let kind: StateUpdateKindRaw = kind.into();
((Into::<SourceData>::into(kind), ()), self.upper, diff)
Expand Down
66 changes: 66 additions & 0 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2658,6 +2658,13 @@ where
mod tests {
use super::*;

use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::PersistClient;
use uuid::Uuid;

use crate::durable::{test_bootstrap_args, test_persist_backed_catalog_state};
use crate::memory;

#[mz_ore::test]
fn test_table_transaction_simple() {
fn uniqueness_violation(a: &String, b: &String) -> bool {
Expand Down Expand Up @@ -2942,4 +2949,63 @@ mod tests {
let pending = table_txn.pending::<Vec<u8>, String>();
assert!(pending.is_empty());
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_savepoint() {
let deploy_generation = 0;
let persist_client = PersistClient::new_for_tests().await;
let organization_id = Uuid::new_v4();
let openable_state1 =
test_persist_backed_catalog_state(persist_client.clone(), organization_id).await;
let openable_state2 =
test_persist_backed_catalog_state(persist_client, organization_id).await;

// Initialize catalog.
let _ = openable_state1
.open(
SYSTEM_TIME(),
&test_bootstrap_args(),
deploy_generation,
None,
)
.await
.unwrap();
let mut savepoint_state = openable_state2
.open_savepoint(
SYSTEM_TIME(),
&test_bootstrap_args(),
deploy_generation,
None,
)
.await
.unwrap();

let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
assert!(!initial_snapshot.is_empty());

let db_name = "db";
let db_owner = RoleId::User(42);
let db_privileges = Vec::new();
let mut txn = savepoint_state.transaction().await.unwrap();
let (db_id, db_oid) = txn
.insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
.unwrap();
txn.commit_internal().await.unwrap();
let updates = savepoint_state.sync_to_current_updates().await.unwrap();
let update = updates.into_element();

assert_eq!(update.diff, StateDiff::Addition);

let db = match update.kind {
memory::objects::StateUpdateKind::Database(db) => db,
update => panic!("unexpected update: {update:?}"),
};

assert_eq!(db_id, db.id);
assert_eq!(db_oid, db.oid);
assert_eq!(db_name, db.name);
assert_eq!(db_owner, db.owner_id);
assert_eq!(db_privileges, db.privileges);
}
}