From 4af58bc82a964f7b1342c9216547fcc23ad1d8f3 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 9 Aug 2024 10:31:31 -0700 Subject: [PATCH] chore: Add additional logs for state transitions (#989) I added a little bit more logging to the lifecycle state changes, as well as renamed the `Stopping/Stopped` states to `Suspending/Suspended` states to more accurately capture the intention. --- coordinator/src/indexer_state.rs | 72 ++++++++++++++++++++++++++++++-- coordinator/src/lifecycle.rs | 44 ++++++++++--------- 2 files changed, 93 insertions(+), 23 deletions(-) diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 9e7b7a34..8fdb9ff8 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -16,13 +16,18 @@ pub enum ProvisionedState { Failed, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum OldLifecycleState { + Stopping, + Stopped, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct OldIndexerState { pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, pub enabled: bool, - pub provisioned_state: ProvisionedState, + pub lifecycle_state: OldLifecycleState, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -75,15 +80,23 @@ impl IndexerStateManagerImpl { tracing::info!("Migrating {}", raw_state); let old_state: OldIndexerState = serde_json::from_str(&raw_state)?; + let migrated_lifecycle_state = + if old_state.lifecycle_state == OldLifecycleState::Stopping { + LifecycleState::Suspending + } else if old_state.lifecycle_state == OldLifecycleState::Stopped { + LifecycleState::Suspended + } else { + tracing::warn!("Unknown lifecycle state: {:?}", old_state.lifecycle_state); + continue; + }; let state = IndexerState { account_id: old_state.account_id, function_name: old_state.function_name, block_stream_synced_at: old_state.block_stream_synced_at, enabled: old_state.enabled, - lifecycle_state: LifecycleState::Running, + lifecycle_state: migrated_lifecycle_state, }; - self.redis_client .set(state.get_state_key(), serde_json::to_string(&state)?) .await?; @@ -182,6 +195,59 @@ mod tests { use mockall::predicate; use registry_types::{Rule, StartBlock, Status}; + #[tokio::test] + async fn migrate_state() { + let mut mock_redis_client = RedisClient::default(); + let valid_state = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_valid_1", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string(); + let valid_state_two = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_valid_2", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Running" }).to_string(); + let state_to_migrate_stopping = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_migrate_stopping", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Stopping" }).to_string(); + let state_to_migrate_stopped = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_migrate_stopped", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Stopped" }).to_string(); + let migrated_suspending = IndexerState { + account_id: "morgs.near".parse().unwrap(), + function_name: "test_migrate_stopping".to_string(), + block_stream_synced_at: Some(200), + enabled: true, + lifecycle_state: LifecycleState::Suspending, + }; + let migrated_suspended = IndexerState { + account_id: "morgs.near".parse().unwrap(), + function_name: "test_migrate_stopped".to_string(), + block_stream_synced_at: Some(200), + enabled: true, + lifecycle_state: LifecycleState::Suspended, + }; + mock_redis_client + .expect_list_indexer_states() + .returning(move || { + Ok(vec![ + valid_state.clone(), + valid_state_two.clone(), + state_to_migrate_stopping.clone(), + state_to_migrate_stopped.clone(), + ]) + }) + .once(); + mock_redis_client + .expect_set::() + .with( + predicate::eq(migrated_suspending.get_state_key()), + predicate::eq(serde_json::to_string(&migrated_suspending).unwrap()), + ) + .returning(|_, _| Ok(())) + .once(); + mock_redis_client + .expect_set::() + .with( + predicate::eq(migrated_suspended.get_state_key()), + predicate::eq(serde_json::to_string(&migrated_suspended).unwrap()), + ) + .returning(|_, _| Ok(())) + .once(); + + let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client); + let _ = indexer_manager.migrate().await; + } + #[tokio::test] async fn list_indexer_states() { let mut mock_redis_client = RedisClient::default(); diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index 32fb2f9f..11bb1cfa 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -24,22 +24,22 @@ pub enum LifecycleState { /// they are running the latest version of the Indexer. /// /// Transitions: - /// - `Stopping` if suspended + /// - `Suspending` if suspended /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a /// retry /// - `Running` on success Running, - /// Indexer is being stopped, Block Stream and Executors are being stopped. + /// Indexer is being suspended, Block Stream and Executors are being stopped. /// /// Transitions: - /// - `Stopping` on failure, triggering a retry - /// - `Stopped` on success - Stopping, - /// Indexer is stopped, Block Stream and Executors are not running. + /// - `Suspending` on failure, triggering a retry + /// - `Suspended` on success + Suspending, + /// Indexer is suspended, Block Stream and Executors are not running. /// /// Transitions: /// - `Running` if unsuspended - Stopped, + Suspended, /// Indexer is in a bad state, currently requires manual intervention, but should eventually /// self heal. This is a dead-end state /// @@ -103,6 +103,7 @@ impl<'a> LifecycleManager<'a> { .await .is_err() { + tracing::warn!("Failed to provision data layer"); return LifecycleState::Repairing; } @@ -120,7 +121,7 @@ impl<'a> LifecycleManager<'a> { } if !state.enabled { - return LifecycleState::Stopping; + return LifecycleState::Suspending; } if let Err(error) = self @@ -129,7 +130,6 @@ impl<'a> LifecycleManager<'a> { .await { warn!(?error, "Failed to synchronise block stream, retrying..."); - return LifecycleState::Running; } @@ -137,15 +137,14 @@ impl<'a> LifecycleManager<'a> { if let Err(error) = self.executors_handler.synchronise(config).await { warn!(?error, "Failed to synchronise executor, retrying..."); - return LifecycleState::Running; } LifecycleState::Running } - #[tracing::instrument(name = "stopping", skip_all)] - async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState { + #[tracing::instrument(name = "suspending", skip_all)] + async fn handle_suspending(&self, config: &IndexerConfig) -> LifecycleState { if config.is_deleted() { return LifecycleState::Deleting; } @@ -156,7 +155,7 @@ impl<'a> LifecycleManager<'a> { .await { warn!(?error, "Failed to stop block stream, retrying..."); - return LifecycleState::Stopping; + return LifecycleState::Suspending; } if let Err(error) = self @@ -165,14 +164,18 @@ impl<'a> LifecycleManager<'a> { .await { warn!(?error, "Failed to stop executor, retrying..."); - return LifecycleState::Stopping; + return LifecycleState::Suspending; } - LifecycleState::Stopped + LifecycleState::Suspended } - #[tracing::instrument(name = "stopped", skip_all)] - async fn handle_stopped(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState { + #[tracing::instrument(name = "suspended", skip_all)] + async fn handle_suspended( + &self, + config: &IndexerConfig, + state: &IndexerState, + ) -> LifecycleState { if config.is_deleted() { return LifecycleState::Deleting; } @@ -180,10 +183,11 @@ impl<'a> LifecycleManager<'a> { // TODO Transistion to `Running` on config update if state.enabled { + tracing::debug!("Suspended indexer was reactivated"); return LifecycleState::Running; } - LifecycleState::Stopped + LifecycleState::Suspended } #[tracing::instrument(name = "repairing", skip_all)] @@ -299,8 +303,8 @@ impl<'a> LifecycleManager<'a> { let desired_lifecycle_state = match state.lifecycle_state { LifecycleState::Initializing => self.handle_initializing(&config, &state).await, LifecycleState::Running => self.handle_running(&config, &mut state).await, - LifecycleState::Stopping => self.handle_stopping(&config).await, - LifecycleState::Stopped => self.handle_stopped(&config, &state).await, + LifecycleState::Suspending => self.handle_suspending(&config).await, + LifecycleState::Suspended => self.handle_suspended(&config, &state).await, LifecycleState::Repairing => self.handle_repairing(&config, &state).await, LifecycleState::Deleting => self.handle_deleting(&state).await, LifecycleState::Deleted => LifecycleState::Deleted,