From cb11bad575a9b99c44925c6c9dd1275a0c155ec8 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Fri, 31 May 2024 15:49:07 +1200 Subject: [PATCH] feat: Handle sync failures on new indexers --- coordinator/src/synchroniser.rs | 242 ++++++++++++++++++++++++++++++-- 1 file changed, 230 insertions(+), 12 deletions(-) diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index 2008fd51a..59111d028 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -39,7 +39,10 @@ impl<'a> Synchroniser<'a> { } async fn sync_new_indexer(&self, config: &IndexerConfig) -> anyhow::Result<()> { - self.executors_handler.start(config).await?; + if let Err(err) = self.executors_handler.start(config).await { + tracing::error!(?err, "Failed to start Executor"); + return Ok(()); + } let start_block = match config.start_block { StartBlock::Height(height) => height, @@ -52,10 +55,12 @@ impl<'a> Synchroniser<'a> { } }; - self.block_streams_handler - .start(start_block, config) - .await?; + if let Err(err) = self.block_streams_handler.start(start_block, config).await { + tracing::error!(?err, "Failed to start Block Stream"); + return Ok(()); + } + // TODO handle failures self.state_manager.set_synced(config).await?; Ok(()) @@ -174,16 +179,20 @@ impl<'a> Synchroniser<'a> { executor: Option<&ExecutorInfo>, block_stream: Option<&StreamInfo>, ) -> anyhow::Result<()> { - let result = tokio::try_join!( - self.sync_existing_executor(config, executor), - self.sync_existing_block_stream(config, state, block_stream) - ); + if let Err(error) = self.sync_existing_executor(config, executor).await { + tracing::error!(?error, "Failed to sync executor"); + return Ok(()); + } - if let Err(error) = result { - tracing::error!(?error, "Failed to sync Indexer"); + if let Err(error) = self + .sync_existing_block_stream(config, state, block_stream) + .await + { + tracing::error!(?error, "Failed to sync block stream"); return Ok(()); } + // TODO handle failures self.state_manager.set_synced(config).await?; Ok(()) @@ -310,6 +319,144 @@ mod test { synchroniser.sync().await.unwrap(); } + + #[tokio::test] + async fn configures_block_stream() { + let config_with_latest = IndexerConfig { + start_block: StartBlock::Latest, + ..IndexerConfig::default() + }; + let height = 5; + let config_with_height = IndexerConfig { + start_block: StartBlock::Height(height), + ..IndexerConfig::default() + }; + let config_with_continue = IndexerConfig { + start_block: StartBlock::Continue, + ..IndexerConfig::default() + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with( + eq(config_with_continue.get_registry_version()), + eq(config_with_continue.clone()), + ) + .returning(|_, _| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with( + eq(config_with_latest.get_registry_version()), + eq(config_with_latest.clone()), + ) + .returning(|_, _| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with(eq(height), eq(config_with_height.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_synced() + .with(eq(config_with_continue.clone())) + .returning(|_| Ok(())) + .once(); + state_manager + .expect_set_synced() + .with(eq(config_with_latest.clone())) + .returning(|_| Ok(())) + .once(); + state_manager + .expect_set_synced() + .with(eq(config_with_height.clone())) + .returning(|_| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_start() + .returning(|_| Ok(())) + .times(3); + + let redis_client = RedisClient::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_new_indexer(&config_with_latest) + .await + .unwrap(); + synchroniser + .sync_new_indexer(&config_with_height) + .await + .unwrap(); + synchroniser + .sync_new_indexer(&config_with_continue) + .await + .unwrap(); + } + + #[tokio::test] + async fn handles_synchronisation_failures() { + let config = IndexerConfig::default(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_start() + .with(eq(config.clone())) + .returning(|_| anyhow::bail!("")) + .once(); + executors_handler + .expect_start() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .times(2); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with(eq(100), eq(config.clone())) + .returning(|_, _| anyhow::bail!("")) + .once(); + block_streams_handler + .expect_start() + .with(eq(100), eq(config.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_synced() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + + let redis_client = RedisClient::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser.sync_new_indexer(&config).await.unwrap(); // fail + synchroniser.sync_new_indexer(&config).await.unwrap(); // fail + synchroniser.sync_new_indexer(&config).await.unwrap(); // success + } } mod existing { @@ -642,8 +789,79 @@ mod test { .unwrap(); } - #[tokio::test] - async fn handles_synchronisation_failures() {} + //#[tokio::test] + //async fn handles_synchronisation_failures() { + // let config = IndexerConfig::default(); + // let state = IndexerState { + // account_id: config.account_id.clone(), + // function_name: config.function_name.clone(), + // block_stream_synced_at: Some(config.get_registry_version()), + // enabled: true, + // }; + // + // let mut executors_handler = ExecutorsHandler::default(); + // executors_handler + // .expect_stop() + // .with(always()) + // .returning(|_| anyhow::bail!("")) + // .once(); + // executors_handler + // .expect_stop() + // .with(always()) + // .returning(|_| Ok(())); + // executors_handler + // .expect_start() + // .with(eq(config.clone())) + // .returning(|_| anyhow::bail!("")) + // .once(); + // executors_handler + // .expect_start() + // .with(eq(config.clone())) + // .returning(|_| Ok(())); + // + // let mut block_streams_handler = BlockStreamsHandler::default(); + // block_streams_handler + // .expect_start() + // .with(eq(100), eq(config.clone())) + // .returning(|_, _| anyhow::bail!("")) + // .once(); + // block_streams_handler + // .expect_start() + // .with(eq(100), eq(config.clone())) + // .returning(|_, _| Ok(())) + // .once(); + // + // let mut state_manager = IndexerStateManager::default(); + // state_manager + // .expect_set_synced() + // .with(eq(config.clone())) + // .returning(|_| Ok(())) + // .once(); + // + // let redis_client = RedisClient::default(); + // let registry = Registry::default(); + // + // let synchroniser = Synchroniser::new( + // &block_streams_handler, + // &executors_handler, + // ®istry, + // &state_manager, + // &redis_client, + // ); + // + // synchroniser + // .sync_existing_indexer(&config, &state) + // .await + // .unwrap(); // fail + // synchroniser + // .sync_existing_indexer(&config, &state) + // .await + // .unwrap(); // fail + // synchroniser + // .sync_existing_indexer(&config, &state) + // .await + // .unwrap(); // success + //} #[tokio::test] async fn stops_disabled_indexers() {}