diff --git a/coordinator/src/block_streams/mod.rs b/coordinator/src/block_streams/mod.rs deleted file mode 100644 index cd8b6fd96..000000000 --- a/coordinator/src/block_streams/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod handler; -mod synchronise; - -pub use handler::BlockStreamsHandler; -pub use synchronise::synchronise_block_streams; diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs deleted file mode 100644 index 5a8af2ecb..000000000 --- a/coordinator/src/block_streams/synchronise.rs +++ /dev/null @@ -1,651 +0,0 @@ -use registry_types::StartBlock; - -use crate::indexer_config::IndexerConfig; -use crate::indexer_state::{IndexerStateManager, SyncStatus}; -use crate::redis::RedisClient; -use crate::registry::IndexerRegistry; - -use super::handler::{BlockStreamsHandler, StreamInfo}; - -pub async fn synchronise_block_streams( - indexer_registry: &IndexerRegistry, - indexer_manager: &IndexerStateManager, - redis_client: &RedisClient, - block_streams_handler: &BlockStreamsHandler, -) -> anyhow::Result<()> { - let mut active_block_streams = block_streams_handler.list().await?; - - for indexer_config in indexer_registry.iter() { - let active_block_stream = active_block_streams - .iter() - .position(|stream| { - stream.account_id == *indexer_config.account_id - && stream.function_name == indexer_config.function_name - }) - .map(|index| active_block_streams.swap_remove(index)); - - let _ = synchronise_block_stream( - active_block_stream, - indexer_config, - indexer_manager, - redis_client, - block_streams_handler, - ) - .await - .map_err(|err| { - tracing::error!( - account_id = indexer_config.account_id.as_str(), - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version(), - "failed to sync block stream: {err:?}" - ) - }); - } - - for unregistered_block_stream in active_block_streams { - tracing::info!( - account_id = unregistered_block_stream.account_id.as_str(), - function_name = unregistered_block_stream.function_name, - version = unregistered_block_stream.version, - "Stopping unregistered block stream" - ); - - block_streams_handler - .stop(unregistered_block_stream.stream_id) - .await?; - } - - Ok(()) -} - -#[tracing::instrument( - skip_all, - fields( - account_id = %indexer_config.account_id, - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version() - ) -)] -async fn synchronise_block_stream( - active_block_stream: Option, - indexer_config: &IndexerConfig, - indexer_manager: &IndexerStateManager, - redis_client: &RedisClient, - block_streams_handler: &BlockStreamsHandler, -) -> anyhow::Result<()> { - if let Some(active_block_stream) = active_block_stream { - if active_block_stream.version == indexer_config.get_registry_version() { - return Ok(()); - } - - tracing::info!( - previous_version = active_block_stream.version, - "Stopping outdated block stream" - ); - - block_streams_handler - .stop(active_block_stream.stream_id) - .await?; - } - - let sync_status = indexer_manager - .get_block_stream_sync_status(indexer_config) - .await?; - - clear_block_stream_if_needed(&sync_status, indexer_config, redis_client).await?; - - let start_block_height = - determine_start_block_height(&sync_status, indexer_config, redis_client).await?; - - block_streams_handler - .start(start_block_height, indexer_config) - .await?; - - indexer_manager - .set_block_stream_synced(indexer_config) - .await?; - - Ok(()) -} - -async fn clear_block_stream_if_needed( - sync_status: &SyncStatus, - 
indexer_config: &IndexerConfig, - redis_client: &RedisClient, -) -> anyhow::Result<()> { - if matches!(sync_status, SyncStatus::Synced | SyncStatus::New) - || indexer_config.start_block == StartBlock::Continue - { - return Ok(()); - } - - tracing::info!("Clearing redis stream"); - - redis_client.clear_block_stream(indexer_config).await -} - -async fn determine_start_block_height( - sync_status: &SyncStatus, - indexer_config: &IndexerConfig, - redis_client: &RedisClient, -) -> anyhow::Result { - if sync_status == &SyncStatus::Synced { - let height = get_continuation_block_height(indexer_config, redis_client).await?; - - tracing::info!(height, "Resuming block stream"); - - return Ok(height); - } - - let height = match indexer_config.start_block { - StartBlock::Latest => Ok(indexer_config.get_registry_version()), - StartBlock::Height(height) => Ok(height), - StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await, - }?; - - tracing::info!(height, "Starting block stream"); - - Ok(height) -} - -async fn get_continuation_block_height( - indexer_config: &IndexerConfig, - redis_client: &RedisClient, -) -> anyhow::Result { - redis_client - .get_last_published_block(indexer_config) - .await? - .map(|height| height + 1) - .ok_or(anyhow::anyhow!("Indexer has no `last_published_block`")) -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::collections::HashMap; - - use mockall::predicate; - use registry_types::{Rule, Status}; - - #[tokio::test] - async fn resumes_previously_synced_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Synced)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_last_published_block() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(Some(500))) - .once(); - redis_client.expect_clear_block_stream().never(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler - .expect_start() - .with(predicate::eq(501), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_stream_with_latest() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - 
start_block: StartBlock::Latest, - }; - - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(200), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_stream_with_height() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(100), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_stream_with_continue() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Continue, - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - 
.expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_last_published_block() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(Some(100))) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(101), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_stream_not_in_registry() { - let indexer_registry = IndexerRegistry::from(&[]); - - let redis_client = RedisClient::default(); - - let mock_indexer_manager = IndexerStateManager::default(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 1, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_synced_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: None, - start_block: StartBlock::Latest, - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let redis_client = RedisClient::default(); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Synced)); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler.expect_stop().never(); - block_stream_handler.expect_start().never(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_unsynced_streams() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: 
Some(199), - start_block: StartBlock::Height(1000), - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| { - Ok(vec![block_streamer::StreamInfo { - stream_id: "stream_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - version: 101, - }]) - }); - block_stream_handler - .expect_stop() - .with(predicate::eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_stream_handler - .expect_start() - .with(predicate::eq(1000), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn skips_stream_without_last_published_block() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: Some(200), - start_block: StartBlock::Continue, - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager = IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::Outdated)); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_last_published_block() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| anyhow::bail!("no last_published_block")) - .once(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler.expect_start().never(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } - - #[tokio::test] - async fn starts_new_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 101, - updated_at_block_height: None, - start_block: StartBlock::Height(50), - }; - let indexer_registry = IndexerRegistry::from(&[( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )]); - - let mut mock_indexer_manager 
= IndexerStateManager::default(); - mock_indexer_manager - .expect_get_block_stream_sync_status() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(SyncStatus::New)); - mock_indexer_manager - .expect_set_block_stream_synced() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(())) - .once(); - - let redis_client = RedisClient::default(); - - let mut block_stream_handler = BlockStreamsHandler::default(); - block_stream_handler.expect_list().returning(|| Ok(vec![])); - block_stream_handler.expect_stop().never(); - block_stream_handler - .expect_start() - .with(predicate::eq(50), predicate::eq(indexer_config)) - .returning(|_, _| Ok(())) - .once(); - - synchronise_block_streams( - &indexer_registry, - &mock_indexer_manager, - &redis_client, - &block_stream_handler, - ) - .await - .unwrap(); - } -} diff --git a/coordinator/src/block_streams/handler.rs b/coordinator/src/block_streams_handler.rs similarity index 91% rename from coordinator/src/block_streams/handler.rs rename to coordinator/src/block_streams_handler.rs index f17a974ce..a2315c171 100644 --- a/coordinator/src/block_streams/handler.rs +++ b/coordinator/src/block_streams_handler.rs @@ -62,9 +62,7 @@ impl BlockStreamsHandlerImpl { .clone() .stop_stream(Request::new(request.clone())) .await - .map_err(|e| { - tracing::error!(stream_id, "Failed to stop stream\n{e:?}"); - }); + .context(format!("Failed to stop stream: {stream_id}"))?; tracing::debug!(stream_id, "Stop stream response: {:#?}", response); @@ -125,13 +123,10 @@ impl BlockStreamsHandlerImpl { .clone() .start_stream(Request::new(request.clone())) .await - .map_err(|error| { - tracing::error!( - account_id = indexer_config.account_id.as_str(), - function_name = indexer_config.function_name, - "Failed to start stream\n{error:?}" - ); - }); + .context(format!( + "Failed to start stream: {}", + indexer_config.get_full_name() + ))?; tracing::debug!( account_id = indexer_config.account_id.as_str(), diff --git a/coordinator/src/executors/mod.rs b/coordinator/src/executors/mod.rs deleted file mode 100644 index 1b68609c6..000000000 --- a/coordinator/src/executors/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod handler; -mod synchronise; - -pub use handler::ExecutorsHandler; -pub use synchronise::synchronise_executors; diff --git a/coordinator/src/executors/synchronise.rs b/coordinator/src/executors/synchronise.rs deleted file mode 100644 index 2b22c8a93..000000000 --- a/coordinator/src/executors/synchronise.rs +++ /dev/null @@ -1,238 +0,0 @@ -use crate::indexer_config::IndexerConfig; -use crate::registry::IndexerRegistry; - -use super::handler::{ExecutorInfo, ExecutorsHandler}; - -pub async fn synchronise_executors( - indexer_registry: &IndexerRegistry, - executors_handler: &ExecutorsHandler, -) -> anyhow::Result<()> { - let mut active_executors = executors_handler.list().await?; - - for indexer_config in indexer_registry.iter() { - let active_executor = active_executors - .iter() - .position(|stream| { - stream.account_id == *indexer_config.account_id - && stream.function_name == indexer_config.function_name - }) - .map(|index| active_executors.swap_remove(index)); - - let _ = synchronise_executor(active_executor, indexer_config, executors_handler) - .await - .map_err(|err| { - tracing::error!( - account_id = indexer_config.account_id.as_str(), - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version(), - "failed to sync executor: {err:?}" - ) - }); - } - - for unregistered_executor in active_executors { - 
tracing::info!( - account_id = unregistered_executor.account_id.as_str(), - function_name = unregistered_executor.function_name, - registry_version = unregistered_executor.version, - "Stopping unregistered executor" - ); - - executors_handler - .stop(unregistered_executor.executor_id) - .await?; - } - - Ok(()) -} - -#[tracing::instrument( - skip_all, - fields( - account_id = %indexer_config.account_id, - function_name = indexer_config.function_name, - version = indexer_config.get_registry_version() - ) -)] -async fn synchronise_executor( - active_executor: Option, - indexer_config: &IndexerConfig, - executors_handler: &ExecutorsHandler, -) -> anyhow::Result<()> { - let registry_version = indexer_config.get_registry_version(); - - if let Some(active_executor) = active_executor { - if active_executor.version == registry_version { - return Ok(()); - } - - tracing::info!("Stopping outdated executor"); - - executors_handler.stop(active_executor.executor_id).await?; - } - - tracing::info!("Starting new executor"); - - executors_handler.start(indexer_config).await?; - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::collections::HashMap; - - use mockall::predicate; - use registry_types::{Rule, StartBlock, Status}; - - use crate::indexer_config::IndexerConfig; - - #[tokio::test] - async fn starts_executor() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: "schema".to_string(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block: StartBlock::Height(100), - }; - let indexer_registry = IndexerRegistry(HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - executors_handler - .expect_start() - .with(predicate::eq(indexer_config)) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_executor_when_registry_version_differs() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: "schema".to_string(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block: StartBlock::Height(100), - }; - let indexer_registry = IndexerRegistry(HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), indexer_config.clone())]), - )])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 1, - }]) - }); - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - executors_handler - .expect_start() - .with(predicate::eq(indexer_config)) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - 
#[tokio::test] - async fn ignores_executor_with_matching_registry_version() { - let indexer_registry = IndexerRegistry(HashMap::from([( - "morgs.near".parse().unwrap(), - HashMap::from([( - "test".to_string(), - IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: "code".to_string(), - schema: "schema".to_string(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(2), - start_block: StartBlock::Height(100), - }, - )]), - )])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - executors_handler.expect_stop().never(); - - executors_handler.expect_start().never(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_executor_not_in_registry() { - let indexer_registry = IndexerRegistry::from(&[]); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| { - Ok(vec![runner::ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - status: "running".to_string(), - version: 2, - }]) - }); - - executors_handler - .expect_stop() - .with(predicate::eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - synchronise_executors(&indexer_registry, &executors_handler) - .await - .unwrap(); - } -} diff --git a/coordinator/src/executors/handler.rs b/coordinator/src/executors_handler.rs similarity index 89% rename from coordinator/src/executors/handler.rs rename to coordinator/src/executors_handler.rs index 68be45f29..28717b924 100644 --- a/coordinator/src/executors/handler.rs +++ b/coordinator/src/executors_handler.rs @@ -64,13 +64,10 @@ impl ExecutorsHandlerImpl { .clone() .start_executor(Request::new(request.clone())) .await - .map_err(|error| { - tracing::error!( - account_id = indexer_config.account_id.as_str(), - function_name = indexer_config.function_name, - "Failed to start executor\n{error:?}" - ); - }); + .context(format!( + "Failed to start executor: {}", + indexer_config.get_full_name() + ))?; tracing::debug!( account_id = indexer_config.account_id.as_str(), @@ -92,7 +89,8 @@ impl ExecutorsHandlerImpl { .client .clone() .stop_executor(Request::new(request.clone())) - .await?; + .await + .context(format!("Failed to stop executor: {executor_id}"))?; tracing::debug!(executor_id, "Stop executor response: {:#?}", response); diff --git a/coordinator/src/indexer_config.rs b/coordinator/src/indexer_config.rs index a827c0f85..78220c76e 100644 --- a/coordinator/src/indexer_config.rs +++ b/coordinator/src/indexer_config.rs @@ -13,6 +13,25 @@ pub struct IndexerConfig { pub created_at_block_height: u64, } +#[cfg(test)] +impl Default for IndexerConfig { + fn default() -> Self { + Self { + account_id: "morgs.near".parse().unwrap(), + function_name: "test".to_string(), + code: "code".to_string(), + schema: "schema".to_string(), + rule: Rule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: registry_types::Status::Any, + }, + created_at_block_height: 1, + updated_at_block_height: Some(2), + start_block: 
StartBlock::Height(100),
+        }
+    }
+}
+
 impl IndexerConfig {
     pub fn get_full_name(&self) -> String {
         format!("{}/{}", self.account_id, self.function_name)
@@ -26,10 +45,6 @@ impl IndexerConfig {
         format!("{}:last_published_block", self.get_full_name())
     }
 
-    pub fn get_redis_stream_version_key(&self) -> String {
-        format!("{}:version", self.get_redis_stream_key())
-    }
-
     pub fn get_state_key(&self) -> String {
         format!("{}:state", self.get_full_name())
     }
diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs
index a8d1eddb9..431fb3345 100644
--- a/coordinator/src/indexer_state.rs
+++ b/coordinator/src/indexer_state.rs
@@ -1,32 +1,41 @@
 #![cfg_attr(test, allow(dead_code))]
 
-use std::cmp::Ordering;
+use anyhow::Context;
+use near_primitives::types::AccountId;
+use std::str::FromStr;
 
 use crate::indexer_config::IndexerConfig;
 use crate::redis::RedisClient;
 use crate::registry::IndexerRegistry;
 
-#[derive(Debug, PartialEq, Eq)]
-pub enum SyncStatus {
-    Synced,
-    Outdated,
-    New,
-}
-
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-struct OldIndexerState {
-    block_stream_synced_at: Option<u64>,
+pub struct OldIndexerState {
+    pub block_stream_synced_at: Option<u64>,
+    pub enabled: bool,
 }
 
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
 pub struct IndexerState {
+    pub account_id: AccountId,
+    pub function_name: String,
     pub block_stream_synced_at: Option<u64>,
     pub enabled: bool,
 }
 
+impl IndexerState {
+    // FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
+    // construct the state key without it. But, this isn't ideal as we now have two places which
+    // define this key - we need to consolidate these somehow.
+    pub fn get_state_key(&self) -> String {
+        format!("{}/{}:state", self.account_id, self.function_name)
+    }
+}
+
 impl Default for IndexerState {
     fn default() -> Self {
         Self {
+            account_id: AccountId::from_str("morgs.near").unwrap(),
+            function_name: String::new(),
             block_stream_synced_at: None,
             enabled: true,
         }
@@ -48,6 +57,53 @@ impl IndexerStateManagerImpl {
         Self { redis_client }
     }
 
+    pub async fn migrate(&self, registry: &IndexerRegistry) -> anyhow::Result<()> {
+        if self.redis_client.indexer_states_set_exists().await?
{ + return Ok(()); + } + + tracing::info!("Migrating indexer state"); + + for config in registry.iter() { + let raw_state = self.redis_client.get_indexer_state(config).await?; + + let state = if let Some(raw_state) = raw_state { + let old_state: OldIndexerState = + serde_json::from_str(&raw_state).context(format!( + "Failed to deserialize OldIndexerState for {}", + config.get_full_name() + ))?; + + IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: old_state.block_stream_synced_at, + enabled: old_state.enabled, + } + } else { + self.get_default_state(config) + }; + + self.set_state(config, state).await.context(format!( + "Failed to set state for {}", + config.get_full_name() + ))?; + } + + tracing::info!("Migration complete"); + + Ok(()) + } + + fn get_default_state(&self, indexer_config: &IndexerConfig) -> IndexerState { + IndexerState { + account_id: indexer_config.account_id.clone(), + function_name: indexer_config.function_name.clone(), + block_stream_synced_at: None, + enabled: true, + } + } + pub async fn get_state(&self, indexer_config: &IndexerConfig) -> anyhow::Result { let raw_state = self.redis_client.get_indexer_state(indexer_config).await?; @@ -55,7 +111,11 @@ impl IndexerStateManagerImpl { return Ok(serde_json::from_str(&raw_state)?); } - Ok(IndexerState::default()) + Ok(self.get_default_state(indexer_config)) + } + + pub async fn delete_state(&self, indexer_state: &IndexerState) -> anyhow::Result<()> { + self.redis_client.delete_indexer_state(indexer_state).await } async fn set_state( @@ -70,138 +130,43 @@ impl IndexerStateManagerImpl { .await } - pub async fn set_enabled( - &self, - indexer_config: &IndexerConfig, - enabled: bool, - ) -> anyhow::Result<()> { + pub async fn set_synced(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let mut indexer_state = self.get_state(indexer_config).await?; - indexer_state.enabled = enabled; - - self.set_state(indexer_config, indexer_state).await?; - Ok(()) - } - - pub async fn filter_disabled_indexers( - &self, - indexer_registry: &IndexerRegistry, - ) -> anyhow::Result { - let mut filtered_registry = IndexerRegistry::new(); - - for indexer_config in indexer_registry.iter() { - let indexer_state = self.get_state(indexer_config).await?; - - if indexer_state.enabled { - filtered_registry - .0 - .entry(indexer_config.account_id.clone()) - .or_default() - .insert(indexer_config.function_name.clone(), indexer_config.clone()); - } - } - - Ok(filtered_registry) - } - - pub async fn migrate_state_if_needed( - &self, - indexer_registry: &IndexerRegistry, - ) -> anyhow::Result<()> { - if self.redis_client.is_migration_complete().await?.is_none() { - tracing::info!("Migrating indexer state"); - - for indexer_config in indexer_registry.iter() { - if let Some(version) = self.redis_client.get_stream_version(indexer_config).await? { - self.redis_client - .set_indexer_state( - indexer_config, - serde_json::to_string(&OldIndexerState { - block_stream_synced_at: Some(version), - })?, - ) - .await?; - } - } - - tracing::info!("Indexer state migration complete"); - - self.redis_client.set_migration_complete().await?; - } + indexer_state.block_stream_synced_at = Some(indexer_config.get_registry_version()); - if self - .redis_client - .get::<_, bool>("state_migration:enabled_flag") - .await? 
- .is_none() - { - tracing::info!("Migrating enabled flag"); - - for indexer_config in indexer_registry.iter() { - let existing_state = self.redis_client.get_indexer_state(indexer_config).await?; - - let state = match existing_state { - Some(state) => { - let old_state: OldIndexerState = serde_json::from_str(&state)?; - IndexerState { - block_stream_synced_at: old_state.block_stream_synced_at, - enabled: true, - } - } - None => IndexerState::default(), - }; - - self.set_state(indexer_config, state).await?; - } - - self.redis_client - .set("state_migration:enabled_flag", true) - .await?; - - tracing::info!("Enabled flag migration complete"); - } + self.set_state(indexer_config, indexer_state).await?; Ok(()) } - pub async fn get_block_stream_sync_status( - &self, - indexer_config: &IndexerConfig, - ) -> anyhow::Result { - let indexer_state = self.get_state(indexer_config).await?; - - if indexer_state.block_stream_synced_at.is_none() { - return Ok(SyncStatus::New); - } - - match indexer_config - .get_registry_version() - .cmp(&indexer_state.block_stream_synced_at.unwrap()) - { - Ordering::Equal => Ok(SyncStatus::Synced), - Ordering::Greater => Ok(SyncStatus::Outdated), - Ordering::Less => { - tracing::warn!( - "Found stream with version greater than registry, treating as outdated" - ); - - Ok(SyncStatus::Outdated) - } - } - } - - pub async fn set_block_stream_synced( + pub async fn set_enabled( &self, indexer_config: &IndexerConfig, + enabled: bool, ) -> anyhow::Result<()> { let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.block_stream_synced_at = Some(indexer_config.get_registry_version()); + indexer_state.enabled = enabled; self.set_state(indexer_config, indexer_state).await?; Ok(()) } + + pub async fn list(&self) -> anyhow::Result> { + self.redis_client + .list_indexer_states() + .await? 
+ .iter() + .try_fold(Vec::new(), |mut acc, raw_state| { + acc.push( + serde_json::from_str(raw_state) + .context(format!("failed to deserailize {raw_state}"))?, + ); + anyhow::Ok(acc) + }) + .context("Failed to deserialize indexer states") + } } #[cfg(test)] @@ -214,157 +179,69 @@ mod tests { use registry_types::{Rule, StartBlock, Status}; #[tokio::test] - async fn filters_disabled_indexers() { - let morgs_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - let darunrs_config = IndexerConfig { - account_id: "darunrs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block: StartBlock::Height(100), - }; - - let indexer_registry = IndexerRegistry::from(&[ - ( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), morgs_config.clone())]), - ), - ( - "darunrs.near".parse().unwrap(), - HashMap::from([("test".to_string(), darunrs_config.clone())]), - ), - ]); - + async fn list_indexer_states() { let mut mock_redis_client = RedisClient::default(); mock_redis_client - .expect_get_indexer_state() - .with(predicate::eq(morgs_config.clone())) - .returning(|_| { - Ok(Some( - serde_json::json!({ "block_stream_synced_at": 200, "enabled": true }) - .to_string(), - )) - }) + .expect_list_indexer_states() + .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true }).to_string()])) .once(); mock_redis_client - .expect_get_indexer_state() - .with(predicate::eq(darunrs_config.clone())) - .returning(|_| { - Ok(Some( - serde_json::json!({ "block_stream_synced_at": 1, "enabled": false }) - .to_string(), - )) - }) + .expect_list_indexer_states() + .returning(|| Ok(vec![serde_json::json!({}).to_string()])) .once(); let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client); - let filtered_registry = indexer_manager - .filter_disabled_indexers(&indexer_registry) - .await - .unwrap(); - - assert!(filtered_registry.contains_key(&morgs_config.account_id)); + assert_eq!(indexer_manager.list().await.unwrap().len(), 1); + assert!(indexer_manager.list().await.is_err()); } #[tokio::test] - async fn migrates_enabled_flag() { - let morgs_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - let darunrs_config = IndexerConfig { + async fn migrate() { + let config1 = IndexerConfig::default(); + let config2 = IndexerConfig { account_id: "darunrs.near".parse().unwrap(), function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - 
updated_at_block_height: None, - start_block: StartBlock::Height(100), + ..Default::default() }; - let indexer_registry = IndexerRegistry::from(&[ + let registry = IndexerRegistry::from(&[ ( "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), morgs_config.clone())]), + HashMap::from([("test".to_string(), config1.clone())]), ), ( "darunrs.near".parse().unwrap(), - HashMap::from([("test".to_string(), darunrs_config.clone())]), + HashMap::from([("test".to_string(), config2.clone())]), ), ]); let mut mock_redis_client = RedisClient::default(); mock_redis_client - .expect_is_migration_complete() - .returning(|| Ok(Some(true))) - .times(2); - mock_redis_client - .expect_get::<&str, bool>() - .with(predicate::eq("state_migration:enabled_flag")) - .returning(|_| Ok(None)) - .once(); - mock_redis_client - .expect_get::<&str, bool>() - .with(predicate::eq("state_migration:enabled_flag")) - .returning(|_| Ok(Some(true))) + .expect_indexer_states_set_exists() + .returning(|| Ok(false)) .once(); mock_redis_client .expect_get_indexer_state() - .with(predicate::eq(morgs_config.clone())) + .with(predicate::eq(config1.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "block_stream_synced_at": 200 }).to_string(), + serde_json::json!({ "block_stream_synced_at": 200, "enabled": false }) + .to_string(), )) }) .once(); mock_redis_client .expect_get_indexer_state() - .with(predicate::eq(darunrs_config.clone())) - .returning(|_| { - Ok(Some( - serde_json::json!({ "block_stream_synced_at": 1 }).to_string(), - )) - }) + .with(predicate::eq(config2.clone())) + .returning(|_| Ok(None)) .once(); mock_redis_client .expect_set_indexer_state() .with( - predicate::eq(morgs_config), + predicate::eq(config1), predicate::eq( - serde_json::json!({ "block_stream_synced_at": 200, "enabled": true }) - .to_string(), + "{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":200,\"enabled\":false}".to_string(), ), ) .returning(|_, _| Ok(())) @@ -372,235 +249,17 @@ mod tests { mock_redis_client .expect_set_indexer_state() .with( - predicate::eq(darunrs_config), + predicate::eq(config2), predicate::eq( - serde_json::json!({ "block_stream_synced_at": 1, "enabled": true }).to_string(), + "{\"account_id\":\"darunrs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":null,\"enabled\":true}".to_string() ), ) .returning(|_, _| Ok(())) .once(); - mock_redis_client - .expect_set::<&str, bool>() - .with( - predicate::eq("state_migration:enabled_flag"), - predicate::eq(true), - ) - .returning(|_, _| Ok(())) - .once(); let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client); - indexer_manager - .migrate_state_if_needed(&indexer_registry) - .await - .unwrap(); - - // ensure it is only called once - indexer_manager - .migrate_state_if_needed(&indexer_registry) - .await - .unwrap(); - } - - #[tokio::test] - async fn migrates_state_to_indexer_manager() { - let morgs_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Height(100), - }; - let darunrs_config = IndexerConfig { - account_id: "darunrs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: 
"queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block: StartBlock::Height(100), - }; - - let indexer_registry = IndexerRegistry(HashMap::from([ - ( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), morgs_config.clone())]), - ), - ( - "darunrs.near".parse().unwrap(), - HashMap::from([("test".to_string(), darunrs_config.clone())]), - ), - ])); - - let mut mock_redis_client = RedisClient::default(); - mock_redis_client - .expect_is_migration_complete() - .returning(|| Ok(None)) - .once(); - mock_redis_client - .expect_is_migration_complete() - .returning(|| Ok(Some(true))) - .once(); - mock_redis_client - .expect_get::<&str, _>() - .with(predicate::eq("state_migration:enabled_flag")) - .returning(|_| Ok(Some(true))); - mock_redis_client - .expect_set_migration_complete() - .returning(|| Ok(())) - .once(); - mock_redis_client - .expect_get_stream_version() - .with(predicate::eq(morgs_config.clone())) - .returning(|_| Ok(Some(200))) - .once(); - mock_redis_client - .expect_get_stream_version() - .with(predicate::eq(darunrs_config.clone())) - .returning(|_| Ok(Some(1))) - .once(); - mock_redis_client - .expect_set_indexer_state() - .with( - predicate::eq(morgs_config), - predicate::eq(serde_json::json!({ "block_stream_synced_at": 200 }).to_string()), - ) - .returning(|_, _| Ok(())) - .once(); - mock_redis_client - .expect_set_indexer_state() - .with( - predicate::eq(darunrs_config), - predicate::eq(serde_json::json!({ "block_stream_synced_at": 1 }).to_string()), - ) - .returning(|_, _| Ok(())) - .once(); - - let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client); - - indexer_manager - .migrate_state_if_needed(&indexer_registry) - .await - .unwrap(); - - // ensure it is only called once - indexer_manager - .migrate_state_if_needed(&indexer_registry) - .await - .unwrap(); - } - - #[tokio::test] - pub async fn outdated_block_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Continue, - }; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_indexer_state() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| { - Ok(Some( - serde_json::json!({ "block_stream_synced_at": 300, "enabled": true }) - .to_string(), - )) - }); - - let indexer_manager = IndexerStateManagerImpl::new(redis_client); - let result = indexer_manager - .get_block_stream_sync_status(&indexer_config) - .await - .unwrap(); - - assert_eq!(result, SyncStatus::Outdated); - } - - #[tokio::test] - pub async fn synced_block_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: Some(200), - start_block: StartBlock::Continue, - }; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_indexer_state() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| { - Ok(Some( - serde_json::json!({ 
"block_stream_synced_at": 200, "enabled": true }) - .to_string(), - )) - }); - - let indexer_manager = IndexerStateManagerImpl::new(redis_client); - let result = indexer_manager - .get_block_stream_sync_status(&indexer_config) - .await - .unwrap(); - - assert_eq!(result, SyncStatus::Synced); - } - - #[tokio::test] - pub async fn new_block_stream() { - let indexer_config = IndexerConfig { - account_id: "morgs.near".parse().unwrap(), - function_name: "test".to_string(), - code: String::new(), - schema: String::new(), - rule: Rule::ActionAny { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any, - }, - created_at_block_height: 1, - updated_at_block_height: None, - start_block: StartBlock::Continue, - }; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_get_indexer_state() - .with(predicate::eq(indexer_config.clone())) - .returning(|_| Ok(None)); - - let indexer_manager = IndexerStateManagerImpl::new(redis_client); - let result = indexer_manager - .get_block_stream_sync_status(&indexer_config) - .await - .unwrap(); - - assert_eq!(result, SyncStatus::New); + indexer_manager.migrate(®istry).await.unwrap(); } #[tokio::test] @@ -625,18 +284,15 @@ mod tests { .with(predicate::eq(indexer_config.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "block_stream_synced_at": 123, "enabled": true }) + serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true }) .to_string(), )) }); redis_client .expect_set_indexer_state() .with( - predicate::eq(indexer_config.clone()), - predicate::eq( - serde_json::json!({ "block_stream_synced_at":123, "enabled": false }) - .to_string(), - ), + predicate::always(), + predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false}".to_string()), ) .returning(|_, _| Ok(())) .once(); @@ -644,7 +300,7 @@ mod tests { let indexer_manager = IndexerStateManagerImpl::new(redis_client); indexer_manager - .set_enabled(&indexer_config.into(), false) + .set_enabled(&indexer_config, false) .await .unwrap(); } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index f19d5e5e8..676d04196 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -2,26 +2,33 @@ use std::sync::Arc; use std::time::Duration; use near_primitives::types::AccountId; -use tokio::time::sleep; use tracing_subscriber::prelude::*; -use crate::block_streams::{synchronise_block_streams, BlockStreamsHandler}; -use crate::executors::{synchronise_executors, ExecutorsHandler}; +use crate::block_streams_handler::BlockStreamsHandler; +use crate::executors_handler::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; use crate::redis::RedisClient; use crate::registry::Registry; +use crate::synchroniser::Synchroniser; -mod block_streams; -mod executors; +mod block_streams_handler; +mod executors_handler; mod indexer_config; mod indexer_state; mod redis; mod registry; mod server; +mod synchroniser; mod utils; const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); +async fn sleep(duration: Duration) -> anyhow::Result<()> { + tokio::time::sleep(duration).await; + + Ok(()) +} + #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::registry() @@ -54,6 +61,13 @@ async fn main() -> anyhow::Result<()> { let block_streams_handler = BlockStreamsHandler::connect(&block_streamer_url)?; let executors_handler = ExecutorsHandler::connect(&runner_url)?; let indexer_state_manager = 
Arc::new(IndexerStateManager::new(redis_client.clone())); + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &indexer_state_manager, + &redis_client, + ); tokio::spawn({ let indexer_state_manager = indexer_state_manager.clone(); @@ -61,29 +75,10 @@ async fn main() -> anyhow::Result<()> { async move { server::init(grpc_port, indexer_state_manager, registry).await } }); - loop { - let indexer_registry = registry.fetch().await?; - - indexer_state_manager - .migrate_state_if_needed(&indexer_registry) - .await?; + let indexer_registry = registry.fetch().await?; + indexer_state_manager.migrate(&indexer_registry).await?; - let indexer_registry = indexer_state_manager - .filter_disabled_indexers(&indexer_registry) - .await?; - - tokio::try_join!( - synchronise_executors(&indexer_registry, &executors_handler), - synchronise_block_streams( - &indexer_registry, - &indexer_state_manager, - &redis_client, - &block_streams_handler - ), - async { - sleep(CONTROL_LOOP_THROTTLE_SECONDS).await; - Ok(()) - } - )?; + loop { + tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?; } } diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index e750776f5..74f536c70 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -5,7 +5,7 @@ use std::fmt::Debug; use anyhow::Context; use redis::{aio::ConnectionManager, FromRedisValue, ToRedisArgs}; -use crate::indexer_config::IndexerConfig; +use crate::{indexer_config::IndexerConfig, indexer_state::IndexerState}; #[cfg(test)] pub use MockRedisClientImpl as RedisClient; @@ -18,6 +18,8 @@ pub struct RedisClientImpl { } impl RedisClientImpl { + const INDEXER_STATES_SET: &'static str = "indexer_states"; + pub async fn connect(redis_url: &str) -> anyhow::Result { let connection = redis::Client::open(redis_url)? 
            .get_connection_manager()
@@ -51,11 +53,12 @@ impl RedisClientImpl {
     {
         tracing::debug!("SET: {:?}, {:?}", key, value);
 
-        let mut cmd = redis::cmd("SET");
-        cmd.arg(key).arg(value);
-        cmd.query_async(&mut self.connection.clone()).await?;
-
-        Ok(())
+        redis::cmd("SET")
+            .arg(&key)
+            .arg(&value)
+            .query_async(&mut self.connection.clone())
+            .await
+            .context(format!("SET: {key:?} {value:?}"))
     }
 
     pub async fn del<K>(&self, key: K) -> anyhow::Result<()>
@@ -73,12 +76,64 @@ impl RedisClientImpl {
         Ok(())
     }
 
-    pub async fn get_stream_version(
-        &self,
-        indexer_config: &IndexerConfig,
-    ) -> anyhow::Result<Option<u64>> {
-        self.get::<_, u64>(indexer_config.get_redis_stream_version_key())
+    pub async fn smembers<S>(&self, set: S) -> anyhow::Result<Vec<String>>
+    where
+        S: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("SMEMBERS {set:?}");
+
+        redis::cmd("SMEMBERS")
+            .arg(&set)
+            .query_async(&mut self.connection.clone())
             .await
+            .context(format!("SMEMBERS {set:?}"))
+    }
+
+    pub async fn sadd<S, M>(&self, set: S, member: M) -> anyhow::Result<()>
+    where
+        S: ToRedisArgs + Debug + Send + Sync + 'static,
+        M: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("SADD {set:?} {member:?}");
+
+        redis::cmd("SADD")
+            .arg(&set)
+            .arg(&member)
+            .query_async(&mut self.connection.clone())
+            .await
+            .context(format!("SADD {set:?} {member:?}"))
+    }
+
+    pub async fn srem<S, M>(&self, set: S, member: M) -> anyhow::Result<()>
+    where
+        S: ToRedisArgs + Debug + Send + Sync + 'static,
+        M: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("SREM {set:?} {member:?}");
+
+        redis::cmd("SREM")
+            .arg(&set)
+            .arg(&member)
+            .query_async(&mut self.connection.clone())
+            .await
+            .context(format!("SREM {set:?} {member:?}"))
+    }
+
+    pub async fn exists<K>(&self, key: K) -> anyhow::Result<bool>
+    where
+        K: ToRedisArgs + Debug + Send + Sync + 'static,
+    {
+        tracing::debug!("EXISTS {key:?}");
+
+        redis::cmd("EXISTS")
+            .arg(&key)
+            .query_async(&mut self.connection.clone())
+            .await
+            .context(format!("EXISTS {key:?}"))
+    }
+
+    pub async fn indexer_states_set_exists(&self) -> anyhow::Result<bool> {
+        self.exists(Self::INDEXER_STATES_SET).await
     }
 
     pub async fn get_last_published_block(
@@ -90,7 +145,10 @@ impl RedisClientImpl {
     }
 
     pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
-        self.del(indexer_config.get_redis_stream_key()).await
+        let stream_key = indexer_config.get_redis_stream_key();
+        self.del(stream_key.clone())
+            .await
+            .context(format!("Failed to clear Redis Stream: {}", stream_key))
     }
 
     pub async fn get_indexer_state(
@@ -105,15 +163,37 @@ impl RedisClientImpl {
         indexer_config: &IndexerConfig,
         state: String,
     ) -> anyhow::Result<()> {
-        self.set(indexer_config.get_state_key(), state).await
+        self.set(indexer_config.get_state_key(), state).await?;
+
+        self.sadd(Self::INDEXER_STATES_SET, indexer_config.get_state_key())
+            .await
     }
 
-    pub async fn set_migration_complete(&self) -> anyhow::Result<()> {
-        self.set("indexer_manager_migration_complete", true).await
+    pub async fn delete_indexer_state(&self, state: &IndexerState) -> anyhow::Result<()> {
+        self.del(state.get_state_key()).await?;
+
+        self.srem(Self::INDEXER_STATES_SET, state.get_state_key())
+            .await
     }
 
-    pub async fn is_migration_complete(&self) -> anyhow::Result<Option<bool>> {
-        self.get("indexer_manager_migration_complete").await
+    pub async fn list_indexer_states(&self) -> anyhow::Result<Vec<String>> {
+        let mut states = vec![];
+
+        for state_key in self.smembers(Self::INDEXER_STATES_SET).await? {
+            let state = self.get(state_key.clone()).await?;
+
+            if state.is_none() {
+                anyhow::bail!(
+                    "Key: {} from Set: {} does not exist",
+                    state_key,
+                    Self::INDEXER_STATES_SET
+                );
+            }
+
+            states.push(state.unwrap());
+        }
+
+        Ok(states)
     }
 }
 
@@ -130,11 +210,6 @@ mockall::mock! {
             state: String,
         ) -> anyhow::Result<()>;
 
-        pub async fn get_stream_version(
-            &self,
-            indexer_config: &IndexerConfig,
-        ) -> anyhow::Result<Option<u64>>;
-
         pub async fn get_last_published_block(
            &self,
            indexer_config: &IndexerConfig,
@@ -142,10 +217,6 @@ mockall::mock! {
 
         pub async fn clear_block_stream(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()>;
 
-        pub async fn set_migration_complete(&self) -> anyhow::Result<()>;
-
-        pub async fn is_migration_complete(&self) -> anyhow::Result<Option<bool>>;
-
         pub async fn get<T, U>(&self, key: T) -> anyhow::Result<Option<U>>
         where
             T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -155,6 +226,17 @@ mockall::mock! {
         pub async fn set<K, V>(&self, key: K, value: V) -> anyhow::Result<()>
         where
             K: ToRedisArgs + Debug + Send + Sync + 'static,
             V: ToRedisArgs + Debug + Send + Sync + 'static;
+
+        pub async fn indexer_states_set_exists(&self) -> anyhow::Result<bool>;
+
+        pub async fn sadd<S, V>(&self, set: S, value: V) -> anyhow::Result<()>
+        where
+            S: ToRedisArgs + Debug + Send + Sync + 'static,
+            V: ToRedisArgs + Debug + Send + Sync + 'static;
+
+        pub async fn list_indexer_states(&self) -> anyhow::Result<Vec<String>>;
+
+        pub async fn delete_indexer_state(&self, state: &IndexerState) -> anyhow::Result<()>;
     }
 
     impl Clone for RedisClientImpl {
diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs
index 8b09d8b10..64175d58c 100644
--- a/coordinator/src/registry.rs
+++ b/coordinator/src/registry.rs
@@ -14,6 +14,7 @@ use registry_types::AllIndexers;
 use crate::indexer_config::IndexerConfig;
 use crate::utils::exponential_retry;
 
+#[derive(Clone)]
 pub struct IndexerRegistry(pub HashMap<AccountId, HashMap<String, IndexerConfig>>);
 
 impl IndexerRegistry {
@@ -32,6 +33,14 @@ impl IndexerRegistry {
             function_iter: None,
         }
     }
+
+    pub fn get(&self, account_id: &AccountId, function_name: &str) -> Option<&IndexerConfig> {
+        self.0.get(account_id)?.get(function_name)
+    }
+
+    pub fn remove(&mut self, account_id: &AccountId, function_name: &str) -> Option<IndexerConfig> {
+        self.0.get_mut(account_id)?.remove(function_name)
+    }
 }
 
 pub struct IndexerRegistryIter<'a> {
@@ -60,6 +69,12 @@ impl<'a> Iterator for IndexerRegistryIter<'a> {
     }
 }
 
+impl std::ops::DerefMut for IndexerRegistry {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
 impl std::ops::Deref for IndexerRegistry {
     type Target = HashMap<AccountId, HashMap<String, IndexerConfig>>;
     fn deref(&self) -> &Self::Target {
diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs
new file mode 100644
index 000000000..4e47c1f04
--- /dev/null
+++ b/coordinator/src/synchroniser.rs
@@ -0,0 +1,1197 @@
+use registry_types::StartBlock;
+use tracing::instrument;
+
+use crate::{
+    block_streams_handler::{BlockStreamsHandler, StreamInfo},
+    executors_handler::{ExecutorInfo, ExecutorsHandler},
+    indexer_config::IndexerConfig,
+    indexer_state::{IndexerState, IndexerStateManager},
+    redis::RedisClient,
+    registry::Registry,
+};
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
+pub enum SynchronisationState {
+    New(IndexerConfig),
+    Existing(
+        IndexerConfig,
+        IndexerState,
+        Option<ExecutorInfo>,
+        Option<StreamInfo>,
+    ),
+    Deleted(IndexerState, Option<ExecutorInfo>, Option<StreamInfo>),
+}
+
+pub struct Synchroniser<'a> {
+    block_streams_handler: &'a BlockStreamsHandler,
+    executors_handler: &'a ExecutorsHandler,
+    registry: &'a Registry,
+    state_manager: &'a IndexerStateManager,
+    redis_client: &'a RedisClient,
+}
+
+impl<'a> Synchroniser<'a> {
+    pub fn new(
block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, + ) -> Self { + Self { + block_streams_handler, + executors_handler, + registry, + state_manager, + redis_client, + } + } + + async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let start_block = match config.start_block { + StartBlock::Height(height) => height, + StartBlock::Latest => config.get_registry_version(), + StartBlock::Continue => { + tracing::warn!( + "Attempted to start new Block Stream with CONTINUE, using LATEST instead" + ); + config.get_registry_version() + } + }; + + self.block_streams_handler.start(start_block, config).await + } + + #[instrument( + skip_all, + fields( + account_id = config.account_id.to_string(), + function_name = config.function_name, + version = config.get_registry_version() + ) + )] + async fn sync_new_indexer(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if let Err(err) = self.executors_handler.start(config).await { + tracing::error!(?err, "Failed to start Executor"); + return Ok(()); + } + + // FIX if this fails, then subsequent control loops will perpetually fail since the + // above will error with ALREADY_EXISTS + if let Err(err) = self.start_new_block_stream(config).await { + tracing::error!(?err, "Failed to start Block Stream"); + return Ok(()); + } + + self.state_manager.set_synced(config).await?; + + Ok(()) + } + + async fn sync_existing_executor( + &self, + config: &IndexerConfig, + executor: Option<&ExecutorInfo>, + ) -> anyhow::Result<()> { + if let Some(executor) = executor { + if executor.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!("Stopping outdated executor"); + + self.executors_handler + .stop(executor.executor_id.clone()) + .await?; + } + + tracing::info!("Starting executor"); + + self.executors_handler.start(config).await?; + + Ok(()) + } + + async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result { + let height = self + .redis_client + .get_last_published_block(config) + .await? + .map(|height| height + 1) + .unwrap_or_else(|| { + tracing::warn!( + "Failed to get continuation block height, using registry version instead" + ); + + config.get_registry_version() + }); + + Ok(height) + } + + async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if matches!( + config.start_block, + StartBlock::Latest | StartBlock::Height(..) 
+ ) { + self.redis_client.clear_block_stream(config).await?; + } + + let height = match config.start_block { + StartBlock::Latest => config.get_registry_version(), + StartBlock::Height(height) => height, + StartBlock::Continue => self.get_continuation_block_height(config).await?, + }; + + tracing::info!(height, "Starting block stream"); + + self.block_streams_handler.start(height, config).await?; + + Ok(()) + } + + async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = self.get_continuation_block_height(config).await?; + + tracing::info!(height, "Resuming block stream"); + + self.block_streams_handler.start(height, config).await?; + + Ok(()) + } + + async fn sync_existing_block_stream( + &self, + config: &IndexerConfig, + state: &IndexerState, + block_stream: Option<&StreamInfo>, + ) -> anyhow::Result<()> { + if let Some(block_stream) = block_stream { + if block_stream.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + previous_version = block_stream.version, + "Stopping outdated block stream" + ); + + self.block_streams_handler + .stop(block_stream.stream_id.clone()) + .await?; + + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + if state.block_stream_synced_at.is_none() { + // NOTE: A value of `None` would suggest that `state` was created before initialisation, + // which is currently not possible, but may be in future + tracing::warn!("Existing block stream has no previous sync state, treating as new"); + + self.start_new_block_stream(config).await?; + + return Ok(()); + } + + if state.block_stream_synced_at.unwrap() != config.get_registry_version() { + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + self.resume_block_stream(config).await?; + + Ok(()) + } + + #[instrument( + skip_all, + fields( + account_id = config.account_id.to_string(), + function_name = config.function_name, + version = config.get_registry_version() + ) + )] + async fn sync_existing_indexer( + &self, + config: &IndexerConfig, + state: &IndexerState, + executor: Option<&ExecutorInfo>, + block_stream: Option<&StreamInfo>, + ) -> anyhow::Result<()> { + if !state.enabled { + if let Some(executor) = executor { + self.executors_handler + .stop(executor.executor_id.clone()) + .await?; + } + + if let Some(block_stream) = block_stream { + self.block_streams_handler + .stop(block_stream.stream_id.clone()) + .await?; + } + + return Ok(()); + } + + if let Err(error) = self.sync_existing_executor(config, executor).await { + tracing::error!(?error, "Failed to sync executor"); + return Ok(()); + } + + if let Err(error) = self + .sync_existing_block_stream(config, state, block_stream) + .await + { + tracing::error!(?error, "Failed to sync block stream"); + return Ok(()); + } + + self.state_manager.set_synced(config).await?; + + Ok(()) + } + + #[instrument( + skip_all, + fields( + account_id = state.account_id.to_string(), + function_name = state.function_name + ) + )] + async fn sync_deleted_indexer( + &self, + state: &IndexerState, + executor: Option<&ExecutorInfo>, + block_stream: Option<&StreamInfo>, + ) -> anyhow::Result<()> { + if let Some(executor) = executor { + tracing::info!("Stopping executor"); + + self.executors_handler + .stop(executor.executor_id.clone()) + .await?; + } + + if let Some(block_stream) = block_stream { + tracing::info!("Stopping block stream"); + + self.block_streams_handler + .stop(block_stream.stream_id.clone()) + .await?; + } + + self.state_manager.delete_state(state).await?; 
+ + Ok(()) + } + + async fn generate_synchronisation_states(&self) -> anyhow::Result> { + let states = self.state_manager.list().await?; + let executors = self.executors_handler.list().await?; + let block_streams = self.block_streams_handler.list().await?; + let mut registry = self.registry.fetch().await?; + + let mut sync_states = vec![]; + + for state in states { + let config = registry.remove(&state.account_id, &state.function_name); + let executor = executors.iter().find(|executor| { + executor.account_id == state.account_id + && executor.function_name == state.function_name + }); + let block_stream = block_streams.iter().find(|block_stream| { + block_stream.account_id == state.account_id + && block_stream.function_name == state.function_name + }); + + if let Some(config) = config { + sync_states.push(SynchronisationState::Existing( + config, + state, + executor.cloned(), + block_stream.cloned(), + )) + } else { + sync_states.push(SynchronisationState::Deleted( + state, + executor.cloned(), + block_stream.cloned(), + )) + } + } + + for config in registry.iter() { + sync_states.push(SynchronisationState::New(config.clone())); + } + + Ok(sync_states) + } + + pub async fn sync(&self) -> anyhow::Result<()> { + let sync_states = self.generate_synchronisation_states().await?; + + for sync_state in sync_states { + match sync_state { + SynchronisationState::New(config) => { + self.sync_new_indexer(&config).await?; + } + SynchronisationState::Existing(config, state, executor, block_stream) => { + self.sync_existing_indexer( + &config, + &state, + executor.as_ref(), + block_stream.as_ref(), + ) + .await?; + } + SynchronisationState::Deleted(state, executor, block_stream) => { + self.sync_deleted_indexer(&state, executor.as_ref(), block_stream.as_ref()) + .await?; + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use mockall::predicate::*; + use std::collections::HashMap; + + use crate::registry::IndexerRegistry; + + #[tokio::test] + async fn generates_sync_states() { + let existing_account_ids = vec![ + "account1.near".to_string(), + "account2.near".to_string(), + "account3.near".to_string(), + "account4.near".to_string(), + ]; + let new_account_ids = vec![ + "new_account1.near".to_string(), + "new_account2.near".to_string(), + ]; + let deleted_account_ids = vec![ + "deleted_account1.near".to_string(), + "deleted_account2.near".to_string(), + ]; + + let mut existing_indexer_configs: Vec = Vec::new(); + for (i, account_id) in existing_account_ids.iter().enumerate() { + for j in 1..=5 { + existing_indexer_configs.push(IndexerConfig { + account_id: account_id.parse().unwrap(), + function_name: format!("existing_indexer{}_{}", i + 1, j), + ..Default::default() + }); + } + } + + let mut new_indexer_configs: Vec = Vec::new(); + for (i, account_id) in new_account_ids.iter().enumerate() { + for j in 1..=3 { + new_indexer_configs.push(IndexerConfig { + account_id: account_id.parse().unwrap(), + function_name: format!("new_indexer{}_{}", i + 1, j), + ..Default::default() + }); + } + } + + let mut deleted_indexer_configs: Vec = Vec::new(); + for (i, account_id) in deleted_account_ids.iter().enumerate() { + for j in 1..=2 { + deleted_indexer_configs.push(IndexerConfig { + account_id: account_id.parse().unwrap(), + function_name: format!("deleted_indexer{}_{}", i + 1, j), + ..Default::default() + }); + } + } + + let mut indexer_registry = IndexerRegistry::new(); + for indexer in existing_indexer_configs + .iter() + .chain(new_indexer_configs.iter()) + { + indexer_registry + 
.entry(indexer.account_id.clone()) + .or_default() + .insert(indexer.function_name.clone(), indexer.clone()); + } + + let mut block_streams_handler = BlockStreamsHandler::default(); + let block_streams: Vec = existing_indexer_configs + .iter() + // generate some "randomness" + .rev() + .enumerate() + .map(|(i, indexer)| StreamInfo { + stream_id: format!("stream_id{}", i + 1), + account_id: indexer.account_id.to_string(), + function_name: indexer.function_name.clone(), + version: indexer.get_registry_version(), + }) + .collect(); + block_streams_handler + .expect_list() + .returning(move || Ok(block_streams.clone())); + + let mut executors_handler = ExecutorsHandler::default(); + let executors: Vec = existing_indexer_configs + .iter() + // generate some "randomness" + .rev() + .enumerate() + .map(|(i, indexer)| ExecutorInfo { + executor_id: format!("executor_id{}", i + 1), + account_id: indexer.account_id.to_string(), + function_name: indexer.function_name.clone(), + version: indexer.get_registry_version(), + status: "running".to_string(), + }) + .collect(); + + executors_handler + .expect_list() + .returning(move || Ok(executors.clone())); + + let mut registry = Registry::default(); + registry + .expect_fetch() + .returning(move || Ok(indexer_registry.clone())); + + let mut state_manager = IndexerStateManager::default(); + let states: Vec = existing_indexer_configs + .iter() + .map(|indexer| IndexerState { + account_id: indexer.account_id.clone(), + function_name: indexer.function_name.clone(), + block_stream_synced_at: Some(indexer.get_registry_version()), + enabled: true, + }) + .chain(deleted_indexer_configs.iter().map(|indexer| IndexerState { + account_id: indexer.account_id.clone(), + function_name: indexer.function_name.clone(), + block_stream_synced_at: Some(indexer.get_registry_version()), + enabled: true, + })) + .collect(); + state_manager + .expect_list() + .returning(move || Ok(states.clone())); + + let redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + let synchronisation_states = synchroniser + .generate_synchronisation_states() + .await + .unwrap(); + + let mut new_count = 0; + let mut existing_count = 0; + let mut deleted_count = 0; + + for state in &synchronisation_states { + match state { + SynchronisationState::New(_) => new_count += 1, + SynchronisationState::Existing(_, _, executor, block_stream) => { + assert!(executor.is_some(), "Executor should exist for the indexer"); + assert!( + block_stream.is_some(), + "Block stream should exist for the indexer" + ); + existing_count += 1; + } + SynchronisationState::Deleted(_, _, _) => { + deleted_count += 1; + } + } + } + + assert_eq!(new_count, 6); + assert_eq!(existing_count, 20); + assert_eq!(deleted_count, 4); + } + + mod new { + use super::*; + + #[tokio::test] + async fn start() { + let config1 = IndexerConfig::default(); + let config2 = IndexerConfig { + function_name: "test2".to_string(), + start_block: StartBlock::Latest, + ..Default::default() + }; + + let indexer_registry = IndexerRegistry::from(&[( + config1.account_id.clone(), + HashMap::from([ + (config1.function_name.clone(), config1.clone()), + (config2.function_name.clone(), config2.clone()), + ]), + )]); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler.expect_list().returning(|| Ok(vec![])); + block_streams_handler + .expect_start() + .with(eq(100), eq(config1.clone())) + .returning(|_, 
_| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with(eq(config2.get_registry_version()), eq(config2.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_list().returning(|| Ok(vec![])); + executors_handler + .expect_start() + .with(eq(config1.clone())) + .returning(|_| Ok(())) + .once(); + executors_handler + .expect_start() + .with(eq(config2.clone())) + .returning(|_| Ok(())) + .once(); + + let mut registry = Registry::default(); + registry + .expect_fetch() + .returning(move || Ok(indexer_registry.clone())); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_list().returning(|| Ok(vec![])); + state_manager + .expect_set_synced() + .with(eq(config1)) + .returning(|_| Ok(())) + .once(); + state_manager + .expect_set_synced() + .with(eq(config2)) + .returning(|_| Ok(())) + .once(); + + let redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser.sync().await.unwrap(); + } + + #[tokio::test] + async fn configures_block_stream() { + let config_with_latest = IndexerConfig { + start_block: StartBlock::Latest, + ..IndexerConfig::default() + }; + let height = 5; + let config_with_height = IndexerConfig { + start_block: StartBlock::Height(height), + ..IndexerConfig::default() + }; + let config_with_continue = IndexerConfig { + start_block: StartBlock::Continue, + ..IndexerConfig::default() + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with( + eq(config_with_continue.get_registry_version()), + eq(config_with_continue.clone()), + ) + .returning(|_, _| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with( + eq(config_with_latest.get_registry_version()), + eq(config_with_latest.clone()), + ) + .returning(|_, _| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with(eq(height), eq(config_with_height.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_synced() + .with(eq(config_with_continue.clone())) + .returning(|_| Ok(())) + .once(); + state_manager + .expect_set_synced() + .with(eq(config_with_latest.clone())) + .returning(|_| Ok(())) + .once(); + state_manager + .expect_set_synced() + .with(eq(config_with_height.clone())) + .returning(|_| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_start() + .returning(|_| Ok(())) + .times(3); + + let redis_client = RedisClient::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_new_indexer(&config_with_latest) + .await + .unwrap(); + synchroniser + .sync_new_indexer(&config_with_height) + .await + .unwrap(); + synchroniser + .sync_new_indexer(&config_with_continue) + .await + .unwrap(); + } + } + + mod existing { + use super::*; + + #[tokio::test] + async fn ignores_synced() { + let config = IndexerConfig::default(); + + let indexer_registry = IndexerRegistry::from(&[( + config.account_id.clone(), + HashMap::from([(config.function_name.clone(), config.clone())]), + )]); + + let mut block_streams_handler = BlockStreamsHandler::default(); + let config_clone = config.clone(); + 
block_streams_handler.expect_list().returning(move || { + Ok(vec![StreamInfo { + stream_id: config_clone.get_redis_stream_key(), + account_id: config_clone.account_id.to_string(), + function_name: config_clone.function_name.clone(), + version: config_clone.get_registry_version(), + }]) + }); + block_streams_handler.expect_stop().never(); + block_streams_handler.expect_start().never(); + + let mut executors_handler = ExecutorsHandler::default(); + let config_clone = config.clone(); + executors_handler.expect_list().returning(move || { + Ok(vec![ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: config_clone.account_id.to_string(), + function_name: config_clone.function_name.clone(), + version: config_clone.get_registry_version(), + status: "running".to_string(), + }]) + }); + executors_handler.expect_stop().never(); + executors_handler.expect_start().never(); + + let mut registry = Registry::default(); + registry + .expect_fetch() + .returning(move || Ok(indexer_registry.clone())); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_synced() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + state_manager.expect_list().returning(move || { + Ok(vec![IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: true, + }]) + }); + + let redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser.sync().await.unwrap(); + } + + #[tokio::test] + async fn restarts_outdated() { + let config = IndexerConfig::default(); + + let indexer_registry = IndexerRegistry::from(&[( + config.account_id.clone(), + HashMap::from([(config.function_name.clone(), config.clone())]), + )]); + + let mut block_streams_handler = BlockStreamsHandler::default(); + let config_clone = config.clone(); + block_streams_handler.expect_list().returning(move || { + Ok(vec![StreamInfo { + stream_id: "stream_id".to_string(), + account_id: config_clone.account_id.to_string(), + function_name: config_clone.function_name.clone(), + version: config_clone.get_registry_version() + 1, + }]) + }); + block_streams_handler + .expect_stop() + .with(eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with(eq(100), eq(config.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + let config_clone = config.clone(); + executors_handler.expect_list().returning(move || { + Ok(vec![ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: config_clone.account_id.to_string(), + function_name: config_clone.function_name.clone(), + version: config_clone.get_registry_version() + 1, + status: "running".to_string(), + }]) + }); + executors_handler + .expect_stop() + .with(eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + executors_handler + .expect_start() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut registry = Registry::default(); + registry + .expect_fetch() + .returning(move || Ok(indexer_registry.clone())); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_clear_block_stream() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut state_manager = IndexerStateManager::default(); + state_manager + 
.expect_set_synced() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + state_manager.expect_list().returning(move || { + Ok(vec![IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: true, + }]) + }); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser.sync().await.unwrap(); + } + + #[tokio::test] + async fn treats_unsynced_blocks_streams_as_new() { + let config = IndexerConfig::default(); + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: None, + enabled: true, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with(eq(100), eq(config.clone())) + .returning(|_, _| Ok(())) + .once(); + + let redis_client = RedisClient::default(); + let state_manager = IndexerStateManager::default(); + let executors_handler = ExecutorsHandler::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_existing_block_stream(&config, &state, None) + .await + .unwrap(); + } + + #[tokio::test] + async fn restarts_stopped_and_outdated_block_stream() { + let config = IndexerConfig::default(); + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version() - 1), + enabled: true, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with(eq(100), eq(config.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_clear_block_stream() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + + let state_manager = IndexerStateManager::default(); + let executors_handler = ExecutorsHandler::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_existing_block_stream(&config, &state, None) + .await + .unwrap(); + } + + #[tokio::test] + async fn resumes_stopped_and_synced_block_stream() { + let config = IndexerConfig::default(); + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: true, + }; + + let last_published_block = 1; + + let mut redis_client = RedisClient::default(); + redis_client.expect_clear_block_stream().never(); + redis_client + .expect_get_last_published_block() + .with(eq(config.clone())) + .returning(move |_| Ok(Some(last_published_block))); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with(eq(last_published_block + 1), eq(config.clone())) + .returning(|_, _| Ok(())) + .once(); + + let state_manager = IndexerStateManager::default(); + let executors_handler = ExecutorsHandler::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + 
&redis_client, + ); + + synchroniser + .sync_existing_block_stream(&config, &state, None) + .await + .unwrap(); + } + + #[tokio::test] + async fn reconfigures_block_stream() { + let config_with_latest = IndexerConfig { + start_block: StartBlock::Latest, + ..IndexerConfig::default() + }; + let height = 5; + let config_with_height = IndexerConfig { + start_block: StartBlock::Height(height), + ..IndexerConfig::default() + }; + let last_published_block = 1; + let config_with_continue = IndexerConfig { + start_block: StartBlock::Continue, + ..IndexerConfig::default() + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_start() + .with( + eq(last_published_block + 1), + eq(config_with_continue.clone()), + ) + .returning(|_, _| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with( + eq(config_with_latest.get_registry_version()), + eq(config_with_latest.clone()), + ) + .returning(|_, _| Ok(())) + .once(); + block_streams_handler + .expect_start() + .with(eq(height), eq(config_with_height.clone())) + .returning(|_, _| Ok(())) + .once(); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_clear_block_stream() + .with(eq(config_with_latest.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_clear_block_stream() + .with(eq(config_with_height.clone())) + .returning(|_| Ok(())) + .once(); + redis_client + .expect_get_last_published_block() + .with(eq(config_with_continue.clone())) + .returning(move |_| Ok(Some(last_published_block))); + + let state_manager = IndexerStateManager::default(); + let executors_handler = ExecutorsHandler::default(); + let registry = Registry::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .reconfigure_block_stream(&config_with_latest) + .await + .unwrap(); + synchroniser + .reconfigure_block_stream(&config_with_height) + .await + .unwrap(); + synchroniser + .reconfigure_block_stream(&config_with_continue) + .await + .unwrap(); + } + + #[tokio::test] + async fn stops_disabled_indexers() { + let config = IndexerConfig::default(); + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: false, + }; + let executor = ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: config.account_id.to_string(), + function_name: config.function_name.clone(), + version: config.get_registry_version(), + status: "running".to_string(), + }; + let block_stream = StreamInfo { + stream_id: "stream_id".to_string(), + account_id: config.account_id.to_string(), + function_name: config.function_name.clone(), + version: config.get_registry_version(), + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_stop() + .with(eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_stop() + .with(eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_synced() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .never(); + + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + 
&executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_existing_indexer(&config, &state, Some(&executor), Some(&block_stream)) + .await + .unwrap(); + // Simulate second run, start/stop etc should not be called + synchroniser + .sync_existing_indexer(&config, &state, None, None) + .await + .unwrap(); + } + } + + mod deleted { + use super::*; + + #[tokio::test] + async fn stops_and_deletes() { + let config = IndexerConfig::default(); + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: false, + }; + let executor = ExecutorInfo { + executor_id: "executor_id".to_string(), + account_id: config.account_id.to_string(), + function_name: config.function_name.clone(), + version: config.get_registry_version(), + status: "running".to_string(), + }; + let block_stream = StreamInfo { + stream_id: "stream_id".to_string(), + account_id: config.account_id.to_string(), + function_name: config.function_name.clone(), + version: config.get_registry_version(), + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_stop() + .with(eq("stream_id".to_string())) + .returning(|_| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_stop() + .with(eq("executor_id".to_string())) + .returning(|_| Ok(())) + .once(); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_delete_state() + .with(eq(state.clone())) + .returning(|_| Ok(())) + .once(); + + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream)) + .await + .unwrap(); + } + } +}
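
For orientation, what follows is a minimal sketch of how the new `Synchroniser` might be driven from the coordinator's control loop. Only `Synchroniser::new` and `sync` come from `coordinator/src/synchroniser.rs` above; the `run_sync_loop` name, its borrowed-dependency signature, and the 10-second interval are illustrative assumptions rather than part of this change.

// Illustrative sketch only -- not part of the diff above. Assumes the handlers,
// registry, state manager, and Redis client are constructed elsewhere (e.g. in
// coordinator/src/main.rs) and passed in by reference.
use std::time::Duration;

use crate::block_streams_handler::BlockStreamsHandler;
use crate::executors_handler::ExecutorsHandler;
use crate::indexer_state::IndexerStateManager;
use crate::redis::RedisClient;
use crate::registry::Registry;
use crate::synchroniser::Synchroniser;

pub async fn run_sync_loop(
    block_streams_handler: &BlockStreamsHandler,
    executors_handler: &ExecutorsHandler,
    registry: &Registry,
    state_manager: &IndexerStateManager,
    redis_client: &RedisClient,
) {
    let synchroniser = Synchroniser::new(
        block_streams_handler,
        executors_handler,
        registry,
        state_manager,
        redis_client,
    );

    loop {
        // Each pass classifies every indexer as New/Existing/Deleted and
        // reconciles its executor and block stream against the registry.
        if let Err(err) = synchroniser.sync().await {
            tracing::error!(?err, "Failed to synchronise indexers");
        }

        // Assumed cadence; the real coordinator may poll on a different interval.
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}

Because `sync()` recomputes the full set of synchronisation states on every call, a pass that fails or is only partially applied is simply retried on the next tick rather than tracked per indexer.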