From ee5845b0da6a8f22f36f3ee6c8606d7352b7fdb5 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 18 Jul 2024 15:17:05 +1200 Subject: [PATCH] feat: Restart stalled Block Streams & Executors (#891) Coordinator will now continuously monitor Block Stream and Executor health, and restart them if they are "stalled". The goal here is to avoid the need for manually intervening on stopped processes. Stalled means slightly different things for each: - Block Stream - When it is able to, is not actively processing blocks - Executors - An uncaught error was encountered, causing the thread to exit --- coordinator/src/handlers/block_streams.rs | 33 ++++++++++++++++++++++- coordinator/src/handlers/executors.rs | 28 ++++++++++++++++++- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index bc34c000..93878d61 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -1,12 +1,14 @@ #![cfg_attr(test, allow(dead_code))] +use std::time::{Duration, SystemTime}; + pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, - ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest, + ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest, }; use near_primitives::types::AccountId; use registry_types::StartBlock; @@ -235,6 +237,34 @@ impl BlockStreamsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + block_stream: &StreamInfo, + ) -> anyhow::Result<()> { + if let Some(health) = block_stream.health.as_ref() { + let updated_at = + SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); + + let stale = updated_at.elapsed().unwrap_or_default() > Duration::from_secs(30); + let stalled = matches!( + health.processing_state.try_into(), + Ok(ProcessingState::Stalled) + ); + + if !stale && !stalled { + return Ok(()); + } + } + + tracing::info!("Restarting stalled block stream"); + + self.stop(block_stream.stream_id.clone()).await?; + self.resume_block_stream(config).await?; + + Ok(()) + } + pub async fn synchronise_block_stream( &self, config: &IndexerConfig, @@ -246,6 +276,7 @@ impl BlockStreamsHandler { if let Some(block_stream) = block_stream { if block_stream.version == config.get_registry_version() { + self.ensure_healthy(config, &block_stream).await?; return Ok(()); } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 01c9cc4b..4e12ef26 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{ + ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, + StopExecutorRequest, +}; use tonic::transport::channel::Channel; use tonic::Request; @@ -119,6 +122,28 @@ impl ExecutorsHandler { Ok(()) } + async fn ensure_healthy( + &self, + config: &IndexerConfig, + executor: ExecutorInfo, + ) -> anyhow::Result<()> { + if let Some(health) = executor.health { + if !matches!( + health.execution_state.try_into(), + Ok(ExecutionState::Stalled) + ) { + return Ok(()); + } + } + + tracing::info!("Restarting stalled executor"); + + self.stop(executor.executor_id).await?; + self.start(config).await?; + + Ok(()) + } + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { let executor = self .get(config.account_id.clone(), config.function_name.clone()) @@ -126,6 +151,7 @@ impl ExecutorsHandler { if let Some(executor) = executor { if executor.version == config.get_registry_version() { + self.ensure_healthy(config, executor).await?; return Ok(()); }