Skip to content

Commit

Permalink
adapter: Create deterministic log index IDs
Browse files Browse the repository at this point in the history
This commit adds a new variant of `GlobalId` and `CatalogItemId` for
introspection source indexes. The values of these IDs are
deterministically derived from the cluster ID and the log variant.

Introspection source indexes are a special edge case of items. They are
considered system items, but they are the only system item that can be
created by the user at any time. All other system items can only be
created by the system during the startup of an upgrade.

Previously, it was possible to allocate the same System ID to two
different objects if something like the following happened:

1. Materialize version `v` is running in read-write mode.
2. Materialize version `v + 1` starts in read-only mode.
3. The next system item ID is `s`.
4. `v + 1` allocates `s` for a new system item (table, view,
   introspection source, etc.)
5. `v` creates a new user cluster and allocates `s` through `s + n` to
   the introspection source indexes in that cluster. At this point we
   have two separate objects with the same Global ID, which is bad.
6. `v + 1` reboots in read-write mode and allocates `s + n + 1` to the
   new system item. At this point the new system item has received two
   different IDs, which is also bad.

Putting introspection source index IDs in their own namespace and
making them deterministic removes this issue and ones like it.

Fixes #MaterializeInc/database-issues/issues/8731
  • Loading branch information
jkosh44 committed Nov 25, 2024
1 parent 2141dfd commit ac3a528
Show file tree
Hide file tree
Showing 30 changed files with 1,902 additions and 125 deletions.
59 changes: 50 additions & 9 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,14 +518,51 @@ impl Catalog {
Fut: Future<Output = T>,
{
let persist_client = PersistClient::new_for_tests().await;
let environmentd_id = Uuid::new_v4();
let organization_id = Uuid::new_v4();
let bootstrap_args = test_bootstrap_args();
let catalog = Self::open_debug_catalog(persist_client, environmentd_id, &bootstrap_args)
let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
.await
.expect("can open debug catalog");
f(catalog).await
}

/// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
/// in progress.
pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
where
F: FnOnce(Catalog) -> Fut,
Fut: Future<Output = T>,
{
let persist_client = PersistClient::new_for_tests().await;
let organization_id = Uuid::new_v4();
let bootstrap_args = test_bootstrap_args();
let mut catalog =
Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
.await
.expect("can open debug catalog");

// Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
let now = SYSTEM_TIME.clone();
let openable_storage = TestCatalogStateBuilder::new(persist_client)
.with_organization_id(organization_id)
.with_default_deploy_generation()
.build()
.await
.expect("can create durable catalog");
let mut storage = openable_storage
.open(now(), &bootstrap_args)
.await
.expect("can open durable catalog");
// Drain updates.
let _ = storage
.sync_to_current_updates()
.await
.expect("can sync to current updates");
catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));

f(catalog).await
}

/// Opens a debug catalog.
///
/// See [`Catalog::with_debug`].
Expand Down Expand Up @@ -718,13 +755,17 @@ impl Catalog {
#[cfg(test)]
pub async fn allocate_system_id(&self) -> Result<(CatalogItemId, GlobalId), Error> {
use mz_ore::collections::CollectionExt;
self.storage()
.await
.allocate_system_ids(1)
.await
.maybe_terminate("allocating system ids")
.map(|ids| ids.into_element())
.err_into()

let mut storage = self.storage().await;
let mut txn = storage.transaction().await?;
let id = txn
.allocate_system_item_ids(1)
.maybe_terminate("allocating system ids")?
.into_element();
// Drain transaction.
let _ = txn.get_and_commit_op_updates();
txn.commit().await?;
Ok(id)
}

/// Get the next system item ID without allocating it.
Expand Down
25 changes: 11 additions & 14 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
use mz_catalog::durable::objects::{
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
};
use mz_catalog::durable::{
ClusterVariant, ClusterVariantManaged, Transaction, SYSTEM_CLUSTER_ID_ALLOC_KEY,
};
use mz_catalog::durable::{ClusterVariant, ClusterVariantManaged, Transaction};
use mz_catalog::expr_cache::{
ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
};
Expand Down Expand Up @@ -632,6 +630,8 @@ impl Catalog {
);
}

catalog.storage().await.mark_bootstrap_complete();

Ok(OpenCatalogResult {
catalog,
storage_collections_to_drop,
Expand Down Expand Up @@ -772,6 +772,10 @@ impl Catalog {

let (new_item_id, new_global_id) = match id {
CatalogItemId::System(_) => txn.allocate_system_item_ids(1)?.into_element(),
CatalogItemId::IntrospectionSourceIndex(id) => (
CatalogItemId::IntrospectionSourceIndex(id),
GlobalId::IntrospectionSourceIndex(id),
),
CatalogItemId::User(_) => txn.allocate_user_item_ids(1)?.into_element(),
_ => unreachable!("can't migrate id: {id}"),
};
Expand Down Expand Up @@ -1222,10 +1226,7 @@ fn add_new_builtin_clusters_migration(
if !cluster_names.contains(builtin_cluster.name) {
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_size)?;
let id = txn.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
let id = ClusterId::System(id);
txn.insert_system_cluster(
id,
builtin_cluster.name,
vec![],
builtin_cluster.privileges.to_vec(),
Expand Down Expand Up @@ -1265,13 +1266,9 @@ fn add_new_remove_old_builtin_introspection_source_migration(
}
}

let new_ids = txn.allocate_system_item_ids(usize_to_u64(new_logs.len()))?;
let new_entries = new_logs
.into_iter()
.zip_eq(new_ids)
.map(|(log, (item_id, gid))| (log, item_id, gid));

for (log, item_id, gid) in new_entries {
for log in new_logs {
let (item_id, gid) =
Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
}

Expand Down Expand Up @@ -1757,7 +1754,7 @@ mod builtin_migration_tests {
}

async fn run_test_case(test_case: BuiltinMigrationTestCase) {
Catalog::with_debug(|mut catalog| async move {
Catalog::with_debug_in_bootstrap(|mut catalog| async move {
let mut item_id_mapping = BTreeMap::new();
let mut name_mapping = BTreeMap::new();

Expand Down
15 changes: 11 additions & 4 deletions src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use mz_catalog::memory::objects::{
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::usize_to_u64;
use mz_ore::collections::HashSet;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
Expand All @@ -56,6 +55,7 @@ use mz_sql::session::vars::{Value as VarValue, VarInput};
use mz_sql::{rbac, DEFAULT_SCHEMA};
use mz_sql_parser::ast::{QualifiedReplica, Value};
use mz_storage_client::controller::StorageController;
use timely::Container;
use tracing::{info, trace};

use crate::catalog::{
Expand Down Expand Up @@ -903,12 +903,19 @@ impl Catalog {
let privileges: Vec<_> =
merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
.collect();
let introspection_source_ids =
tx.allocate_system_item_ids(usize_to_u64(introspection_sources.len()))?;
let introspection_source_ids: Vec<_> = introspection_sources
.iter()
.map(|introspection_source| {
Transaction::allocate_introspection_source_index_id(
&id,
introspection_source.variant,
)
})
.collect();

let introspection_sources = introspection_sources
.into_iter()
.zip_eq(introspection_source_ids.into_iter())
.zip_eq(introspection_source_ids)
.map(|(log, (item_id, gid))| (log, item_id, gid))
.collect();

Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,8 @@ impl Coordinator {
let id_too_large = match id {
CatalogItemId::System(id) => *id >= next_system_item_id,
CatalogItemId::User(id) => *id >= next_user_item_id,
CatalogItemId::Transient(_) => false,
CatalogItemId::IntrospectionSourceIndex(_)
| CatalogItemId::Transient(_) => false,
};
if id_too_large {
info!(
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v72.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v73.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
6 changes: 5 additions & 1 deletion src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.proto",
"md5": "2d781c72c4a56b13dfb1b4215f3614f0"
"md5": "9404d316bbf1ffa30c1fe7b37180c936"
},
{
"name": "objects_v67.proto",
Expand All @@ -26,5 +26,9 @@
{
"name": "objects_v72.proto",
"md5": "b21cb2b1b41649c78405731e53560d59"
},
{
"name": "objects_v73.proto",
"md5": "4e64aa50e0cc6dc6cb9e5ef05be0dc08"
}
]
23 changes: 18 additions & 5 deletions src/catalog/protos/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ message ClusterIntrospectionSourceIndexKey {
}

message ClusterIntrospectionSourceIndexValue {
// TODO(parkmycar): Ideally this is a SystemCatalogItemId but making this change panics 0dt
// TODO(parkmycar): Ideally this is a IntrospectionSourceCatalogItemId but making this change panics 0dt
// upgrades if there were new builtin objects added since the older version of Materialize
// doesn't know how to read the new SystemCatalogItemId type.
// doesn't know how to read the new IntrospectionSourceCatalogItemId type.
uint64 index_id = 1;
uint32 oid = 2;
SystemGlobalId global_id = 3;
IntrospectionSourceIndexGlobalId global_id = 3;
}

message ClusterReplicaKey {
Expand Down Expand Up @@ -307,6 +307,8 @@ message CatalogItemId {
uint64 system = 1;
uint64 user = 2;
uint64 transient = 3;
// This needs to be 5 to match the GlobalId tag.
uint64 introspection_source_index = 5;
}
}

Expand All @@ -315,12 +317,18 @@ message SystemCatalogItemId {
uint64 value = 1;
}

/// A newtype wrapper for a `CatalogItemId` that is always in the "introspection source index" namespace.
message IntrospectionSourceIndexCatalogItemId {
uint64 value = 1;
}

message GlobalId {
oneof value {
uint64 system = 1;
uint64 user = 2;
uint64 transient = 3;
Empty explain = 4;
uint64 introspection_source_index = 5;
}
}

Expand All @@ -329,10 +337,15 @@ message SystemGlobalId {
uint64 value = 1;
}

/// A newtype wrapper for a `GlobalId` that is always in the "introspection source index" namespace.
message IntrospectionSourceIndexGlobalId {
uint64 value = 1;
}

message ClusterId {
oneof value {
uint64 system = 1;
uint64 user = 2;
uint32 system = 1;
uint32 user = 2;
}
}

Expand Down
Loading

0 comments on commit ac3a528

Please sign in to comment.