diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 8b761e99..03107576 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -16,6 +16,18 @@ use crate::redis::KeyProvider; const RESTART_TIMEOUT_SECONDS: u64 = 600; +#[derive(Debug, PartialEq)] +pub enum ExecutorStatus { + /// Executor is running as expected + Active, + /// Executor is in an unhealthy state + Unhealthy, + /// Executor + Inactive, + /// Executor is not synchronized with the latest config + Outdated, +} + #[cfg(not(test))] use ExecutorsClientWrapperImpl as ExecutorsClientWrapper; #[cfg(test)] @@ -148,56 +160,40 @@ impl ExecutorsHandlerImpl { Ok(()) } - async fn ensure_healthy( - &self, - config: &IndexerConfig, - executor: ExecutorInfo, - ) -> anyhow::Result<()> { + fn is_healthy(&self, executor: ExecutorInfo) -> bool { if let Some(health) = executor.health { - if !matches!( + return !matches!( health.execution_state.try_into(), Ok(ExecutionState::Stalled) - ) { - return Ok(()); - } + ); } - tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds"); - - self.stop(executor.executor_id).await?; - tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; - self.start(config).await?; - - Ok(()) + false } - pub async fn synchronise(&self, config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn get_status(&self, config: &IndexerConfig) -> anyhow::Result { let executor = self .get(config.account_id.clone(), config.function_name.clone()) .await?; if let Some(executor) = executor { - if executor.version == config.get_registry_version() { - self.ensure_healthy(config, executor).await?; - return Ok(()); + if executor.version != config.get_registry_version() { + return Ok(ExecutorStatus::Outdated); } - tracing::info!( - account_id = config.account_id.as_str(), - function_name = config.function_name, - version = executor.version, - "Stopping outdated executor" - ); + if !self.is_healthy(executor) { + return Ok(ExecutorStatus::Unhealthy); + } - self.stop(executor.executor_id).await?; + return Ok(ExecutorStatus::Active); } - tracing::info!( - account_id = config.account_id.as_str(), - function_name = config.function_name, - version = config.get_registry_version(), - "Starting executor" - ); + Ok(ExecutorStatus::Inactive) + } + + pub async fn restart(&self, config: &IndexerConfig) -> anyhow::Result<()> { + self.stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await?; self.start(config).await?; @@ -238,18 +234,63 @@ mod tests { } #[tokio::test] - async fn resumes_stopped_executors() { + async fn returns_executor_status() { + let config = IndexerConfig::default(); + let test_cases = [ + ( + Some(ExecutorInfo { + version: config.get_registry_version(), + health: None, + ..Default::default() + }), + ExecutorStatus::Unhealthy, + ), + (None, ExecutorStatus::Inactive), + ( + Some(ExecutorInfo { + version: config.get_registry_version() - 1, + ..Default::default() + }), + ExecutorStatus::Outdated, + ), + ( + Some(ExecutorInfo { + version: config.get_registry_version(), + health: Some(runner::Health { + execution_state: runner::ExecutionState::Running.into(), + }), + ..Default::default() + }), + ExecutorStatus::Active, + ), + ]; + + for (executor, expected_status) in test_cases { + let mut mock_client = ExecutorsClientWrapper::default(); + mock_client + .expect_get_executor::() + .with(always()) + .returning(move |_| { + if let Some(executor) = executor.clone() { + Ok(Response::new(executor)) + } else { + Err(tonic::Status::not_found("not found")) + } + }); + + let handler = ExecutorsHandlerImpl { + client: mock_client, + }; + + assert_eq!(handler.get_status(&config).await.unwrap(), expected_status); + } + } + + #[tokio::test] + async fn starts_executors() { let config = IndexerConfig::default(); let mut mock_client = ExecutorsClientWrapper::default(); - mock_client - .expect_get_executor::() - .with(eq(GetExecutorRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| Err(tonic::Status::not_found("not found"))) - .once(); mock_client .expect_start_executor::() .with(eq(StartExecutorRequest { @@ -271,11 +312,11 @@ mod tests { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + handler.start(&config).await.unwrap() } #[tokio::test] - async fn reconfigures_outdated_executors() { + async fn restarts_executors() { let config = IndexerConfig::default(); let executor = ExecutorInfo { @@ -324,11 +365,11 @@ mod tests { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + handler.restart(&config).await.unwrap() } #[tokio::test] - async fn restarts_unhealthy_executors() { + async fn unhealthy_executor() { tokio::time::pause(); let config = IndexerConfig::default(); @@ -343,49 +384,17 @@ mod tests { }), }; - let mut mock_client = ExecutorsClientWrapper::default(); - mock_client - .expect_stop_executor::() - .with(eq(StopExecutorRequest { - executor_id: executor.executor_id.clone(), - })) - .returning(|_| { - Ok(Response::new(StopExecutorResponse { - executor_id: "executor_id".to_string(), - })) - }) - .once(); - mock_client - .expect_start_executor::() - .with(eq(StartExecutorRequest { - code: config.code.clone(), - schema: config.schema.clone(), - redis_stream: config.get_redis_stream_key(), - version: config.get_registry_version(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| { - Ok(tonic::Response::new(StartExecutorResponse { - executor_id: "executor_id".to_string(), - })) - }) - .once(); - mock_client - .expect_get_executor::() - .with(always()) - .returning(move |_| Ok(Response::new(executor.clone()))) - .once(); + let mock_client = ExecutorsClientWrapper::default(); let handler = ExecutorsHandlerImpl { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + assert!(!handler.is_healthy(executor)); } #[tokio::test] - async fn ignores_healthy_executors() { + async fn healthy_executors() { tokio::time::pause(); let config = IndexerConfig::default(); @@ -408,23 +417,13 @@ mod tests { }), }; - let mut mock_client = ExecutorsClientWrapper::default(); - mock_client - .expect_stop_executor::() - .never(); - mock_client - .expect_start_executor::() - .never(); - mock_client - .expect_get_executor::() - .with(always()) - .returning(move |_| Ok(Response::new(executor.clone()))); + let mock_client = ExecutorsClientWrapper::default(); let handler = ExecutorsHandlerImpl { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + assert!(handler.is_healthy(executor)); } } } diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index a79ff549..3d944da3 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -2,7 +2,7 @@ use tracing::{info, warn}; use crate::handlers::block_streams::{BlockStreamStatus, BlockStreamsHandler}; use crate::handlers::data_layer::DataLayerHandler; -use crate::handlers::executors::ExecutorsHandler; +use crate::handlers::executors::{ExecutorStatus, ExecutorsHandler}; use crate::indexer_config::IndexerConfig; use crate::indexer_state::{IndexerState, IndexerStateManager}; use crate::redis::{KeyProvider, RedisClient}; @@ -153,7 +153,21 @@ impl<'a> LifecycleManager<'a> { state.block_stream_synced_at = Some(config.get_registry_version()); - if let Err(error) = self.executors_handler.synchronise(config).await { + let executor_status = match self.executors_handler.get_status(config).await { + Ok(status) => status, + Err(error) => { + warn!(?error, "Failed to synchronise executor"); + return LifecycleState::Running; + } + }; + + if let Err(error) = match executor_status { + ExecutorStatus::Active => Ok(()), + ExecutorStatus::Inactive => self.executors_handler.start(config).await, + ExecutorStatus::Unhealthy | ExecutorStatus::Outdated => { + self.executors_handler.restart(config).await + } + } { warn!(?error, "Failed to synchronise executor, retrying..."); return LifecycleState::Running; } @@ -655,7 +669,9 @@ mod tests { .returning(|_, _| Ok(BlockStreamStatus::Active)); let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_synchronise().returning(|_| Ok(())); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); let data_layer_handler = DataLayerHandler::default(); let state_manager = IndexerStateManager::default(); @@ -696,7 +712,9 @@ mod tests { .once(); let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_synchronise().returning(|_| Ok(())); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); let data_layer_handler = DataLayerHandler::default(); let state_manager = IndexerStateManager::default(); @@ -737,7 +755,9 @@ mod tests { .once(); let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_synchronise().returning(|_| Ok(())); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); let data_layer_handler = DataLayerHandler::default(); let state_manager = IndexerStateManager::default(); @@ -778,7 +798,9 @@ mod tests { .once(); let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_synchronise().returning(|_| Ok(())); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); let data_layer_handler = DataLayerHandler::default(); let state_manager = IndexerStateManager::default(); @@ -819,7 +841,177 @@ mod tests { .once(); let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_synchronise().returning(|_| Ok(())); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn ignores_active_executors() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn starts_inactive_executors() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Inactive)); + executors_handler + .expect_start() + .returning(|_| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn restarts_unhealthy_executor() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Unhealthy)); + executors_handler + .expect_restart() + .returning(|_| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn restarts_outdated_executor() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Outdated)); + executors_handler + .expect_restart() + .returning(|_| Ok(())) + .once(); let data_layer_handler = DataLayerHandler::default(); let state_manager = IndexerStateManager::default();