diff --git a/src/environmentd/src/deployment/preflight.rs b/src/environmentd/src/deployment/preflight.rs index 54ffa70f12a67..7d27a991050c8 100644 --- a/src/environmentd/src/deployment/preflight.rs +++ b/src/environmentd/src/deployment/preflight.rs @@ -63,7 +63,6 @@ pub async fn preflight_legacy( if !openable_adapter_storage.is_initialized().await? { tracing::info!("Catalog storage doesn't exist so there's no current deploy generation. We won't wait to be leader"); - deployment_state.set_is_leader(); return Ok(openable_adapter_storage); } let catalog_generation = openable_adapter_storage.get_deployment_generation().await?; @@ -118,7 +117,6 @@ pub async fn preflight_legacy( .await?) } else if catalog_generation == deploy_generation { tracing::info!("Server requested generation {deploy_generation} which is equal to catalog's generation"); - deployment_state.set_is_leader(); Ok(openable_adapter_storage) } else { mz_ore::halt!("Server started with requested generation {deploy_generation} but catalog was already at {catalog_generation:?}. Deploy generations must increase monotonically"); @@ -146,8 +144,7 @@ pub async fn preflight_0dt( info!(%deploy_generation, ?hydration_max_wait, "performing 0dt preflight checks"); if !openable_adapter_storage.is_initialized().await? { - info!("catalog not initialized; booting as leader with writes allowed"); - deployment_state.set_is_leader(); + info!("catalog not initialized; booting with writes allowed"); return Ok(PreflightOutput { openable_adapter_storage, read_only: false, @@ -219,7 +216,6 @@ pub async fn preflight_0dt( }) } else if catalog_generation == deploy_generation { info!("this deployment is the current generation; booting with writes allowed"); - deployment_state.set_is_leader(); Ok(PreflightOutput { openable_adapter_storage, read_only: false, diff --git a/src/environmentd/src/deployment/state.rs b/src/environmentd/src/deployment/state.rs index 10a70106999d4..e955805a06cdd 100644 --- a/src/environmentd/src/deployment/state.rs +++ b/src/environmentd/src/deployment/state.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; enum DeploymentStateInner { Initializing, ReadyToPromote { _promote_trigger: Trigger }, + Promoting, IsLeader, } @@ -42,6 +43,7 @@ enum DeploymentStateInner { /// `set_ready_to_promote` will resolve when promotion has occurred and the /// deployment should take over from the prior generation and begin serving /// queries. +#[derive(Clone)] pub struct DeploymentState { inner: Arc>, } @@ -63,7 +65,7 @@ impl DeploymentState { /// /// Returns a future that resolves when the leadership promotion occurs. /// When the function returns, the state will be `ReadyToPromote`. When the - /// returned future resolves, the state will be `IsLeader`. + /// returned future resolves, the state will be `Promoting`. /// /// Panics if the leader state is not `Initializing`. pub fn set_ready_to_promote(&self) -> impl Future { @@ -83,11 +85,14 @@ impl DeploymentState { /// Marks the deployment as the leader. /// - /// Panics if the leader state is not `Initializing`. + /// Panics if the leader state is not `Initializing` or `Promoting`. pub fn set_is_leader(&self) { let mut inner = self.inner.lock().expect("lock poisoned"); assert!( - matches!(*inner, DeploymentStateInner::Initializing), + matches!( + *inner, + DeploymentStateInner::Initializing | DeploymentStateInner::Promoting + ), "LeaderState::set_is_leader called on non-initializing state", ); *inner = DeploymentStateInner::IsLeader; @@ -112,6 +117,7 @@ impl DeploymentStateHandle { match *inner { DeploymentStateInner::Initializing => DeploymentStatus::Initializing, DeploymentStateInner::ReadyToPromote { .. } => DeploymentStatus::ReadyToPromote, + DeploymentStateInner::Promoting => DeploymentStatus::Promoting, DeploymentStateInner::IsLeader => DeploymentStatus::IsLeader, } } @@ -119,8 +125,8 @@ impl DeploymentStateHandle { /// Attempts to promote this deployment to the leader. /// /// Deployments in the `Initializing` state cannot be promoted. Deployments - /// in the `ReadyToPromote` state or `IsLeader` state can be promoted (with - /// the latter case being a no-op). + /// in the `ReadyToPromote`, `Promoting`, and `IsLeader` states can be + /// promoted (with the latter two cases being no-ops). /// /// If the leader was successfully promoted, returns `Ok`. Otherwise, /// returns `Err`. @@ -129,9 +135,10 @@ impl DeploymentStateHandle { match *inner { DeploymentStateInner::Initializing => Err(()), DeploymentStateInner::ReadyToPromote { .. } => { - *inner = DeploymentStateInner::IsLeader; + *inner = DeploymentStateInner::Promoting; Ok(()) } + DeploymentStateInner::Promoting => Ok(()), DeploymentStateInner::IsLeader => Ok(()), } } @@ -148,6 +155,8 @@ pub enum DeploymentStatus { Initializing, /// This deployment is not the leader, but it is ready to become the leader. ReadyToPromote, + /// This deployment is in the process of becoming the leader. + Promoting, /// This deployment is the leader. IsLeader, } diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index 325258bafd65b..de54d6400fd47 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -445,7 +445,7 @@ impl Listeners { .clone(), bootstrap_role: config.bootstrap_role.clone(), deploy_generation: config.controller.deploy_generation, - deployment_state, + deployment_state: deployment_state.clone(), openable_adapter_storage, catalog_metrics: Arc::clone(&config.catalog_config.metrics), hydration_max_wait: with_0dt_deployment_max_wait, @@ -466,29 +466,42 @@ impl Listeners { bootstrap_role: config.bootstrap_role, }; + // Load the adapter durable storage. let adapter_storage = if read_only { // TODO: behavior of migrations when booting in savepoint mode is // not well defined. - openable_adapter_storage + let adapter_storage = openable_adapter_storage .open_savepoint( boot_ts, &bootstrap_args, config.controller.deploy_generation, None, ) - .await? + .await?; + + // In read-only mode, we intentionally do not call `set_is_leader`, + // because we are by definition not the leader if we are in + // read-only mode. + + adapter_storage } else { - openable_adapter_storage + let adapter_storage = openable_adapter_storage .open( boot_ts, &bootstrap_args, config.controller.deploy_generation, None, ) - .await? + .await?; + + // Once we have successfully opened the adapter storage in + // read/write mode, we can announce we are the leader, as we've + // fenced out all other environments using the adapter storage. + deployment_state.set_is_leader(); + + adapter_storage }; - // Load the adapter catalog from disk. if !config .cluster_replica_sizes .0