diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 26d5580fe..cff582543 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -43,6 +43,10 @@ async fn synchronise_registry_config( if active_block_stream.version == registry_version { continue; } + + block_streams_handler + .stop(active_block_stream.stream_id) + .await?; } let start_block_height = if let Some(start_block_height) = @@ -347,5 +351,73 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn restarts_streams_with_mismatched_versions() { + let mut registry = Registry::default(); + registry.expect_fetch().returning(|| { + Ok(HashMap::from([( + "morgs.near".parse().unwrap(), + HashMap::from([( + "test".to_string(), + IndexerConfig { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: String::new(), + schema: Some(String::new()), + filter: IndexerRule { + id: None, + name: None, + indexer_rule_kind: IndexerRuleKind::Action, + matching_rule: MatchingRule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }, + }, + created_at_block_height: 101, + updated_at_block_height: Some(200), + start_block_height: Some(1000), + }, + )]), + )])) + }); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get::() + .returning(|_| anyhow::bail!("none")); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler.expect_list().returning(|| { + Ok(vec![block_streamer::StreamInfo { + stream_id: "stream_id".to_string(), + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 101, + }]) + }); + block_stream_handler + .expect_stop() + .with(predicate::eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + block_stream_handler + .expect_start() + .with( + predicate::eq(1000), + predicate::eq("morgs.near".to_string()), + predicate::eq("test".to_string()), + predicate::eq(200), + predicate::eq(MatchingRule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }), + ) + .returning(|_, _, _, _, _| Ok(())); + + synchronise_registry_config(®istry, &redis_client, &mut block_stream_handler) + .await + .unwrap(); + } } }