diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index 9f71149b..01bb88a8 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -19,6 +19,8 @@ use crate::indexer_config::IndexerConfig; use crate::redis::{KeyProvider, RedisClient}; use crate::utils::exponential_retry; +const RESTART_TIMEOUT_SECONDS: u64 = 600; + #[derive(Clone)] pub struct BlockStreamsHandler { client: BlockStreamerClient, @@ -258,11 +260,13 @@ impl BlockStreamsHandler { tracing::info!(stale, stalled, "Restarting stalled block stream"); } } else { - tracing::info!("Restarting stalled block stream"); + tracing::info!( + "Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds" + ); } self.stop(block_stream.stream_id.clone()).await?; - + tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; let height = self.get_continuation_block_height(config).await?; self.start(height, config).await?; diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 5b2a5a4c..ff2e2657 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -14,7 +14,7 @@ use crate::indexer_config::IndexerConfig; type TaskId = String; -const TASK_TIMEOUT_SECONDS: u64 = 300; // 5 minutes +const TASK_TIMEOUT_SECONDS: u64 = 600; // 10 minutes #[derive(Clone)] pub struct DataLayerHandler { diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 4e12ef26..184718d0 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -16,6 +16,8 @@ use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; use crate::utils::exponential_retry; +const RESTART_TIMEOUT_SECONDS: u64 = 600; + #[derive(Clone)] pub struct ExecutorsHandler { client: RunnerClient, @@ -136,9 +138,10 @@ impl ExecutorsHandler { } } - tracing::info!("Restarting stalled executor"); + tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds"); self.stop(executor.executor_id).await?; + tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; self.start(config).await?; Ok(())