Skip to content

Commit

Permalink
feat: Handle sync failures on new indexers
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jun 9, 2024
1 parent da19ba9 commit 9c595c8
Showing 1 changed file with 230 additions and 12 deletions.
242 changes: 230 additions & 12 deletions coordinator/src/synchroniser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ impl<'a> Synchroniser<'a> {
}

async fn sync_new_indexer(&self, config: &IndexerConfig) -> anyhow::Result<()> {
self.executors_handler.start(config).await?;
if let Err(err) = self.executors_handler.start(config).await {
tracing::error!(?err, "Failed to start Executor");
return Ok(());
}

let start_block = match config.start_block {
StartBlock::Height(height) => height,
Expand All @@ -52,10 +55,12 @@ impl<'a> Synchroniser<'a> {
}
};

self.block_streams_handler
.start(start_block, config)
.await?;
if let Err(err) = self.block_streams_handler.start(start_block, config).await {
tracing::error!(?err, "Failed to start Block Stream");
return Ok(());
}

// TODO handle failures
self.state_manager.set_synced(config).await?;

Ok(())
Expand Down Expand Up @@ -174,16 +179,20 @@ impl<'a> Synchroniser<'a> {
executor: Option<&ExecutorInfo>,
block_stream: Option<&StreamInfo>,
) -> anyhow::Result<()> {
let result = tokio::try_join!(
self.sync_existing_executor(config, executor),
self.sync_existing_block_stream(config, state, block_stream)
);
if let Err(error) = self.sync_existing_executor(config, executor).await {
tracing::error!(?error, "Failed to sync executor");
return Ok(());
}

if let Err(error) = result {
tracing::error!(?error, "Failed to sync Indexer");
if let Err(error) = self
.sync_existing_block_stream(config, state, block_stream)
.await
{
tracing::error!(?error, "Failed to sync block stream");
return Ok(());
}

// TODO handle failures
self.state_manager.set_synced(config).await?;

Ok(())
Expand Down Expand Up @@ -310,6 +319,144 @@ mod test {

synchroniser.sync().await.unwrap();
}

#[tokio::test]
async fn configures_block_stream() {
let config_with_latest = IndexerConfig {
start_block: StartBlock::Latest,
..IndexerConfig::default()
};
let height = 5;
let config_with_height = IndexerConfig {
start_block: StartBlock::Height(height),
..IndexerConfig::default()
};
let config_with_continue = IndexerConfig {
start_block: StartBlock::Continue,
..IndexerConfig::default()
};

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_start()
.with(
eq(config_with_continue.get_registry_version()),
eq(config_with_continue.clone()),
)
.returning(|_, _| Ok(()))
.once();
block_streams_handler
.expect_start()
.with(
eq(config_with_latest.get_registry_version()),
eq(config_with_latest.clone()),
)
.returning(|_, _| Ok(()))
.once();
block_streams_handler
.expect_start()
.with(eq(height), eq(config_with_height.clone()))
.returning(|_, _| Ok(()))
.once();

let mut state_manager = IndexerStateManager::default();
state_manager
.expect_set_synced()
.with(eq(config_with_continue.clone()))
.returning(|_| Ok(()))
.once();
state_manager
.expect_set_synced()
.with(eq(config_with_latest.clone()))
.returning(|_| Ok(()))
.once();
state_manager
.expect_set_synced()
.with(eq(config_with_height.clone()))
.returning(|_| Ok(()))
.once();

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_start()
.returning(|_| Ok(()))
.times(3);

let redis_client = RedisClient::default();
let registry = Registry::default();

let synchroniser = Synchroniser::new(
&block_streams_handler,
&executors_handler,
&registry,
&state_manager,
&redis_client,
);

synchroniser
.sync_new_indexer(&config_with_latest)
.await
.unwrap();
synchroniser
.sync_new_indexer(&config_with_height)
.await
.unwrap();
synchroniser
.sync_new_indexer(&config_with_continue)
.await
.unwrap();
}

#[tokio::test]
async fn handles_synchronisation_failures() {
let config = IndexerConfig::default();

let mut executors_handler = ExecutorsHandler::default();
executors_handler
.expect_start()
.with(eq(config.clone()))
.returning(|_| anyhow::bail!(""))
.once();
executors_handler
.expect_start()
.with(eq(config.clone()))
.returning(|_| Ok(()))
.times(2);

let mut block_streams_handler = BlockStreamsHandler::default();
block_streams_handler
.expect_start()
.with(eq(100), eq(config.clone()))
.returning(|_, _| anyhow::bail!(""))
.once();
block_streams_handler
.expect_start()
.with(eq(100), eq(config.clone()))
.returning(|_, _| Ok(()))
.once();

let mut state_manager = IndexerStateManager::default();
state_manager
.expect_set_synced()
.with(eq(config.clone()))
.returning(|_| Ok(()))
.once();

let redis_client = RedisClient::default();
let registry = Registry::default();

let synchroniser = Synchroniser::new(
&block_streams_handler,
&executors_handler,
&registry,
&state_manager,
&redis_client,
);

synchroniser.sync_new_indexer(&config).await.unwrap(); // fail
synchroniser.sync_new_indexer(&config).await.unwrap(); // fail
synchroniser.sync_new_indexer(&config).await.unwrap(); // success
}
}

mod existing {
Expand Down Expand Up @@ -642,8 +789,79 @@ mod test {
.unwrap();
}

#[tokio::test]
async fn handles_synchronisation_failures() {}
//#[tokio::test]
//async fn handles_synchronisation_failures() {
// let config = IndexerConfig::default();
// let state = IndexerState {
// account_id: config.account_id.clone(),
// function_name: config.function_name.clone(),
// block_stream_synced_at: Some(config.get_registry_version()),
// enabled: true,
// };
//
// let mut executors_handler = ExecutorsHandler::default();
// executors_handler
// .expect_stop()
// .with(always())
// .returning(|_| anyhow::bail!(""))
// .once();
// executors_handler
// .expect_stop()
// .with(always())
// .returning(|_| Ok(()));
// executors_handler
// .expect_start()
// .with(eq(config.clone()))
// .returning(|_| anyhow::bail!(""))
// .once();
// executors_handler
// .expect_start()
// .with(eq(config.clone()))
// .returning(|_| Ok(()));
//
// let mut block_streams_handler = BlockStreamsHandler::default();
// block_streams_handler
// .expect_start()
// .with(eq(100), eq(config.clone()))
// .returning(|_, _| anyhow::bail!(""))
// .once();
// block_streams_handler
// .expect_start()
// .with(eq(100), eq(config.clone()))
// .returning(|_, _| Ok(()))
// .once();
//
// let mut state_manager = IndexerStateManager::default();
// state_manager
// .expect_set_synced()
// .with(eq(config.clone()))
// .returning(|_| Ok(()))
// .once();
//
// let redis_client = RedisClient::default();
// let registry = Registry::default();
//
// let synchroniser = Synchroniser::new(
// &block_streams_handler,
// &executors_handler,
// &registry,
// &state_manager,
// &redis_client,
// );
//
// synchroniser
// .sync_existing_indexer(&config, &state)
// .await
// .unwrap(); // fail
// synchroniser
// .sync_existing_indexer(&config, &state)
// .await
// .unwrap(); // fail
// synchroniser
// .sync_existing_indexer(&config, &state)
// .await
// .unwrap(); // success
//}

#[tokio::test]
async fn stops_disabled_indexers() {}
Expand Down

0 comments on commit 9c595c8

Please sign in to comment.