From 973efd6c0a26685ac533af1e84d3b369a64dfd3e Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 20 Jun 2024 08:35:21 +1200 Subject: [PATCH] refactor: Move Data Layer provisioning from Runner to Coordinator (#805) This PR updates Coordinator to handle Data Layer provisioning, removing the implicit step from Runner. Provisioning itself is still completed within Runner, but Coordinator will trigger and monitor it. Functionally, provisioning is the same, but there are some subtle differences around how it is handled: - Provisioning will now happen as soon as the Indexer is registered, rather than when the first matched block is executed. - Block Streams/Executors will not be started until provisioning is successful; neither will start while provisioning is pending or failed. A `provisioned_state` enum has been added to the Indexer state within Redis. This is used to persist what stage of provisioning the Data Layer is at, as well as ensuring we only provision once. ## Concerns with current implementation Overall, I'm not happy with the implementation here, but feel it is the best we can do given the current structure of Coordinator. So as not to block this feature, I decided to go forward with this approach, and will create a ticket to refactor/update later. Provisioning is triggered within the "new" handler, and then polled within the "existing" handler, which seems a little awkward. The separated handling is necessary since no operation within the control loop (i.e. `Synchroniser`) should block, as that would block synchronisation for all other Indexers. So we need to trigger the provisioning initially, and then poll the completion each subsequent control loop. I feel we have outgrown the current control loop, and am planning to refactor later. Rather than have a single control loop for _all_ Indexers, I'm thinking we can have dedicated loops for each of them. We could spawn a new task for each Indexer, which then manages its own lifecycle. 
Then each Indexer is free to wait for as long as it wants, without impacting other Indexers. This would allow us to handle the blocking provisioning step much more elegantly. --- .../block_streams.rs} | 0 coordinator/src/handlers/data_layer.rs | 96 ++++++ .../executors.rs} | 0 coordinator/src/handlers/mod.rs | 3 + coordinator/src/indexer_state.rs | 188 ++++------- coordinator/src/main.rs | 13 +- coordinator/src/synchroniser.rs | 312 +++++++++++------- runner-client/build.rs | 1 + .../check_provisioning_task_status.rs | 20 ++ .../examples/start_provisioning_task.rs | 21 ++ runner-client/proto/data-layer.proto | 33 ++ runner-client/src/lib.rs | 4 + runner/protos/data-layer.proto | 4 +- .../__snapshots__/indexer.test.ts.snap | 2 - runner/src/indexer/indexer.test.ts | 137 -------- runner/src/indexer/indexer.ts | 15 - .../data-layer/data-layer-service.test.ts | 18 +- .../services/data-layer/data-layer-service.ts | 10 +- runner/tests/integration.test.ts | 6 +- 19 files changed, 461 insertions(+), 422 deletions(-) rename coordinator/src/{block_streams_handler.rs => handlers/block_streams.rs} (100%) create mode 100644 coordinator/src/handlers/data_layer.rs rename coordinator/src/{executors_handler.rs => handlers/executors.rs} (100%) create mode 100644 coordinator/src/handlers/mod.rs create mode 100644 runner-client/examples/check_provisioning_task_status.rs create mode 100644 runner-client/examples/start_provisioning_task.rs create mode 100644 runner-client/proto/data-layer.proto diff --git a/coordinator/src/block_streams_handler.rs b/coordinator/src/handlers/block_streams.rs similarity index 100% rename from coordinator/src/block_streams_handler.rs rename to coordinator/src/handlers/block_streams.rs diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs new file mode 100644 index 000000000..7db624648 --- /dev/null +++ b/coordinator/src/handlers/data_layer.rs @@ -0,0 +1,96 @@ +#![cfg_attr(test, allow(dead_code))] + +pub use 
runner::data_layer::ProvisioningStatus; + +use anyhow::Context; +use runner::data_layer::data_layer_client::DataLayerClient; +use runner::data_layer::{CheckProvisioningTaskStatusRequest, ProvisionRequest}; +use tonic::transport::channel::Channel; +use tonic::{Request, Status}; + +use crate::indexer_config::IndexerConfig; + +#[cfg(not(test))] +pub use DataLayerHandlerImpl as DataLayerHandler; +#[cfg(test)] +pub use MockDataLayerHandlerImpl as DataLayerHandler; + +pub struct DataLayerHandlerImpl { + client: DataLayerClient, +} + +#[cfg_attr(test, mockall::automock)] +impl DataLayerHandlerImpl { + pub fn from_env() -> anyhow::Result { + let runner_url = std::env::var("RUNNER_URL").context("RUNNER_URL is not set")?; + Self::connect(&runner_url) + } + + pub fn connect(runner_url: &str) -> anyhow::Result { + let channel = Channel::from_shared(runner_url.to_string()) + .context("Runner URL is invalid")? + .connect_lazy(); + let client = DataLayerClient::new(channel); + + Ok(Self { client }) + } + + pub async fn start_provisioning_task( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result { + let request = ProvisionRequest { + account_id: indexer_config.account_id.to_string(), + function_name: indexer_config.function_name.clone(), + schema: indexer_config.schema.clone(), + }; + + let response = self + .client + .clone() + .start_provisioning_task(Request::new(request)) + .await; + + if let Err(error) = response { + if error.code() == tonic::Code::AlreadyExists { + return Ok(ProvisioningStatus::Pending); + } + + return Err(error.into()); + } + + let status = match response.unwrap().into_inner().status { + 1 => ProvisioningStatus::Pending, + 2 => ProvisioningStatus::Complete, + 3 => ProvisioningStatus::Failed, + _ => ProvisioningStatus::Unspecified, + }; + + Ok(status) + } + + pub async fn check_provisioning_task_status( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result { + let request = CheckProvisioningTaskStatusRequest { + account_id: 
indexer_config.account_id.to_string(), + function_name: indexer_config.function_name.clone(), + }; + + let response = self + .client + .clone() + .check_provisioning_task_status(Request::new(request)) + .await?; + + let status = match response.into_inner().status { + 1 => ProvisioningStatus::Pending, + 2 => ProvisioningStatus::Complete, + 3 => ProvisioningStatus::Failed, + _ => ProvisioningStatus::Unspecified, + }; + + Ok(status) + } +} diff --git a/coordinator/src/executors_handler.rs b/coordinator/src/handlers/executors.rs similarity index 100% rename from coordinator/src/executors_handler.rs rename to coordinator/src/handlers/executors.rs diff --git a/coordinator/src/handlers/mod.rs b/coordinator/src/handlers/mod.rs new file mode 100644 index 000000000..ecabefea7 --- /dev/null +++ b/coordinator/src/handlers/mod.rs @@ -0,0 +1,3 @@ +pub mod block_streams; +pub mod data_layer; +pub mod executors; diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 431fb3345..ce100354c 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -2,14 +2,22 @@ use anyhow::Context; use near_primitives::types::AccountId; -use std::str::FromStr; use crate::indexer_config::IndexerConfig; use crate::redis::RedisClient; -use crate::registry::IndexerRegistry; -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum ProvisionedState { + Unprovisioned, + Provisioning, + Provisioned, + Failed, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct OldIndexerState { + pub account_id: AccountId, + pub function_name: String, pub block_stream_synced_at: Option, pub enabled: bool, } @@ -20,6 +28,7 @@ pub struct IndexerState { pub function_name: String, pub block_stream_synced_at: Option, pub enabled: bool, + pub provisioned_state: ProvisionedState, } impl IndexerState { @@ -31,17 +40,6 @@ impl IndexerState 
{ } } -impl Default for IndexerState { - fn default() -> Self { - Self { - account_id: AccountId::from_str("morgs.near").unwrap(), - function_name: String::new(), - block_stream_synced_at: None, - enabled: true, - } - } -} - #[cfg(not(test))] pub use IndexerStateManagerImpl as IndexerStateManager; #[cfg(test)] @@ -57,41 +55,36 @@ impl IndexerStateManagerImpl { Self { redis_client } } - pub async fn migrate(&self, registry: &IndexerRegistry) -> anyhow::Result<()> { - if self.redis_client.indexer_states_set_exists().await? { - return Ok(()); - } + pub async fn migrate(&self) -> anyhow::Result<()> { + let raw_states = self.redis_client.list_indexer_states().await?; + + for raw_state in raw_states { + if let Ok(state) = serde_json::from_str::(&raw_state) { + tracing::info!( + "{}/{} already migrated, skipping", + state.account_id, + state.function_name + ); + continue; + } + + tracing::info!("Migrating {}", raw_state); - tracing::info!("Migrating indexer state"); - - for config in registry.iter() { - let raw_state = self.redis_client.get_indexer_state(config).await?; - - let state = if let Some(raw_state) = raw_state { - let old_state: OldIndexerState = - serde_json::from_str(&raw_state).context(format!( - "Failed to deserialize OldIndexerState for {}", - config.get_full_name() - ))?; - - IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: old_state.block_stream_synced_at, - enabled: old_state.enabled, - } - } else { - self.get_default_state(config) + let old_state: IndexerState = serde_json::from_str(&raw_state)?; + + let state = IndexerState { + account_id: old_state.account_id, + function_name: old_state.function_name, + block_stream_synced_at: old_state.block_stream_synced_at, + enabled: old_state.enabled, + provisioned_state: ProvisionedState::Provisioned, }; - self.set_state(config, state).await.context(format!( - "Failed to set state for {}", - config.get_full_name() - ))?; + 
self.redis_client + .set(state.get_state_key(), serde_json::to_string(&state)?) + .await?; } - tracing::info!("Migration complete"); - Ok(()) } @@ -101,6 +94,7 @@ impl IndexerStateManagerImpl { function_name: indexer_config.function_name.clone(), block_stream_synced_at: None, enabled: true, + provisioned_state: ProvisionedState::Unprovisioned, } } @@ -140,6 +134,38 @@ impl IndexerStateManagerImpl { Ok(()) } + pub async fn set_provisioning(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + let mut indexer_state = self.get_state(indexer_config).await?; + + indexer_state.provisioned_state = ProvisionedState::Provisioning; + + self.set_state(indexer_config, indexer_state).await?; + + Ok(()) + } + + pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + let mut indexer_state = self.get_state(indexer_config).await?; + + indexer_state.provisioned_state = ProvisionedState::Provisioned; + + self.set_state(indexer_config, indexer_state).await?; + + Ok(()) + } + pub async fn set_provisioning_failure( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result<()> { + let mut indexer_state = self.get_state(indexer_config).await?; + + indexer_state.provisioned_state = ProvisionedState::Failed; + + self.set_state(indexer_config, indexer_state).await?; + + Ok(()) + } + pub async fn set_enabled( &self, indexer_config: &IndexerConfig, @@ -173,8 +199,6 @@ impl IndexerStateManagerImpl { mod tests { use super::*; - use std::collections::HashMap; - use mockall::predicate; use registry_types::{Rule, StartBlock, Status}; @@ -183,7 +207,7 @@ mod tests { let mut mock_redis_client = RedisClient::default(); mock_redis_client .expect_list_indexer_states() - .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true }).to_string()])) + .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 
200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()])) .once(); mock_redis_client .expect_list_indexer_states() @@ -196,72 +220,6 @@ mod tests { assert!(indexer_manager.list().await.is_err()); } - #[tokio::test] - async fn migrate() { - let config1 = IndexerConfig::default(); - let config2 = IndexerConfig { - account_id: "darunrs.near".parse().unwrap(), - function_name: "test".to_string(), - ..Default::default() - }; - - let registry = IndexerRegistry::from(&[ - ( - "morgs.near".parse().unwrap(), - HashMap::from([("test".to_string(), config1.clone())]), - ), - ( - "darunrs.near".parse().unwrap(), - HashMap::from([("test".to_string(), config2.clone())]), - ), - ]); - - let mut mock_redis_client = RedisClient::default(); - mock_redis_client - .expect_indexer_states_set_exists() - .returning(|| Ok(false)) - .once(); - mock_redis_client - .expect_get_indexer_state() - .with(predicate::eq(config1.clone())) - .returning(|_| { - Ok(Some( - serde_json::json!({ "block_stream_synced_at": 200, "enabled": false }) - .to_string(), - )) - }) - .once(); - mock_redis_client - .expect_get_indexer_state() - .with(predicate::eq(config2.clone())) - .returning(|_| Ok(None)) - .once(); - mock_redis_client - .expect_set_indexer_state() - .with( - predicate::eq(config1), - predicate::eq( - "{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":200,\"enabled\":false}".to_string(), - ), - ) - .returning(|_, _| Ok(())) - .once(); - mock_redis_client - .expect_set_indexer_state() - .with( - predicate::eq(config2), - predicate::eq( - "{\"account_id\":\"darunrs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":null,\"enabled\":true}".to_string() - ), - ) - .returning(|_, _| Ok(())) - .once(); - - let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client); - - indexer_manager.migrate(®istry).await.unwrap(); - } - #[tokio::test] pub async fn disable_indexer() { let indexer_config = IndexerConfig { @@ -284,7 +242,7 @@ mod 
tests { .with(predicate::eq(indexer_config.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true }) + serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" }) .to_string(), )) }); @@ -292,7 +250,7 @@ mod tests { .expect_set_indexer_state() .with( predicate::always(), - predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false}".to_string()), + predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()), ) .returning(|_, _| Ok(())) .once(); diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 676d04196..a437e4187 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -4,15 +4,15 @@ use std::time::Duration; use near_primitives::types::AccountId; use tracing_subscriber::prelude::*; -use crate::block_streams_handler::BlockStreamsHandler; -use crate::executors_handler::ExecutorsHandler; +use crate::handlers::block_streams::BlockStreamsHandler; +use crate::handlers::data_layer::DataLayerHandler; +use crate::handlers::executors::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; use crate::redis::RedisClient; use crate::registry::Registry; use crate::synchroniser::Synchroniser; -mod block_streams_handler; -mod executors_handler; +mod handlers; mod indexer_config; mod indexer_state; mod redis; @@ -60,10 +60,12 @@ async fn main() -> anyhow::Result<()> { let redis_client = RedisClient::connect(&redis_url).await?; let block_streams_handler = BlockStreamsHandler::connect(&block_streamer_url)?; let executors_handler = ExecutorsHandler::connect(&runner_url)?; + let data_layer_handler = DataLayerHandler::connect(&runner_url)?; let indexer_state_manager = 
Arc::new(IndexerStateManager::new(redis_client.clone())); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &indexer_state_manager, &redis_client, @@ -75,8 +77,7 @@ async fn main() -> anyhow::Result<()> { async move { server::init(grpc_port, indexer_state_manager, registry).await } }); - let indexer_registry = registry.fetch().await?; - indexer_state_manager.migrate(&indexer_registry).await?; + indexer_state_manager.migrate().await?; loop { tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?; diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index b18185edd..ec200e10c 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -2,10 +2,13 @@ use registry_types::StartBlock; use tracing::instrument; use crate::{ - block_streams_handler::{BlockStreamsHandler, StreamInfo}, - executors_handler::{ExecutorInfo, ExecutorsHandler}, + handlers::{ + block_streams::{BlockStreamsHandler, StreamInfo}, + data_layer::{DataLayerHandler, ProvisioningStatus}, + executors::{ExecutorInfo, ExecutorsHandler}, + }, indexer_config::IndexerConfig, - indexer_state::{IndexerState, IndexerStateManager}, + indexer_state::{IndexerState, IndexerStateManager, ProvisionedState}, redis::RedisClient, registry::Registry, }; @@ -26,6 +29,7 @@ pub enum SynchronisationState { pub struct Synchroniser<'a> { block_streams_handler: &'a BlockStreamsHandler, executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, registry: &'a Registry, state_manager: &'a IndexerStateManager, redis_client: &'a RedisClient, @@ -35,6 +39,7 @@ impl<'a> Synchroniser<'a> { pub fn new( block_streams_handler: &'a BlockStreamsHandler, executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, registry: &'a Registry, state_manager: &'a IndexerStateManager, redis_client: &'a RedisClient, @@ -42,6 +47,7 @@ impl<'a> Synchroniser<'a> { Self { 
block_streams_handler, executors_handler, + data_layer_handler, registry, state_manager, redis_client, @@ -49,7 +55,7 @@ impl<'a> Synchroniser<'a> { } async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { - let start_block = match config.start_block { + let height = match config.start_block { StartBlock::Height(height) => height, StartBlock::Latest => config.get_registry_version(), StartBlock::Continue => { @@ -60,7 +66,9 @@ impl<'a> Synchroniser<'a> { } }; - self.block_streams_handler.start(start_block, config).await + tracing::info!(height, "Starting block stream"); + + self.block_streams_handler.start(height, config).await } #[instrument( @@ -72,23 +80,11 @@ impl<'a> Synchroniser<'a> { ) )] async fn sync_new_indexer(&self, config: &IndexerConfig) -> anyhow::Result<()> { - tracing::info!("Starting executor"); - - if let Err(err) = self.executors_handler.start(config).await { - tracing::error!(?err, "Failed to start Executor"); - return Ok(()); - } - - tracing::info!("Starting block stream"); - - // FIX if this fails, then subsequent control loops will perpetually fail since the - // above will error with ALREADY_EXISTS - if let Err(err) = self.start_new_block_stream(config).await { - tracing::error!(?err, "Failed to start Block Stream"); - return Ok(()); - } + tracing::info!("Starting data layer provisioning"); - self.state_manager.set_synced(config).await?; + self.data_layer_handler + .start_provisioning_task(config) + .await?; Ok(()) } @@ -191,10 +187,6 @@ impl<'a> Synchroniser<'a> { } if state.block_stream_synced_at.is_none() { - // NOTE: A value of `None` would suggest that `state` was created before initialisation, - // which is currently not possible, but may be in future - tracing::warn!("Existing block stream has no previous sync state, treating as new"); - self.start_new_block_stream(config).await?; return Ok(()); @@ -211,6 +203,34 @@ impl<'a> Synchroniser<'a> { Ok(()) } + async fn handle_provisioning(&self, config: 
&IndexerConfig) -> anyhow::Result<()> { + let task_status_result = self + .data_layer_handler + .check_provisioning_task_status(config) + .await; + + if let Err(error) = task_status_result { + tracing::warn!(?error, "Failed to check provisioning task status"); + + return Ok(()); + }; + + let _ = match task_status_result.unwrap() { + ProvisioningStatus::Complete => { + tracing::info!("Data layer provisioning complete"); + self.state_manager.set_provisioned(config).await + } + ProvisioningStatus::Pending => Ok(()), + _ => { + tracing::info!("Data layer provisioning failed"); + self.state_manager.set_provisioning_failure(config).await + } + } + .map_err(|err| tracing::warn!(?err, "Failed to set provisioning state")); + + Ok(()) + } + #[instrument( skip_all, fields( @@ -226,6 +246,15 @@ impl<'a> Synchroniser<'a> { executor: Option<&ExecutorInfo>, block_stream: Option<&StreamInfo>, ) -> anyhow::Result<()> { + match state.provisioned_state { + ProvisionedState::Unprovisioned | ProvisionedState::Provisioning => { + self.handle_provisioning(config).await?; + return Ok(()); + } + ProvisionedState::Failed => return Ok(()), + ProvisionedState::Provisioned => {} + } + if !state.enabled { if let Some(executor) = executor { self.executors_handler @@ -483,12 +512,14 @@ mod test { function_name: indexer.function_name.clone(), block_stream_synced_at: Some(indexer.get_registry_version()), enabled: true, + provisioned_state: ProvisionedState::Provisioned, }) .chain(deleted_indexer_configs.iter().map(|indexer| IndexerState { account_id: indexer.account_id.clone(), function_name: indexer.function_name.clone(), block_stream_synced_at: Some(indexer.get_registry_version()), enabled: true, + provisioned_state: ProvisionedState::Provisioned, })) .collect(); state_manager @@ -496,10 +527,12 @@ mod test { .returning(move || Ok(states.clone())); let redis_client = RedisClient::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( 
&block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -540,47 +573,19 @@ mod test { use super::*; #[tokio::test] - async fn start() { - let config1 = IndexerConfig::default(); - let config2 = IndexerConfig { - function_name: "test2".to_string(), - start_block: StartBlock::Latest, - ..Default::default() - }; + async fn triggers_data_layer_provisioning() { + let config = IndexerConfig::default(); let indexer_registry = IndexerRegistry::from(&[( - config1.account_id.clone(), - HashMap::from([ - (config1.function_name.clone(), config1.clone()), - (config2.function_name.clone(), config2.clone()), - ]), + config.account_id.clone(), + HashMap::from([(config.function_name.clone(), config.clone())]), )]); let mut block_streams_handler = BlockStreamsHandler::default(); block_streams_handler.expect_list().returning(|| Ok(vec![])); - block_streams_handler - .expect_start() - .with(eq(100), eq(config1.clone())) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(config2.get_registry_version()), eq(config2.clone())) - .returning(|_, _| Ok(())) - .once(); let mut executors_handler = ExecutorsHandler::default(); executors_handler.expect_list().returning(|| Ok(vec![])); - executors_handler - .expect_start() - .with(eq(config1.clone())) - .returning(|_| Ok(())) - .once(); - executors_handler - .expect_start() - .with(eq(config2.clone())) - .returning(|_| Ok(())) - .once(); let mut registry = Registry::default(); registry @@ -589,15 +594,12 @@ mod test { let mut state_manager = IndexerStateManager::default(); state_manager.expect_list().returning(|| Ok(vec![])); - state_manager - .expect_set_synced() - .with(eq(config1)) - .returning(|_| Ok(())) - .once(); - state_manager - .expect_set_synced() - .with(eq(config2)) - .returning(|_| Ok(())) + + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_start_provisioning_task() + .with(eq(config)) + 
.returning(|_| Ok(ProvisioningStatus::Pending)) .once(); let redis_client = RedisClient::default(); @@ -605,6 +607,7 @@ mod test { let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -612,97 +615,126 @@ mod test { synchroniser.sync().await.unwrap(); } + } + + mod existing { + use super::*; #[tokio::test] - async fn configures_block_stream() { - let config_with_latest = IndexerConfig { - start_block: StartBlock::Latest, - ..IndexerConfig::default() - }; - let height = 5; - let config_with_height = IndexerConfig { - start_block: StartBlock::Height(height), - ..IndexerConfig::default() - }; - let config_with_continue = IndexerConfig { - start_block: StartBlock::Continue, - ..IndexerConfig::default() + async fn waits_for_provisioning_to_complete() { + let config = IndexerConfig::default(); + + let indexer_registry = IndexerRegistry::from(&[( + config.account_id.clone(), + HashMap::from([(config.function_name.clone(), config.clone())]), + )]); + + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: true, + provisioned_state: ProvisionedState::Provisioning, }; - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with( - eq(config_with_continue.get_registry_version()), - eq(config_with_continue.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with( - eq(config_with_latest.get_registry_version()), - eq(config_with_latest.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(height), eq(config_with_height.clone())) - .returning(|_, _| Ok(())) - .once(); + let mut registry = Registry::default(); + registry + .expect_fetch() + .returning(move || Ok(indexer_registry.clone())); let mut 
state_manager = IndexerStateManager::default(); state_manager - .expect_set_synced() - .with(eq(config_with_continue.clone())) - .returning(|_| Ok(())) - .once(); - state_manager - .expect_set_synced() - .with(eq(config_with_latest.clone())) - .returning(|_| Ok(())) - .once(); - state_manager - .expect_set_synced() - .with(eq(config_with_height.clone())) + .expect_set_provisioned() + .with(eq(config.clone())) .returning(|_| Ok(())) .once(); + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_check_provisioning_task_status() + .with(eq(config.clone())) + .returning(|_| Ok(ProvisioningStatus::Complete)); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler.expect_start().never(); + let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_start() - .returning(|_| Ok(())) - .times(3); + executors_handler.expect_start().never(); let redis_client = RedisClient::default(); - let registry = Registry::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, ); synchroniser - .sync_new_indexer(&config_with_latest) - .await - .unwrap(); - synchroniser - .sync_new_indexer(&config_with_height) + .sync_existing_indexer(&config, &state, None, None) .await .unwrap(); + } + + #[tokio::test] + async fn ignores_failed_provisioning() { + let config = IndexerConfig::default(); + + let indexer_registry = IndexerRegistry::from(&[( + config.account_id.clone(), + HashMap::from([(config.function_name.clone(), config.clone())]), + )]); + + let state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: true, + provisioned_state: ProvisionedState::Provisioning, + }; + + let mut registry = Registry::default(); + registry + .expect_fetch() + .returning(move || 
Ok(indexer_registry.clone())); + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_provisioning_failure() + .with(eq(config.clone())) + .returning(|_| Ok(())) + .once(); + + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_check_provisioning_task_status() + .with(eq(config.clone())) + .returning(|_| Ok(ProvisioningStatus::Failed)); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler.expect_start().never(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler.expect_start().never(); + + let redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + synchroniser - .sync_new_indexer(&config_with_continue) + .sync_existing_indexer(&config, &state, None, None) .await .unwrap(); } - } - - mod existing { - use super::*; #[tokio::test] async fn ignores_synced() { @@ -757,14 +789,17 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: true, + provisioned_state: ProvisionedState::Provisioned, }]) }); let redis_client = RedisClient::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -849,12 +884,16 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: true, + provisioned_state: ProvisionedState::Provisioned, }]) }); + let data_layer_handler = DataLayerHandler::default(); + let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -871,6 +910,7 @@ mod test { function_name: 
config.function_name.clone(), block_stream_synced_at: None, enabled: true, + provisioned_state: ProvisionedState::Provisioned, }; let mut block_streams_handler = BlockStreamsHandler::default(); @@ -884,10 +924,12 @@ mod test { let state_manager = IndexerStateManager::default(); let executors_handler = ExecutorsHandler::default(); let registry = Registry::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -907,6 +949,7 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version() - 1), enabled: true, + provisioned_state: ProvisionedState::Provisioned, }; let mut block_streams_handler = BlockStreamsHandler::default(); @@ -926,10 +969,12 @@ mod test { let state_manager = IndexerStateManager::default(); let executors_handler = ExecutorsHandler::default(); let registry = Registry::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -949,6 +994,7 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: true, + provisioned_state: ProvisionedState::Provisioned, }; let last_published_block = 1; @@ -970,10 +1016,12 @@ mod test { let state_manager = IndexerStateManager::default(); let executors_handler = ExecutorsHandler::default(); let registry = Registry::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -1044,10 +1092,12 @@ mod test { let state_manager = IndexerStateManager::default(); let executors_handler = ExecutorsHandler::default(); let registry = 
Registry::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -1075,6 +1125,7 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: false, + provisioned_state: ProvisionedState::Provisioned, }; let executor = ExecutorInfo { executor_id: "executor_id".to_string(), @@ -1113,10 +1164,12 @@ mod test { let registry = Registry::default(); let redis_client = RedisClient::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, @@ -1145,6 +1198,7 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: false, + provisioned_state: ProvisionedState::Provisioned, }; let executor = ExecutorInfo { executor_id: "executor_id".to_string(), @@ -1183,10 +1237,12 @@ mod test { let registry = Registry::default(); let redis_client = RedisClient::default(); + let data_layer_handler = DataLayerHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, &executors_handler, + &data_layer_handler, ®istry, &state_manager, &redis_client, diff --git a/runner-client/build.rs b/runner-client/build.rs index 618200337..0ed89bc1c 100644 --- a/runner-client/build.rs +++ b/runner-client/build.rs @@ -1,5 +1,6 @@ fn main() -> Result<(), Box> { tonic_build::compile_protos("proto/runner.proto")?; + tonic_build::compile_protos("proto/data-layer.proto")?; Ok(()) } diff --git a/runner-client/examples/check_provisioning_task_status.rs b/runner-client/examples/check_provisioning_task_status.rs new file mode 100644 index 000000000..b5f6fc562 --- /dev/null +++ b/runner-client/examples/check_provisioning_task_status.rs @@ -0,0 +1,20 @@ 
+use tonic::Request; + +use runner::data_layer::data_layer_client::DataLayerClient; +use runner::data_layer::CheckProvisioningTaskStatusRequest; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = DataLayerClient::connect("http://localhost:7001").await?; + + let response = client + .check_provisioning_task_status(Request::new(CheckProvisioningTaskStatusRequest { + account_id: "morgs.near".to_string(), + function_name: "test2".to_string(), + })) + .await?; + + println!("{:#?}", response.into_inner()); + + Ok(()) +} diff --git a/runner-client/examples/start_provisioning_task.rs b/runner-client/examples/start_provisioning_task.rs new file mode 100644 index 000000000..24a938d15 --- /dev/null +++ b/runner-client/examples/start_provisioning_task.rs @@ -0,0 +1,21 @@ +use tonic::Request; + +use runner::data_layer::data_layer_client::DataLayerClient; +use runner::data_layer::ProvisionRequest; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = DataLayerClient::connect("http://localhost:7001").await?; + + let response = client + .start_provisioning_task(Request::new(ProvisionRequest { + account_id: "morgs.near".to_string(), + function_name: "test2".to_string(), + schema: "create table blocks();".to_string(), + })) + .await?; + + println!("{:#?}", response.into_inner()); + + Ok(()) +} diff --git a/runner-client/proto/data-layer.proto b/runner-client/proto/data-layer.proto new file mode 100644 index 000000000..909e3d28a --- /dev/null +++ b/runner-client/proto/data-layer.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package data_layer; + +service DataLayer { + // Starts async provisioning task + rpc StartProvisioningTask (ProvisionRequest) returns (ProvisionResponse); + + // Checks the provisioning status + rpc CheckProvisioningTaskStatus (CheckProvisioningTaskStatusRequest) returns (ProvisionResponse); +} + +message ProvisionRequest { + string account_id = 1; + string function_name = 2; + string schema = 3; +} + +message 
CheckProvisioningTaskStatusRequest { + string account_id = 1; + string function_name = 2; +} + +enum ProvisioningStatus { + UNSPECIFIED = 0; + PENDING = 1; + COMPLETE = 2; + FAILED = 3; +} + +message ProvisionResponse { + ProvisioningStatus status = 1; +} diff --git a/runner-client/src/lib.rs b/runner-client/src/lib.rs index c7a7a57fd..ca16435d8 100644 --- a/runner-client/src/lib.rs +++ b/runner-client/src/lib.rs @@ -3,3 +3,7 @@ mod runner { } pub use runner::*; + +pub mod data_layer { + tonic::include_proto!("data_layer"); +} diff --git a/runner/protos/data-layer.proto b/runner/protos/data-layer.proto index eaef8034a..909e3d28a 100644 --- a/runner/protos/data-layer.proto +++ b/runner/protos/data-layer.proto @@ -3,8 +3,8 @@ syntax = "proto3"; package data_layer; service DataLayer { - // Provisions the data layer (PostgreSQL + Hasura) - rpc Provision (ProvisionRequest) returns (ProvisionResponse); + // Starts async provisioning task + rpc StartProvisioningTask (ProvisionRequest) returns (ProvisionResponse); // Checks the provisioning status rpc CheckProvisioningTaskStatus (CheckProvisioningTaskStatusRequest) returns (ProvisionResponse); diff --git a/runner/src/indexer/__snapshots__/indexer.test.ts.snap b/runner/src/indexer/__snapshots__/indexer.test.ts.snap index a688b963a..bca20ca2d 100644 --- a/runner/src/indexer/__snapshots__/indexer.test.ts.snap +++ b/runner/src/indexer/__snapshots__/indexer.test.ts.snap @@ -35,8 +35,6 @@ exports[`Indexer unit tests Indexer.execute() allows imperative execution of Gra exports[`Indexer unit tests Indexer.execute() catches errors 1`] = `[]`; -exports[`Indexer unit tests Indexer.execute() logs provisioning failures 1`] = `[]`; - exports[`Indexer unit tests Indexer.execute() should execute all functions against the current block 1`] = ` [ [ diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index fced2b565..f2e4bad6c 100644 --- a/runner/src/indexer/indexer.test.ts +++ 
b/runner/src/indexer/indexer.test.ts @@ -909,92 +909,6 @@ describe('Indexer unit tests', () => { expect(indexerMeta.updateBlockHeight).not.toHaveBeenCalled(); }); - test('Indexer.execute() provisions a GraphQL endpoint with the specified schema', async () => { - const blockHeight = 82699904; - const mockFetch = jest.fn(() => ({ - status: 200, - json: async () => ({ - errors: null, - }), - })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; - const provisioner: any = { - getPgBouncerConnectionParameters: jest.fn().mockReturnValue(genericDbCredentials), - fetchUserApiProvisioningStatus: jest.fn().mockReturnValue(false), - provisionUserApi: jest.fn(), - provisionLogsAndMetadataIfNeeded: jest.fn(), - ensureConsistentHasuraState: jest.fn(), - }; - const indexerMeta = { - writeLogs: jest.fn(), - setStatus: jest.fn(), - updateBlockHeight: jest.fn() - } as unknown as IndexerMeta; - const indexer = new Indexer(simpleSchemaConfig, { - fetch: mockFetch as unknown as typeof fetch, - provisioner, - dmlHandler: genericMockDmlHandler, - indexerMeta, - }, undefined, config); - - await indexer.execute(mockBlock); - - expect(provisioner.fetchUserApiProvisioningStatus).toHaveBeenCalledWith(simpleSchemaConfig); - expect(indexerMeta.setStatus).toHaveBeenNthCalledWith(1, IndexerStatus.RUNNING); - expect(provisioner.provisionUserApi).toHaveBeenCalledTimes(1); - expect(provisioner.provisionUserApi).toHaveBeenCalledWith(simpleSchemaConfig); - // expect(provisioner.provisionLogsAndMetadataIfNeeded).toHaveBeenCalledTimes(1); - // expect(provisioner.ensureConsistentHasuraState).toHaveBeenCalledTimes(1); - expect(provisioner.getPgBouncerConnectionParameters).toHaveBeenCalledTimes(1); - }); - - test('Indexer.execute() skips provisioning if the endpoint exists', async () => { - const blockHeight = 82699904; - const mockFetch = jest.fn(() => ({ - status: 200, 
- json: async () => ({ - errors: null, - }), - })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; - const provisioner: any = { - getPgBouncerConnectionParameters: jest.fn().mockReturnValue(genericDbCredentials), - fetchUserApiProvisioningStatus: jest.fn().mockReturnValue(true), - provisionUserApi: jest.fn(), - provisionLogsAndMetadataIfNeeded: jest.fn(), - ensureConsistentHasuraState: jest.fn(), - }; - const indexer = new Indexer(simpleSchemaConfig, { - fetch: mockFetch as unknown as typeof fetch, - provisioner, - dmlHandler: genericMockDmlHandler, - indexerMeta: genericMockIndexerMeta, - }, undefined, config); - - await indexer.execute(mockBlock); - - expect(provisioner.provisionUserApi).not.toHaveBeenCalled(); - expect(provisioner.getPgBouncerConnectionParameters).toHaveBeenCalledTimes(1); - // expect(provisioner.provisionLogsAndMetadataIfNeeded).toHaveBeenCalledTimes(1); - // expect(provisioner.ensureConsistentHasuraState).toHaveBeenCalledTimes(1); - }); - test('Indexer.execute() skips database credentials fetch second time onward', async () => { const blockHeight = 82699904; const mockFetch = jest.fn(() => ({ @@ -1096,57 +1010,6 @@ describe('Indexer unit tests', () => { expect(indexerMeta.updateBlockHeight).toHaveBeenCalledWith(blockHeight); }); - test('Indexer.execute() logs provisioning failures', async () => { - const blockHeight = 82699904; - const mockFetch = jest.fn(() => ({ - status: 200, - json: async () => ({ - errors: null, - }), - })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; - const error = new Error('something went wrong with provisioning'); - const provisioner: any = { - getPgBouncerConnectionParameters: jest.fn().mockReturnValue(genericDbCredentials), - 
fetchUserApiProvisioningStatus: jest.fn().mockReturnValue(false), - provisionUserApi: jest.fn().mockRejectedValue(error), - provisionLogsIfNeeded: jest.fn(), - provisionMetadataIfNeeded: jest.fn(), - ensureConsistentHasuraState: jest.fn(), - }; - const indexerMeta = { - writeLogs: jest.fn(), - setStatus: jest.fn(), - updateBlockHeight: jest.fn() - } as unknown as IndexerMeta; - const code = ` - context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); - `; - const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'morgs.near', 'test', 0, code, 'schema', LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { - fetch: mockFetch as unknown as typeof fetch, - provisioner, - dmlHandler: genericMockDmlHandler, - indexerMeta, - }, undefined, config); - - await expect(indexer.execute(mockBlock)).rejects.toThrow(error); - - expect(mockFetch.mock.calls).toMatchSnapshot(); - expect(indexerMeta.updateBlockHeight).not.toHaveBeenCalled(); - expect(provisioner.provisionLogsIfNeeded).not.toHaveBeenCalled(); - expect(provisioner.provisionMetadataIfNeeded).not.toHaveBeenCalled(); - expect(provisioner.getPgBouncerConnectionParameters).not.toHaveBeenCalled(); - }); - test('Indexer passes all relevant logs to writeLogs', async () => { const mockDebugIndexerMeta = { writeLogs: jest.fn(), diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 344deae86..97d9f6289 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -92,21 +92,6 @@ export default class Indexer { try { const runningMessage = `Running function ${this.indexerConfig.fullName()} on block ${blockHeight}, lag is: ${lag?.toString()}ms from block timestamp`; - try { - if (!await this.deps.provisioner.fetchUserApiProvisioningStatus(this.indexerConfig)) { - logEntries.push(LogEntry.systemInfo('Provisioning endpoint: starting', blockHeight)); - await 
this.deps.provisioner.provisionUserApi(this.indexerConfig); - logEntries.push(LogEntry.systemInfo('Provisioning endpoint: successful', blockHeight)); - } - } catch (e) { - const error = e as Error; - if (this.IS_FIRST_EXECUTION) { - this.logger.error('Provisioning endpoint: failure', error); - } - logEntries.push(LogEntry.systemError(`Provisioning endpoint failure: ${error.message}`, blockHeight)); - throw error; - } - logEntries.push(LogEntry.systemInfo(runningMessage, blockHeight)); // Cache database credentials after provisioning await wrapSpan(async () => { diff --git a/runner/src/server/services/data-layer/data-layer-service.test.ts b/runner/src/server/services/data-layer/data-layer-service.test.ts index 99963d30e..15fc29590 100644 --- a/runner/src/server/services/data-layer/data-layer-service.test.ts +++ b/runner/src/server/services/data-layer/data-layer-service.test.ts @@ -67,7 +67,7 @@ describe('DataLayerService', () => { }); describe('Provision', () => { - it('should return ALREADY_EXISTS if the task is already pending', (done) => { + it('should return ALREADY_EXISTS if the task exists', (done) => { const tasks = { 'testAccount:testFunction': { pending: true, completed: false, failed: false } as unknown as ProvisioningTask }; @@ -80,23 +80,23 @@ describe('DataLayerService', () => { done(); }; - createDataLayerService(undefined, tasks).Provision(call, callback); + createDataLayerService(undefined, tasks).StartProvisioningTask(call, callback); }); - it('should return ALREADY_EXISTS if the task has already completed', (done) => { + it('should return Complete if the task has already completed', (done) => { + const tasks: Record = {}; const provisioner = { fetchUserApiProvisioningStatus: jest.fn().mockResolvedValue(true) } as unknown as Provisioner; const call = { request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'testSchema' } } as unknown as ServerUnaryCall; - const callback = (error: any): void => { - 
expect(error.code).toBe(status.ALREADY_EXISTS); - expect(error.details).toBe('Provisioning task has already completed'); + const callback = (_error: any, response: any): void => { + expect(response.status).toBe(ProvisioningStatus.COMPLETE); done(); }; - createDataLayerService(provisioner).Provision(call, callback); + createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); }); it('should start a new provisioning task and return PENDING', (done) => { @@ -115,7 +115,7 @@ describe('DataLayerService', () => { done(); }; - createDataLayerService(provisioner, tasks).Provision(call, callback); + createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); }); it('should return INTERNAL error if checking provisioning status fails', (done) => { @@ -132,7 +132,7 @@ describe('DataLayerService', () => { done(); }; - createDataLayerService(provisioner, tasks).Provision(call, callback); + createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); }); }); }); diff --git a/runner/src/server/services/data-layer/data-layer-service.ts b/runner/src/server/services/data-layer/data-layer-service.ts index 65176de65..ca62a5ea2 100644 --- a/runner/src/server/services/data-layer/data-layer-service.ts +++ b/runner/src/server/services/data-layer/data-layer-service.ts @@ -67,14 +67,14 @@ export function createDataLayerService ( callback(null, { status: ProvisioningStatus.PENDING }); }, - Provision (call: ServerUnaryCall, callback: sendUnaryData): void { + StartProvisioningTask (call: ServerUnaryCall, callback: sendUnaryData): void { const { accountId, functionName, schema } = call.request; const provisioningConfig = new ProvisioningConfig(accountId, functionName, schema); const task = tasks[generateTaskId(accountId, functionName)]; - if (task?.pending) { + if (task) { const exists = new StatusBuilder() .withCode(status.ALREADY_EXISTS) .withDetails('Provisioning task already exists') @@ -86,11 +86,7 @@ export function 
createDataLayerService ( provisioner.fetchUserApiProvisioningStatus(provisioningConfig).then((isProvisioned) => { if (isProvisioned) { - const exists = new StatusBuilder() - .withCode(status.ALREADY_EXISTS) - .withDetails('Provisioning task has already completed') - .build(); - callback(exists); + callback(null, { status: ProvisioningStatus.COMPLETE }); return; } diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index f7be5ef1b..dda4511b8 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -126,6 +126,8 @@ describe('Indexer integration', () => { } ); + await provisioner.provisionUserApi(indexerConfig); + await indexer.execute(Block.fromStreamerMessage(block_115185108 as any as StreamerMessage)); const firstHeight = await indexerBlockHeightQuery('morgs_near_test', graphqlClient); @@ -139,7 +141,7 @@ describe('Indexer integration', () => { expect(secondHeight.value).toEqual('115185109'); const logs: any = await indexerLogsQuery('morgs_near_test', graphqlClient); - expect(logs.length).toEqual(4); + expect(logs.length).toEqual(2); const { morgs_near_test_blocks: blocks }: any = await graphqlClient.request(blocksIndexerQuery); expect(blocks.map(({ height }: any) => height)).toEqual([115185108, 115185109]); @@ -232,6 +234,8 @@ describe('Indexer integration', () => { } ); + await provisioner.provisionUserApi(indexerConfig); + await indexer.execute(Block.fromStreamerMessage(block_115185108 as any as StreamerMessage)); await indexer.execute(Block.fromStreamerMessage(block_115185109 as any as StreamerMessage));