Skip to content

Commit

Permalink
feat: Delay restarting unhealthy streams/executors
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Aug 14, 2024
1 parent 7fc6666 commit 8e4235d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
2 changes: 0 additions & 2 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use tonic::transport::channel::Channel;
use crate::indexer_config::IndexerConfig;
use crate::redis::{KeyProvider, RedisClient};

const RESTART_TIMEOUT_SECONDS: u64 = 600;

#[derive(Debug, PartialEq)]
pub enum BlockStreamStatus {
/// Block Stream is running as expected
Expand Down
2 changes: 0 additions & 2 deletions coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use tonic::transport::channel::Channel;
use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;

const RESTART_TIMEOUT_SECONDS: u64 = 600;

#[derive(Debug, PartialEq)]
pub enum ExecutorStatus {
/// Executor is running as expected
Expand Down
10 changes: 8 additions & 2 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::redis::{KeyProvider, RedisClient};
use crate::registry::Registry;

const LOOP_THROTTLE_MS: u64 = 1000;
const RESTART_TIMEOUT_SECONDS: u64 = 600;

/// Represents the different lifecycle states of an Indexer
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -138,9 +139,12 @@ impl<'a> LifecycleManager<'a> {

if let Err(error) = match stream_status {
BlockStreamStatus::Active => Ok(()),
BlockStreamStatus::Unhealthy => self.block_streams_handler.restart(config).await,
BlockStreamStatus::Inactive => self.block_streams_handler.resume(config).await,
BlockStreamStatus::Outdated => self.block_streams_handler.reconfigure(config).await,
BlockStreamStatus::Unhealthy => {
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.block_streams_handler.restart(config).await
}
BlockStreamStatus::NotStarted => {
self.block_streams_handler
.start_new_block_stream(config)
Expand All @@ -164,7 +168,9 @@ impl<'a> LifecycleManager<'a> {
if let Err(error) = match executor_status {
ExecutorStatus::Active => Ok(()),
ExecutorStatus::Inactive => self.executors_handler.start(config).await,
ExecutorStatus::Unhealthy | ExecutorStatus::Outdated => {
ExecutorStatus::Outdated => self.executors_handler.restart(config).await,
ExecutorStatus::Unhealthy => {
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.executors_handler.restart(config).await
}
} {
Expand Down

0 comments on commit 8e4235d

Please sign in to comment.