Skip to content

Commit

Permalink
chore: Add additional logs for state transitions (#989)
Browse files Browse the repository at this point in the history
I added a little bit more logging to the lifecycle state changes, as
well as renamed the `Stopping/Stopped` states to `Suspending/Suspended`
states to more accurately capture the intention.
  • Loading branch information
darunrs authored Aug 9, 2024
1 parent 44bc2eb commit 4af58bc
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 23 deletions.
72 changes: 69 additions & 3 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ pub enum ProvisionedState {
Failed,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum OldLifecycleState {
Stopping,
Stopped,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OldIndexerState {
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
pub provisioned_state: ProvisionedState,
pub lifecycle_state: OldLifecycleState,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -75,15 +80,23 @@ impl IndexerStateManagerImpl {
tracing::info!("Migrating {}", raw_state);

let old_state: OldIndexerState = serde_json::from_str(&raw_state)?;
let migrated_lifecycle_state =
if old_state.lifecycle_state == OldLifecycleState::Stopping {
LifecycleState::Suspending
} else if old_state.lifecycle_state == OldLifecycleState::Stopped {
LifecycleState::Suspended
} else {
tracing::warn!("Unknown lifecycle state: {:?}", old_state.lifecycle_state);
continue;
};

let state = IndexerState {
account_id: old_state.account_id,
function_name: old_state.function_name,
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
lifecycle_state: LifecycleState::Running,
lifecycle_state: migrated_lifecycle_state,
};

self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;
Expand Down Expand Up @@ -182,6 +195,59 @@ mod tests {
use mockall::predicate;
use registry_types::{Rule, StartBlock, Status};

#[tokio::test]
async fn migrate_state() {
let mut mock_redis_client = RedisClient::default();
let valid_state = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_valid_1", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string();
let valid_state_two = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_valid_2", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Running" }).to_string();
let state_to_migrate_stopping = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_migrate_stopping", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Stopping" }).to_string();
let state_to_migrate_stopped = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_migrate_stopped", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Stopped" }).to_string();
let migrated_suspending = IndexerState {
account_id: "morgs.near".parse().unwrap(),
function_name: "test_migrate_stopping".to_string(),
block_stream_synced_at: Some(200),
enabled: true,
lifecycle_state: LifecycleState::Suspending,
};
let migrated_suspended = IndexerState {
account_id: "morgs.near".parse().unwrap(),
function_name: "test_migrate_stopped".to_string(),
block_stream_synced_at: Some(200),
enabled: true,
lifecycle_state: LifecycleState::Suspended,
};
mock_redis_client
.expect_list_indexer_states()
.returning(move || {
Ok(vec![
valid_state.clone(),
valid_state_two.clone(),
state_to_migrate_stopping.clone(),
state_to_migrate_stopped.clone(),
])
})
.once();
mock_redis_client
.expect_set::<String, String>()
.with(
predicate::eq(migrated_suspending.get_state_key()),
predicate::eq(serde_json::to_string(&migrated_suspending).unwrap()),
)
.returning(|_, _| Ok(()))
.once();
mock_redis_client
.expect_set::<String, String>()
.with(
predicate::eq(migrated_suspended.get_state_key()),
predicate::eq(serde_json::to_string(&migrated_suspended).unwrap()),
)
.returning(|_, _| Ok(()))
.once();

let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client);
let _ = indexer_manager.migrate().await;
}

#[tokio::test]
async fn list_indexer_states() {
let mut mock_redis_client = RedisClient::default();
Expand Down
44 changes: 24 additions & 20 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ pub enum LifecycleState {
/// they are running the latest version of the Indexer.
///
/// Transitions:
/// - `Stopping` if suspended
/// - `Suspending` if suspended
/// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a
/// retry
/// - `Running` on success
Running,
/// Indexer is being stopped, Block Stream and Executors are being stopped.
/// Indexer is being suspended, Block Stream and Executors are being stopped.
///
/// Transitions:
/// - `Stopping` on failure, triggering a retry
/// - `Stopped` on success
Stopping,
/// Indexer is stopped, Block Stream and Executors are not running.
/// - `Suspending` on failure, triggering a retry
/// - `Suspended` on success
Suspending,
/// Indexer is suspended, Block Stream and Executors are not running.
///
/// Transitions:
/// - `Running` if unsuspended
Stopped,
Suspended,
/// Indexer is in a bad state, currently requires manual intervention, but should eventually
/// self heal. This is a dead-end state
///
Expand Down Expand Up @@ -103,6 +103,7 @@ impl<'a> LifecycleManager<'a> {
.await
.is_err()
{
tracing::warn!("Failed to provision data layer");
return LifecycleState::Repairing;
}

Expand All @@ -120,7 +121,7 @@ impl<'a> LifecycleManager<'a> {
}

if !state.enabled {
return LifecycleState::Stopping;
return LifecycleState::Suspending;
}

if let Err(error) = self
Expand All @@ -129,23 +130,21 @@ impl<'a> LifecycleManager<'a> {
.await
{
warn!(?error, "Failed to synchronise block stream, retrying...");

return LifecycleState::Running;
}

state.block_stream_synced_at = Some(config.get_registry_version());

if let Err(error) = self.executors_handler.synchronise(config).await {
warn!(?error, "Failed to synchronise executor, retrying...");

return LifecycleState::Running;
}

LifecycleState::Running
}

#[tracing::instrument(name = "stopping", skip_all)]
async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState {
#[tracing::instrument(name = "suspending", skip_all)]
async fn handle_suspending(&self, config: &IndexerConfig) -> LifecycleState {
if config.is_deleted() {
return LifecycleState::Deleting;
}
Expand All @@ -156,7 +155,7 @@ impl<'a> LifecycleManager<'a> {
.await
{
warn!(?error, "Failed to stop block stream, retrying...");
return LifecycleState::Stopping;
return LifecycleState::Suspending;
}

if let Err(error) = self
Expand All @@ -165,25 +164,30 @@ impl<'a> LifecycleManager<'a> {
.await
{
warn!(?error, "Failed to stop executor, retrying...");
return LifecycleState::Stopping;
return LifecycleState::Suspending;
}

LifecycleState::Stopped
LifecycleState::Suspended
}

#[tracing::instrument(name = "stopped", skip_all)]
async fn handle_stopped(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState {
#[tracing::instrument(name = "suspended", skip_all)]
async fn handle_suspended(
&self,
config: &IndexerConfig,
state: &IndexerState,
) -> LifecycleState {
if config.is_deleted() {
return LifecycleState::Deleting;
}

// TODO Transistion to `Running` on config update

if state.enabled {
tracing::debug!("Suspended indexer was reactivated");
return LifecycleState::Running;
}

LifecycleState::Stopped
LifecycleState::Suspended
}

#[tracing::instrument(name = "repairing", skip_all)]
Expand Down Expand Up @@ -299,8 +303,8 @@ impl<'a> LifecycleManager<'a> {
let desired_lifecycle_state = match state.lifecycle_state {
LifecycleState::Initializing => self.handle_initializing(&config, &state).await,
LifecycleState::Running => self.handle_running(&config, &mut state).await,
LifecycleState::Stopping => self.handle_stopping(&config).await,
LifecycleState::Stopped => self.handle_stopped(&config, &state).await,
LifecycleState::Suspending => self.handle_suspending(&config).await,
LifecycleState::Suspended => self.handle_suspended(&config, &state).await,
LifecycleState::Repairing => self.handle_repairing(&config, &state).await,
LifecycleState::Deleting => self.handle_deleting(&state).await,
LifecycleState::Deleted => LifecycleState::Deleted,
Expand Down

0 comments on commit 4af58bc

Please sign in to comment.