From 8e4235d6b1ffd0aff3e442be4c9f7d5f90062cd7 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 15 Aug 2024 10:28:21 +1200 Subject: [PATCH] feat: Delay restarting unhealthy streams/executors --- coordinator/src/handlers/block_streams.rs | 2 -- coordinator/src/handlers/executors.rs | 2 -- coordinator/src/lifecycle.rs | 10 ++++++++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index a73a6368..b549fdea 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -18,8 +18,6 @@ use tonic::transport::channel::Channel; use crate::indexer_config::IndexerConfig; use crate::redis::{KeyProvider, RedisClient}; -const RESTART_TIMEOUT_SECONDS: u64 = 600; - #[derive(Debug, PartialEq)] pub enum BlockStreamStatus { /// Block Stream is running as expected diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 03107576..389b00de 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -14,8 +14,6 @@ use tonic::transport::channel::Channel; use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; -const RESTART_TIMEOUT_SECONDS: u64 = 600; - #[derive(Debug, PartialEq)] pub enum ExecutorStatus { /// Executor is running as expected diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index 3d944da3..3d29967e 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -9,6 +9,7 @@ use crate::redis::{KeyProvider, RedisClient}; use crate::registry::Registry; const LOOP_THROTTLE_MS: u64 = 1000; +const RESTART_TIMEOUT_SECONDS: u64 = 600; /// Represents the different lifecycle states of an Indexer #[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -138,9 +139,12 @@ impl<'a> LifecycleManager<'a> { if let Err(error) = match stream_status { BlockStreamStatus::Active => Ok(()), - BlockStreamStatus::Unhealthy => self.block_streams_handler.restart(config).await, BlockStreamStatus::Inactive => self.block_streams_handler.resume(config).await, BlockStreamStatus::Outdated => self.block_streams_handler.reconfigure(config).await, + BlockStreamStatus::Unhealthy => { + tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; + self.block_streams_handler.restart(config).await + } BlockStreamStatus::NotStarted => { self.block_streams_handler .start_new_block_stream(config) @@ -164,7 +168,9 @@ impl<'a> LifecycleManager<'a> { if let Err(error) = match executor_status { ExecutorStatus::Active => Ok(()), ExecutorStatus::Inactive => self.executors_handler.start(config).await, - ExecutorStatus::Unhealthy | ExecutorStatus::Outdated => { + ExecutorStatus::Outdated => self.executors_handler.restart(config).await, + ExecutorStatus::Unhealthy => { + tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; self.executors_handler.restart(config).await } } {