From 3a31c150c2466d488fb9010968953495733da8be Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 11 Jun 2024 15:49:26 +1200 Subject: [PATCH] refactor: Remove old synchronisation code --- coordinator/src/block_streams/mod.rs | 5 - coordinator/src/block_streams/synchronise.rs | 651 ------------------ .../handler.rs => block_streams_handler.rs} | 0 coordinator/src/executors/mod.rs | 5 - coordinator/src/executors/synchronise.rs | 238 ------- .../handler.rs => executors_handler.rs} | 0 coordinator/src/main.rs | 8 +- coordinator/src/synchroniser.rs | 4 +- 8 files changed, 6 insertions(+), 905 deletions(-) delete mode 100644 coordinator/src/block_streams/mod.rs delete mode 100644 coordinator/src/block_streams/synchronise.rs rename coordinator/src/{block_streams/handler.rs => block_streams_handler.rs} (100%) delete mode 100644 coordinator/src/executors/mod.rs delete mode 100644 coordinator/src/executors/synchronise.rs rename coordinator/src/{executors/handler.rs => executors_handler.rs} (100%) diff --git a/coordinator/src/block_streams/mod.rs b/coordinator/src/block_streams/mod.rs deleted file mode 100644 index cd8b6fd96..000000000 --- a/coordinator/src/block_streams/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod handler; -mod synchronise; - -pub use handler::BlockStreamsHandler; -pub use synchronise::synchronise_block_streams; diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs deleted file mode 100644 index 5a8af2ecb..000000000 --- a/coordinator/src/block_streams/synchronise.rs +++ /dev/null @@ -1,651 +0,0 @@ -use registry_types::StartBlock; - -use crate::indexer_config::IndexerConfig; -use crate::indexer_state::{IndexerStateManager, SyncStatus}; -use crate::redis::RedisClient; -use crate::registry::IndexerRegistry; - -use super::handler::{BlockStreamsHandler, StreamInfo}; - -pub async fn synchronise_block_streams( - indexer_registry: &IndexerRegistry, - indexer_manager: &IndexerStateManager, - redis_client: &RedisClient, - block_streams_handler: &BlockStreamsHandler, -) -> anyhow::Result<()> { - let mut active_block_streams = block_streams_handler.list().await?; - - for indexer_config in indexer_registry.iter() { - let active_block_stream = active_block_streams - .iter() - .position(|stream| { - stream.account_id == *indexer_config.account_id - && stream.function_name == indexer_config.function_name - }) - .map(|index| active_block_streams.swap_remove(index)); - - let _ = synchronise_block_stream( - active_block_stream, - indexer_config, - indexer_manager, - redis_client, - block_streams_handler, - ) - .await - .map_err(|err| { - tracing::error!( - account_id = indexer_config.account_id.as_str(), - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version(), - "failed to sync block stream: {err:?}" - ) - }); - } - - for unregistered_block_stream in active_block_streams { - tracing::info!( - account_id = unregistered_block_stream.account_id.as_str(), - function_name = unregistered_block_stream.function_name, - version = unregistered_block_stream.version, - "Stopping unregistered block stream" - ); - - block_streams_handler - .stop(unregistered_block_stream.stream_id) - .await?; - } - - Ok(()) -} - -#[tracing::instrument( - skip_all, - fields( - account_id = %indexer_config.account_id, - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version() - ) -)] -async fn synchronise_block_stream( - active_block_stream: Option, - indexer_config: &IndexerConfig, - indexer_manager: &IndexerStateManager, - redis_client: &RedisClient, - block_streams_handler: &BlockStreamsHandler, -) -> anyhow::Result<()> { - if let Some(active_block_stream) = active_block_stream { - if active_block_stream.version == indexer_config.get_registry_version() { - return Ok(()); - } - - tracing::info!( - previous_version = active_block_stream.version, - "Stopping outdated block stream" - ); - - block_streams_handler - .stop(active_block_stream.stream_id) - .await?; - } - - let sync_status = indexer_manager - .get_block_stream_sync_status(indexer_config) - .await?; - - clear_block_stream_if_needed(&sync_status, indexer_config, redis_client).await?; - - let start_block_height = - determine_start_block_height(&sync_status, indexer_config, redis_client).await?; - - block_streams_handler - .start(start_block_height, indexer_config) - .await?; - - indexer_manager - .set_block_stream_synced(indexer_config) - .await?; - - Ok(()) -} - -async fn clear_block_stream_if_needed( - sync_status: &SyncStatus, - indexer_config: &IndexerConfig, - redis_client: &RedisClient, -) -> anyhow::Result<()> { - if matches!(sync_status, SyncStatus::Synced | SyncStatus::New) - || indexer_config.start_block == StartBlock::Continue - { - return Ok(()); - } - - tracing::info!("Clearing redis stream"); - - redis_client.clear_block_stream(indexer_config).await -} - -async fn determine_start_block_height( - sync_status: &SyncStatus, - indexer_config: &IndexerConfig, - redis_client: &RedisClient, -) -> anyhow::Result { - if sync_status == &SyncStatus::Synced { - let height = get_continuation_block_height(indexer_config, redis_client).await?; - - tracing::info!(height, "Resuming block stream"); - - return Ok(height); - } - - let height = match indexer_config.start_block { - StartBlock::Latest => Ok(indexer_config.get_registry_version()), - StartBlock::Height(height) => Ok(height), - StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await, - }?; - - tracing::info!(height, "Starting block stream"); - - Ok(height) -} - -async fn get_continuation_block_height( - indexer_config: &IndexerConfig, - redis_client: &RedisClient, -) -> anyhow::Result { - redis_client - .get_last_published_block(indexer_config) - .await? - .map(|height| height + 1) - .ok_or(anyhow::anyhow!("Indexer has no `last_published_block`")) -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::collections::HashMap; - - use mockall::predicate; - use registry_types::{Rule, Status}; - - #[tokio::test] - async fn resumes_previously_synced_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Synced)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_last_published_block() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(Some(500))) - .once(); - redis_client.expect_clear_block_stream().never(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with(predicate::eq(501), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_stream_with_latest() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Latest, - }; - - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(200), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_stream_with_height() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(100), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_stream_with_continue() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Continue, - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_last_published_block() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(Some(100))) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(101), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_stream_not_in_registry() { - let indexer_registry = IndexerRegistry::from(&[]); - - let redis_client = RedisClient::default(); - - let mock_indexer_manager = IndexerStateManager::default(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_synced_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: None, - start_block: StartBlock::Latest, - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let redis_client = RedisClient::default(); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Synced)); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler.expect_stop().never(); - block_stream_handler.expect_start().never(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_unsynced_streams() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: Some(199), - start_block: StartBlock::Height(1000), - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with(predicate::eq(1000), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn skips_stream_without_last_published_block() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: Some(200), - start_block: StartBlock::Continue, - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_last_published_block() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| anyhow::bail!("no last_published_block")) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler.expect_start().never(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_new_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: None, - start_block: StartBlock::Height(50), - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::New)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let redis_client = RedisClient::default(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(50), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } -} diff --git a/coordinator/src/block_streams/handler.rs b/coordinator/src/block_streams_handler.rs similarity index 100% rename from coordinator/src/block_streams/handler.rs rename to coordinator/src/block_streams_handler.rs diff --git a/coordinator/src/executors/mod.rs b/coordinator/src/executors/mod.rs deleted file mode 100644 index 1b68609c6..000000000 --- a/coordinator/src/executors/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod handler; -mod synchronise; - -pub use handler::ExecutorsHandler; -pub use synchronise::synchronise_executors; diff --git a/coordinator/src/executors/synchronise.rs b/coordinator/src/executors/synchronise.rs deleted file mode 100644 index 2b22c8a93..000000000 --- a/coordinator/src/executors/synchronise.rs +++ /dev/null @@ -1,238 +0,0 @@ -use crate::indexer_config::IndexerConfig; -use crate::registry::IndexerRegistry; - -use super::handler::{ExecutorInfo, ExecutorsHandler}; - -pub async fn synchronise_executors( - indexer_registry: &IndexerRegistry, - executors_handler: &ExecutorsHandler, -) -> anyhow::Result<()> { - let mut active_executors = executors_handler.list().await?; - - for indexer_config in indexer_registry.iter() { - let active_executor = active_executors - .iter() - .position(|stream| { - stream.account_id == *indexer_config.account_id - && stream.function_name == indexer_config.function_name - }) - .map(|index| active_executors.swap_remove(index)); - - let _ = synchronise_executor(active_executor, indexer_config, executors_handler) - .await - .map_err(|err| { - tracing::error!( - account_id = indexer_config.account_id.as_str(), - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version(), - "failed to sync executor: {err:?}" - ) - }); - } - - for unregistered_executor in active_executors { - tracing::info!( - account_id = unregistered_executor.account_id.as_str(), - function_name = unregistered_executor.function_name, - registry_version = unregistered_executor.version, - "Stopping unregistered executor" - ); - - executors_handler - .stop(unregistered_executor.executor_id) - .await?; - } - - Ok(()) -} - -#[tracing::instrument( - skip_all, - fields( - account_id = %indexer_config.account_id, - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version() - ) -)] -async fn synchronise_executor( - active_executor: Option, - indexer_config: &IndexerConfig, - executors_handler: &ExecutorsHandler, -) -> anyhow::Result<()> { - let registry_version = indexer_config.get_registry_version(); - - if let Some(active_executor) = active_executor { - if active_executor.version == registry_version { - return Ok(()); - } - - tracing::info!("Stopping outdated executor"); - - executors_handler.stop(active_executor.executor_id).await?; - } - - tracing::info!("Starting new executor"); - - executors_handler.start(indexer_config).await?; - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::collections::HashMap; - - use mockall::predicate; - use registry_types::{Rule, StartBlock, Status}; - - use crate::indexer_config::IndexerConfig; - - #[tokio::test] - async fn starts_executor() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: "schema".to_string(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block: StartBlock::Height(100), - }; - let indexer_registry = IndexerRegistry(HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - executors_handler - .expect_start() - .with(predicate::eq(indexer_config)) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_executor_when_registry_version_differs() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: "schema".to_string(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block: StartBlock::Height(100), - }; - let indexer_registry = IndexerRegistry(HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 1, - }]) - }); - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - executors_handler - .expect_start() - .with(predicate::eq(indexer_config)) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_executor_with_matching_registry_version() { - let indexer_registry = IndexerRegistry(HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: "schema".to_string(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block: StartBlock::Height(100), - }, - )]), - )])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - executors_handler.expect_stop().never(); - - executors_handler.expect_start().never(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_executor_not_in_registry() { - let indexer_registry = IndexerRegistry::from(&[]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } -} diff --git a/coordinator/src/executors/handler.rs b/coordinator/src/executors_handler.rs similarity index 100% rename from coordinator/src/executors/handler.rs rename to coordinator/src/executors_handler.rs diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 68f6b5b65..7a9730453 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -5,15 +5,15 @@ use near_primitives::types::AccountId; use tokio::time::sleep; use tracing_subscriber::prelude::*; -use crate::block_streams::BlockStreamsHandler; -use crate::executors::ExecutorsHandler; +use crate::block_streams_handler::BlockStreamsHandler; +use crate::executors_handler::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; use crate::redis::RedisClient; use crate::registry::Registry; use crate::synchroniser::Synchroniser; -mod block_streams; -mod executors; +mod block_streams_handler; +mod executors_handler; mod indexer_config; mod indexer_state; mod redis; diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index 744bb9737..5a6b0ce23 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -5,8 +5,8 @@ use runner::ExecutorInfo; use tracing::instrument; use crate::{ - block_streams::BlockStreamsHandler, - executors::ExecutorsHandler, + block_streams_handler::BlockStreamsHandler, + executors_handler::ExecutorsHandler, indexer_config::IndexerConfig, indexer_state::{IndexerState, IndexerStateManager}, redis::RedisClient,