From 64d1ebbf12d4bce7bcbf54b560b530fe6197824f Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 20 Jun 2024 20:32:21 +1200 Subject: [PATCH] feat: Deprovision Data Layer on delete (#808) This PR removes Data Layer resources on Indexer Delete. To achieve this, the following has been added: - `Provisioner.deprovision()` method which removes: schema, cron jobs, and if necessary, Hasura source, database, and role - `DataLayerService.StartDeprovisioningTask` gRPC method - Calling the above from Coordinator within the delete lifecycle hook In addition to the above, I've slightly refactored DataLayerService to make the addition of de-provisioning more accommodating: - `StartDeprovisioningTask` and `StartProvisioningTask` now return opaque IDs rather than using `accountId`/`functionName` - to avoid conflicts with each other - There is a single `GetTaskStatus` method which is used for both, before it was specific to provisioning As mentioned in #805, the Coordinator implementation is a little awkward due to the shared/non-blocking Control Loop. I'll look to refactor this later and hopefully improve on this. 
--- coordinator/src/handlers/data_layer.rs | 81 ++++---- coordinator/src/indexer_state.rs | 43 +++- coordinator/src/redis.rs | 4 + coordinator/src/synchroniser.rs | 188 ++++++++++++++---- .../examples/start_provisioning_task.rs | 4 +- runner-client/proto/data-layer.proto | 27 ++- runner/protos/data-layer.proto | 27 ++- .../__snapshots__/hasura-client.test.ts.snap | 25 +++ .../src/hasura-client/hasura-client.test.ts | 29 +++ runner/src/hasura-client/hasura-client.ts | 14 ++ runner/src/provisioner/provisioner.test.ts | 98 +++++++++ runner/src/provisioner/provisioner.ts | 94 +++++++++ .../data-layer/data-layer-service.test.ts | 90 ++++----- .../services/data-layer/data-layer-service.ts | 135 ++++++++----- runner/tests/integration.test.ts | 46 +++++ 15 files changed, 711 insertions(+), 194 deletions(-) diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 7db624648..68537b3c7 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -1,12 +1,14 @@ #![cfg_attr(test, allow(dead_code))] -pub use runner::data_layer::ProvisioningStatus; +use near_primitives::types::AccountId; + +pub use runner::data_layer::TaskStatus; use anyhow::Context; use runner::data_layer::data_layer_client::DataLayerClient; -use runner::data_layer::{CheckProvisioningTaskStatusRequest, ProvisionRequest}; +use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest}; use tonic::transport::channel::Channel; -use tonic::{Request, Status}; +use tonic::Request; use crate::indexer_config::IndexerConfig; @@ -15,17 +17,14 @@ pub use DataLayerHandlerImpl as DataLayerHandler; #[cfg(test)] pub use MockDataLayerHandlerImpl as DataLayerHandler; +type TaskId = String; + pub struct DataLayerHandlerImpl { client: DataLayerClient, } #[cfg_attr(test, mockall::automock)] impl DataLayerHandlerImpl { - pub fn from_env() -> anyhow::Result { - let runner_url = std::env::var("RUNNER_URL").context("RUNNER_URL is not 
set")?; - Self::connect(&runner_url) - } - pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -38,7 +37,7 @@ impl DataLayerHandlerImpl { pub async fn start_provisioning_task( &self, indexer_config: &IndexerConfig, - ) -> anyhow::Result { + ) -> anyhow::Result { let request = ProvisionRequest { account_id: indexer_config.account_id.to_string(), function_name: indexer_config.function_name.clone(), @@ -49,46 +48,52 @@ impl DataLayerHandlerImpl { .client .clone() .start_provisioning_task(Request::new(request)) - .await; - - if let Err(error) = response { - if error.code() == tonic::Code::AlreadyExists { - return Ok(ProvisioningStatus::Pending); - } - - return Err(error.into()); - } - - let status = match response.unwrap().into_inner().status { - 1 => ProvisioningStatus::Pending, - 2 => ProvisioningStatus::Complete, - 3 => ProvisioningStatus::Failed, - _ => ProvisioningStatus::Unspecified, - }; + .await?; - Ok(status) + Ok(response.into_inner().task_id) } - pub async fn check_provisioning_task_status( + pub async fn start_deprovisioning_task( &self, - indexer_config: &IndexerConfig, - ) -> anyhow::Result { - let request = CheckProvisioningTaskStatusRequest { - account_id: indexer_config.account_id.to_string(), - function_name: indexer_config.function_name.clone(), + account_id: AccountId, + function_name: String, + ) -> anyhow::Result { + let request = DeprovisionRequest { + account_id: account_id.to_string(), + function_name, }; let response = self .client .clone() - .check_provisioning_task_status(Request::new(request)) + .start_deprovisioning_task(Request::new(request)) .await?; - let status = match response.into_inner().status { - 1 => ProvisioningStatus::Pending, - 2 => ProvisioningStatus::Complete, - 3 => ProvisioningStatus::Failed, - _ => ProvisioningStatus::Unspecified, + Ok(response.into_inner().task_id) + } + + pub async fn get_task_status(&self, task_id: TaskId) 
-> anyhow::Result { + let request = GetTaskStatusRequest { task_id }; + + let response = self + .client + .clone() + .get_task_status(Request::new(request)) + .await; + + if let Err(error) = response { + if error.code() == tonic::Code::NotFound { + return Ok(TaskStatus::Failed); + } + + return Err(error.into()); + } + + let status = match response.unwrap().into_inner().status { + 1 => TaskStatus::Pending, + 2 => TaskStatus::Complete, + 3 => TaskStatus::Failed, + _ => anyhow::bail!("Received invalid task status"), }; Ok(status) diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 2dabcaa96..b6cb20be5 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -9,8 +9,9 @@ use crate::redis::RedisClient; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub enum ProvisionedState { Unprovisioned, - Provisioning, + Provisioning { task_id: String }, Provisioned, + Deprovisioning { task_id: String }, Failed, } @@ -31,13 +32,17 @@ pub struct IndexerState { pub provisioned_state: ProvisionedState, } +// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to +// construct the state key without it. But, this isn't ideal as we now have two places which +// define this key - we need to consolidate these somehow. impl IndexerState { - // FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to - // construct the state key without it. But, this isn't ideal as we now have two places which - // define this key - we need to consolidate these somehow. 
pub fn get_state_key(&self) -> String { format!("{}/{}:state", self.account_id, self.function_name) } + + pub fn get_redis_stream_key(&self) -> String { + format!("{}/{}:block_stream", self.account_id, self.function_name) + } } #[cfg(not(test))] @@ -105,6 +110,12 @@ impl IndexerStateManagerImpl { return Ok(serde_json::from_str(&raw_state)?); } + tracing::info!( + account_id = indexer_config.account_id.to_string(), + function_name = indexer_config.function_name.as_str(), + "Creating new state using default" + ); + Ok(self.get_default_state(indexer_config)) } @@ -134,10 +145,30 @@ impl IndexerStateManagerImpl { Ok(()) } - pub async fn set_provisioning(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn set_deprovisioning( + &self, + indexer_state: &IndexerState, + task_id: String, + ) -> anyhow::Result<()> { + let mut state = indexer_state.clone(); + + state.provisioned_state = ProvisionedState::Deprovisioning { task_id }; + + self.redis_client + .set(state.get_state_key(), serde_json::to_string(&state)?) + .await?; + + Ok(()) + } + + pub async fn set_provisioning( + &self, + indexer_config: &IndexerConfig, + task_id: String, + ) -> anyhow::Result<()> { let mut indexer_state = self.get_state(indexer_config).await?; - indexer_state.provisioned_state = ProvisionedState::Provisioning; + indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id }; self.set_state(indexer_config, indexer_state).await?; diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index 74f536c70..191b92f91 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -227,6 +227,10 @@ mockall::mock! 
{ K: ToRedisArgs + Debug + Send + Sync + 'static, V: ToRedisArgs + Debug + Send + Sync + 'static; + pub async fn del(&self, key: K) -> anyhow::Result<()> + where + K: ToRedisArgs + Debug + Send + Sync + 'static; + pub async fn indexer_states_set_exists(&self) -> anyhow::Result; pub async fn sadd(&self, set: S, value: V) -> anyhow::Result<()> diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index 1156d89af..e464e241f 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -4,7 +4,7 @@ use tracing::instrument; use crate::{ handlers::{ block_streams::{BlockStreamsHandler, StreamInfo}, - data_layer::{DataLayerHandler, ProvisioningStatus}, + data_layer::{DataLayerHandler, TaskStatus}, executors::{ExecutorInfo, ExecutorsHandler}, }, indexer_config::IndexerConfig, @@ -82,17 +82,12 @@ impl<'a> Synchroniser<'a> { async fn sync_new_indexer(&self, config: &IndexerConfig) -> anyhow::Result<()> { tracing::info!("Starting data layer provisioning"); - self.data_layer_handler + let task_id = self + .data_layer_handler .start_provisioning_task(config) .await?; - self.state_manager - .set_provisioning(config) - .await - .map_err(|err| { - tracing::warn!(?err, "Failed to set provisioning state"); - err - })?; + self.state_manager.set_provisioning(config, task_id).await?; Ok(()) } @@ -211,11 +206,12 @@ impl<'a> Synchroniser<'a> { Ok(()) } - async fn handle_provisioning(&self, config: &IndexerConfig) -> anyhow::Result<()> { - let task_status_result = self - .data_layer_handler - .check_provisioning_task_status(config) - .await; + async fn ensure_provisioned( + &self, + config: &IndexerConfig, + task_id: String, + ) -> anyhow::Result<()> { + let task_status_result = self.data_layer_handler.get_task_status(task_id).await; if let Err(error) = task_status_result { tracing::warn!(?error, "Failed to check provisioning task status"); @@ -224,13 +220,13 @@ impl<'a> Synchroniser<'a> { }; let _ = match task_status_result.unwrap() { - 
ProvisioningStatus::Complete => { + TaskStatus::Complete => { tracing::info!("Data layer provisioning complete"); self.state_manager.set_provisioned(config).await } - ProvisioningStatus::Pending => Ok(()), + TaskStatus::Pending => Ok(()), _ => { - tracing::info!("Data layer provisioning failed"); + tracing::warn!("Data layer provisioning failed"); self.state_manager.set_provisioning_failure(config).await } } @@ -254,13 +250,16 @@ impl<'a> Synchroniser<'a> { executor: Option<&ExecutorInfo>, block_stream: Option<&StreamInfo>, ) -> anyhow::Result<()> { - match state.provisioned_state { - ProvisionedState::Unprovisioned | ProvisionedState::Provisioning => { - self.handle_provisioning(config).await?; + match &state.provisioned_state { + ProvisionedState::Provisioning { task_id } => { + self.ensure_provisioned(config, task_id.clone()).await?; return Ok(()); } ProvisionedState::Failed => return Ok(()), ProvisionedState::Provisioned => {} + ProvisionedState::Unprovisioned | ProvisionedState::Deprovisioning { .. } => { + anyhow::bail!("Provisioning task should have been started") + } } if !state.enabled { @@ -326,6 +325,38 @@ impl<'a> Synchroniser<'a> { .await?; } + if let ProvisionedState::Deprovisioning { task_id } = &state.provisioned_state { + match self + .data_layer_handler + .get_task_status(task_id.clone()) + .await? 
+ { + TaskStatus::Complete => { + tracing::info!("Data layer deprovisioning complete"); + } + TaskStatus::Failed => { + tracing::info!("Data layer deprovisioning failed"); + } + TaskStatus::Unspecified => { + tracing::info!("Encountered unspecified deprovisioning task status"); + } + TaskStatus::Pending => return Ok(()), + } + } else { + let task_id = self + .data_layer_handler + .start_deprovisioning_task(state.account_id.clone(), state.function_name.clone()) + .await?; + + self.state_manager + .set_deprovisioning(state, task_id.clone()) + .await?; + + return Ok(()); + } + + self.redis_client.del(state.get_redis_stream_key()).await?; + self.state_manager.delete_state(state).await?; Ok(()) @@ -604,15 +635,15 @@ mod test { state_manager.expect_list().returning(|| Ok(vec![])); state_manager .expect_set_provisioning() - .with(eq(config.clone())) - .returning(|_| Ok(())) + .with(eq(config.clone()), eq("task_id".to_string())) + .returning(|_, _| Ok(())) .once(); let mut data_layer_handler = DataLayerHandler::default(); data_layer_handler .expect_start_provisioning_task() .with(eq(config)) - .returning(|_| Ok(ProvisioningStatus::Pending)) + .returning(|_| Ok("task_id".to_string())) .once(); let redis_client = RedisClient::default(); @@ -642,12 +673,16 @@ mod test { HashMap::from([(config.function_name.clone(), config.clone())]), )]); + let task_id = "task_id".to_string(); + let state = IndexerState { account_id: config.account_id.clone(), function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: true, - provisioned_state: ProvisionedState::Provisioning, + provisioned_state: ProvisionedState::Provisioning { + task_id: task_id.clone().to_string(), + }, }; let mut registry = Registry::default(); @@ -664,9 +699,9 @@ mod test { let mut data_layer_handler = DataLayerHandler::default(); data_layer_handler - .expect_check_provisioning_task_status() - .with(eq(config.clone())) - .returning(|_| 
Ok(ProvisioningStatus::Complete)); + .expect_get_task_status() + .with(eq(task_id)) + .returning(|_| Ok(TaskStatus::Complete)); let mut block_streams_handler = BlockStreamsHandler::default(); block_streams_handler.expect_start().never(); @@ -705,7 +740,9 @@ mod test { function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: true, - provisioned_state: ProvisionedState::Provisioning, + provisioned_state: ProvisionedState::Provisioning { + task_id: "task_id".to_string(), + }, }; let mut registry = Registry::default(); @@ -722,9 +759,9 @@ mod test { let mut data_layer_handler = DataLayerHandler::default(); data_layer_handler - .expect_check_provisioning_task_status() - .with(eq(config.clone())) - .returning(|_| Ok(ProvisioningStatus::Failed)); + .expect_get_task_status() + .with(eq("task_id".to_string())) + .returning(|_| Ok(TaskStatus::Failed)); let mut block_streams_handler = BlockStreamsHandler::default(); block_streams_handler.expect_start().never(); @@ -1204,14 +1241,16 @@ mod test { use super::*; #[tokio::test] - async fn stops_and_deletes() { + async fn stops_block_stream_and_executor() { let config = IndexerConfig::default(); let state = IndexerState { account_id: config.account_id.clone(), function_name: config.function_name.clone(), block_stream_synced_at: Some(config.get_registry_version()), enabled: false, - provisioned_state: ProvisionedState::Provisioned, + provisioned_state: ProvisionedState::Deprovisioning { + task_id: "task_id".to_string(), + }, }; let executor = ExecutorInfo { executor_id: "executor_id".to_string(), @@ -1242,15 +1281,86 @@ mod test { .once(); let mut state_manager = IndexerStateManager::default(); + state_manager.expect_delete_state().never(); + + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_get_task_status() + .with(eq("task_id".to_string())) + .returning(|_| Ok(TaskStatus::Pending)); + + let registry = Registry::default(); + let 
redis_client = RedisClient::default(); + + let synchroniser = Synchroniser::new( + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + synchroniser + .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream)) + .await + .unwrap(); + } + + #[tokio::test] + async fn cleans_indexer_resources() { + let config = IndexerConfig::default(); + let provisioned_state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: false, + provisioned_state: ProvisionedState::Provisioned, + }; + let deprovisioning_state = IndexerState { + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + block_stream_synced_at: Some(config.get_registry_version()), + enabled: false, + provisioned_state: ProvisionedState::Deprovisioning { + task_id: "task_id".to_string(), + }, + }; + + let mut state_manager = IndexerStateManager::default(); + state_manager + .expect_set_deprovisioning() + .with(eq(provisioned_state.clone()), eq("task_id".to_string())) + .returning(|_, _| Ok(())); state_manager .expect_delete_state() - .with(eq(state.clone())) + .with(eq(deprovisioning_state.clone())) + .returning(|_| Ok(())) + .once(); + + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_start_deprovisioning_task() + .with( + eq(config.clone().account_id), + eq(config.clone().function_name), + ) + .returning(|_, _| Ok("task_id".to_string())); + data_layer_handler + .expect_get_task_status() + .with(eq("task_id".to_string())) + .returning(|_| Ok(TaskStatus::Complete)); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_del::() + .with(eq(config.get_redis_stream_key())) .returning(|_| Ok(())) .once(); let registry = Registry::default(); - let redis_client = RedisClient::default(); - let data_layer_handler = 
DataLayerHandler::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); let synchroniser = Synchroniser::new( &block_streams_handler, @@ -1262,7 +1372,11 @@ mod test { ); synchroniser - .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream)) + .sync_deleted_indexer(&provisioned_state, None, None) + .await + .unwrap(); + synchroniser + .sync_deleted_indexer(&deprovisioning_state, None, None) .await .unwrap(); } diff --git a/runner-client/examples/start_provisioning_task.rs b/runner-client/examples/start_provisioning_task.rs index 24a938d15..316d62c6e 100644 --- a/runner-client/examples/start_provisioning_task.rs +++ b/runner-client/examples/start_provisioning_task.rs @@ -9,8 +9,8 @@ async fn main() -> Result<(), Box> { let response = client .start_provisioning_task(Request::new(ProvisionRequest { - account_id: "morgs.near".to_string(), - function_name: "test2".to_string(), + account_id: "test.near".to_string(), + function_name: "data_layer_example".to_string(), schema: "create table blocks();".to_string(), })) .await?; diff --git a/runner-client/proto/data-layer.proto b/runner-client/proto/data-layer.proto index 909e3d28a..cd8fb2c25 100644 --- a/runner-client/proto/data-layer.proto +++ b/runner-client/proto/data-layer.proto @@ -4,10 +4,18 @@ package data_layer; service DataLayer { // Starts async provisioning task - rpc StartProvisioningTask (ProvisionRequest) returns (ProvisionResponse); + rpc StartProvisioningTask (ProvisionRequest) returns (StartTaskResponse); - // Checks the provisioning status - rpc CheckProvisioningTaskStatus (CheckProvisioningTaskStatusRequest) returns (ProvisionResponse); + // Start async deprovisioning task + rpc StartDeprovisioningTask (DeprovisionRequest) returns (StartTaskResponse); + + // Checks the status of provisioning/deprovisioning + rpc GetTaskStatus (GetTaskStatusRequest) returns (GetTaskStatusResponse); + +} + +message StartTaskResponse { + 
string task_id = 1; } message ProvisionRequest { @@ -16,18 +24,23 @@ message ProvisionRequest { string schema = 3; } -message CheckProvisioningTaskStatusRequest { +message DeprovisionRequest { string account_id = 1; string function_name = 2; } -enum ProvisioningStatus { + +message GetTaskStatusRequest { + string task_id = 1; +} + +enum TaskStatus { UNSPECIFIED = 0; PENDING = 1; COMPLETE = 2; FAILED = 3; } -message ProvisionResponse { - ProvisioningStatus status = 1; +message GetTaskStatusResponse { + TaskStatus status = 1; } diff --git a/runner/protos/data-layer.proto b/runner/protos/data-layer.proto index 909e3d28a..cd8fb2c25 100644 --- a/runner/protos/data-layer.proto +++ b/runner/protos/data-layer.proto @@ -4,10 +4,18 @@ package data_layer; service DataLayer { // Starts async provisioning task - rpc StartProvisioningTask (ProvisionRequest) returns (ProvisionResponse); + rpc StartProvisioningTask (ProvisionRequest) returns (StartTaskResponse); - // Checks the provisioning status - rpc CheckProvisioningTaskStatus (CheckProvisioningTaskStatusRequest) returns (ProvisionResponse); + // Start async deprovisioning task + rpc StartDeprovisioningTask (DeprovisionRequest) returns (StartTaskResponse); + + // Checks the status of provisioning/deprovisioning + rpc GetTaskStatus (GetTaskStatusRequest) returns (GetTaskStatusResponse); + +} + +message StartTaskResponse { + string task_id = 1; } message ProvisionRequest { @@ -16,18 +24,23 @@ message ProvisionRequest { string schema = 3; } -message CheckProvisioningTaskStatusRequest { +message DeprovisionRequest { string account_id = 1; string function_name = 2; } -enum ProvisioningStatus { + +message GetTaskStatusRequest { + string task_id = 1; +} + +enum TaskStatus { UNSPECIFIED = 0; PENDING = 1; COMPLETE = 2; FAILED = 3; } -message ProvisionResponse { - ProvisioningStatus status = 1; +message GetTaskStatusResponse { + TaskStatus status = 1; } diff --git a/runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap 
b/runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap index b1b518094..a4aeba4d4 100644 --- a/runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap +++ b/runner/src/hasura-client/__snapshots__/hasura-client.test.ts.snap @@ -212,6 +212,31 @@ exports[`HasuraClient creates a schema 1`] = ` ] `; +exports[`HasuraClient drops a datasource 1`] = ` +{ + "args": { + "cascade": true, + "name": "morgs_near", + }, + "type": "pg_drop_source", +} +`; + +exports[`HasuraClient drops a schema 1`] = ` +[ + [ + "mock-hasura-endpoint/v2/query", + { + "body": "{"type":"run_sql","args":{"sql":"DROP schema IF EXISTS schemaName CASCADE","read_only":false,"source":"dbName"}}", + "headers": { + "X-Hasura-Admin-Secret": "mock-hasura-admin-secret", + }, + "method": "POST", + }, + ], +] +`; + exports[`HasuraClient gets table names within a schema 1`] = ` { "args": { diff --git a/runner/src/hasura-client/hasura-client.test.ts b/runner/src/hasura-client/hasura-client.test.ts index 5c9e17be8..819530b9d 100644 --- a/runner/src/hasura-client/hasura-client.test.ts +++ b/runner/src/hasura-client/hasura-client.test.ts @@ -41,6 +41,20 @@ describe('HasuraClient', () => { expect(mockFetch.mock.calls).toMatchSnapshot(); }); + it('drops a schema', async () => { + const mockFetch = jest + .fn() + .mockResolvedValue({ + status: 200, + text: () => JSON.stringify({}) + }); + const client = new HasuraClient({ fetch: mockFetch as unknown as typeof fetch }, config); + + await client.dropSchema('dbName', 'schemaName'); + + expect(mockFetch.mock.calls).toMatchSnapshot(); + }); + it('checks if a schema exists within source', async () => { const mockFetch = jest .fn() @@ -172,6 +186,21 @@ describe('HasuraClient', () => { expect(JSON.parse(mockFetch.mock.calls[0][1].body)).toMatchSnapshot(); }); + it('drops a datasource', async () => { + const mockFetch = jest + .fn() + .mockResolvedValue({ + status: 200, + text: () => JSON.stringify({}) + }); + const client = new HasuraClient({ fetch: 
mockFetch as unknown as typeof fetch }, config); + + await client.dropDatasource('morgs_near'); + + expect(mockFetch.mock.calls[0][1].headers['X-Hasura-Admin-Secret']).toBe(config.adminSecret); + expect(JSON.parse(mockFetch.mock.calls[0][1].body)).toMatchSnapshot(); + }); + it('adds a datasource', async () => { const mockFetch = jest .fn() diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/hasura-client/hasura-client.ts index 6efb3c986..8b50242f2 100644 --- a/runner/src/hasura-client/hasura-client.ts +++ b/runner/src/hasura-client/hasura-client.ts @@ -209,6 +209,13 @@ export default class HasuraClient { return result.length > 1; } + async dropSchema (source: string, schemaName: string): Promise { + return await this.executeSql( + `DROP schema IF EXISTS ${schemaName} CASCADE`, + { source, readOnly: false } + ); + } + async createSchema (source: string, schemaName: string): Promise { return await this.executeSql(`CREATE schema ${schemaName}`, { source, @@ -443,4 +450,11 @@ export default class HasuraClient { }, }); } + + async dropDatasource (databaseName: string): Promise { + return await this.executeMetadataRequest('pg_drop_source', { + name: databaseName, + cascade: true, + }); + } } diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts index ea6cf0b21..c27016832 100644 --- a/runner/src/provisioner/provisioner.test.ts +++ b/runner/src/provisioner/provisioner.test.ts @@ -40,8 +40,10 @@ describe('Provisioner', () => { trackForeignKeyRelationships: jest.fn().mockReturnValueOnce(null), addPermissionsToTables: jest.fn().mockReturnValueOnce(null), addDatasource: jest.fn().mockReturnValueOnce(null), + dropDatasource: jest.fn().mockReturnValueOnce(null), executeSqlOnSchema: jest.fn().mockReturnValueOnce(null), createSchema: jest.fn().mockReturnValueOnce(null), + dropSchema: jest.fn().mockReturnValueOnce(null), doesSourceExist: jest.fn().mockReturnValueOnce(false), doesSchemaExist: 
jest.fn().mockReturnValueOnce(false), untrackTables: jest.fn().mockReturnValueOnce(null), @@ -71,6 +73,102 @@ describe('Provisioner', () => { indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO); }); + describe('deprovision', () => { + it('removes schema level resources', async () => { + userPgClientQuery = jest.fn() + .mockResolvedValueOnce(null) // unschedule create partition job + .mockResolvedValueOnce(null) // unschedule delete partition job + .mockResolvedValueOnce({ rows: [{ schema_name: 'another_one' }] }); // list schemas + + await provisioner.deprovision(indexerConfig); + + expect(hasuraClient.dropSchema).toBeCalledWith(indexerConfig.databaseName(), indexerConfig.schemaName()); + expect(userPgClientQuery.mock.calls).toEqual([ + ["SELECT cron.unschedule('morgs_near_test_function_sys_logs_create_partition');"], + ["SELECT cron.unschedule('morgs_near_test_function_sys_logs_delete_partition');"], + ["SELECT schema_name FROM information_schema.schemata WHERE schema_owner = 'morgs_near'"], + ]); + }); + + it('removes database level resources', async () => { + userPgClientQuery = jest.fn() + .mockResolvedValueOnce(null) // unschedule create partition job + .mockResolvedValueOnce(null) // unschedule delete partition job + .mockResolvedValueOnce({ rows: [] }); // list schemas + + await provisioner.deprovision(indexerConfig); + + expect(hasuraClient.dropSchema).toBeCalledWith(indexerConfig.databaseName(), indexerConfig.schemaName()); + expect(userPgClientQuery.mock.calls).toEqual([ + ["SELECT cron.unschedule('morgs_near_test_function_sys_logs_create_partition');"], + ["SELECT cron.unschedule('morgs_near_test_function_sys_logs_delete_partition');"], + ["SELECT schema_name FROM information_schema.schemata WHERE schema_owner = 'morgs_near'"], + ]); + expect(hasuraClient.dropDatasource).toBeCalledWith(indexerConfig.databaseName()); + expect(adminPgClient.query).toBeCalledWith('DROP DATABASE IF EXISTS morgs_near (FORCE)'); 
+ expect(cronPgClient.query).toBeCalledWith('REVOKE USAGE ON SCHEMA cron FROM morgs_near CASCADE'); + expect(cronPgClient.query).toBeCalledWith('REVOKE EXECUTE ON FUNCTION cron.schedule_in_database FROM morgs_near;'); + expect(adminPgClient.query).toBeCalledWith('DROP ROLE IF EXISTS morgs_near'); + }); + + it('handles revoke cron failures', async () => { + userPgClientQuery = jest.fn() + .mockResolvedValueOnce(null) // unschedule create partition job + .mockResolvedValueOnce(null) // unschedule delete partition job + .mockResolvedValueOnce({ rows: [] }); // list schemas + + cronPgClient.query = jest.fn() + .mockRejectedValue(new Error('failed revoke')); + + await expect(provisioner.deprovision(indexerConfig)).rejects.toThrow('Failed to deprovision: Failed to revoke cron access: failed revoke'); + }); + + it('handles drop role failures', async () => { + userPgClientQuery = jest.fn() + .mockResolvedValueOnce(null) // unschedule create partition job + .mockResolvedValueOnce(null) // unschedule delete partition job + .mockResolvedValueOnce({ rows: [] }); // list schemas + + adminPgClient.query = jest.fn() + .mockResolvedValueOnce(null) + .mockRejectedValue(new Error('failed to drop role')); + + await expect(provisioner.deprovision(indexerConfig)).rejects.toThrow('Failed to deprovision: Failed to drop role: failed to drop role'); + }); + + it('handles drop database failures', async () => { + userPgClientQuery = jest.fn() + .mockResolvedValueOnce(null) // unschedule create partition job + .mockResolvedValueOnce(null) // unschedule delete partition job + .mockResolvedValueOnce({ rows: [] }); // list schemas + + adminPgClient.query = jest.fn().mockRejectedValue(new Error('failed to drop db')); + + await expect(provisioner.deprovision(indexerConfig)).rejects.toThrow('Failed to deprovision: Failed to drop database: failed to drop db'); + }); + + it('handles drop datasource failures', async () => { + userPgClientQuery = jest.fn() + .mockResolvedValueOnce(null) // unschedule 
create partition job + .mockResolvedValueOnce(null) // unschedule delete partition job + .mockResolvedValueOnce({ rows: [] }); // list schemas + + hasuraClient.dropDatasource = jest.fn().mockRejectedValue(new Error('failed to drop datasource')); + + await expect(provisioner.deprovision(indexerConfig)).rejects.toThrow('Failed to deprovision: Failed to drop datasource: failed to drop'); + }); + + it('handles drop schema failures', async () => { + hasuraClient.dropSchema = jest.fn().mockRejectedValue(new Error('failed to drop schema')); + await expect(provisioner.deprovision(indexerConfig)).rejects.toThrow('Failed to deprovision: Failed to drop schema: failed to drop'); + }); + + it('handles remove log job failures', async () => { + userPgClientQuery = jest.fn().mockResolvedValueOnce(null).mockRejectedValueOnce(new Error('failed to remove jobs')); + await expect(provisioner.deprovision(indexerConfig)).rejects.toThrow('Failed to deprovision: Failed to unschedule log partition jobs: failed to remove jobs'); + }); + }); + describe('isUserApiProvisioned', () => { it('returns false if datasource doesnt exists', async () => { hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index 04ab238e5..f23d8ca97 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -234,6 +234,100 @@ export default class Provisioner { return await wrapError(async () => await this.hasuraClient.addDatasource(userName, password, databaseName), 'Failed to add datasource'); } + async dropSchemaAndMetadata (databaseName: string, schemaName: string): Promise { + await wrapError(async () => { + // Need to drop via Hasura to ensure metadata is cleaned up + await this.hasuraClient.dropSchema(databaseName, schemaName); + }, 'Failed to drop schema'); + } + + async removeLogPartitionJobs (userName: string, schemaName: string): Promise { + await wrapError( + async 
() => { + const userCronConnectionParameters = { + ...(await this.getPostgresConnectionParameters(userName)), + database: this.config.cronDatabase + }; + const userCronPgClient = new this.PgClient(userCronConnectionParameters); + + await userCronPgClient.query( + this.pgFormat( + "SELECT cron.unschedule('%I_sys_logs_create_partition');", + schemaName, + ) + ); + await userCronPgClient.query( + this.pgFormat( + "SELECT cron.unschedule('%I_sys_logs_delete_partition');", + schemaName, + ) + ); + + await userCronPgClient.end(); + }, + 'Failed to unschedule log partition jobs' + ); + } + + async listUserOwnedSchemas (userName: string): Promise { + return await wrapError(async () => { + const userDbConnectionParameters = await this.getPostgresConnectionParameters(userName); + const userPgClient = new this.PgClient(userDbConnectionParameters); + + const result = await userPgClient.query( + this.pgFormat('SELECT schema_name FROM information_schema.schemata WHERE schema_owner = %L', userName) + ); + + await userPgClient.end(); + + return result.rows.map((row) => row.schema_name); + }, 'Failed to list schemas'); + } + + async dropDatabase (databaseName: string): Promise { + await wrapError(async () => { + await this.adminDefaultPgClient.query(this.pgFormat('DROP DATABASE IF EXISTS %I (FORCE)', databaseName)); + }, 'Failed to drop database'); + } + + async dropDatasource (databaseName: string): Promise { + await wrapError(async () => { + await this.hasuraClient.dropDatasource(databaseName); + }, 'Failed to drop datasource'); + } + + async dropRole (userName: string): Promise { + await wrapError(async () => { + await this.adminDefaultPgClient.query(this.pgFormat('DROP ROLE IF EXISTS %I', userName)); + }, 'Failed to drop role'); + } + + async revokeCronAccess (userName: string): Promise { + await wrapError( + async () => { + await this.adminCronPgClient.query(this.pgFormat('REVOKE USAGE ON SCHEMA cron FROM %I CASCADE', userName)); + await 
this.adminCronPgClient.query(this.pgFormat('REVOKE EXECUTE ON FUNCTION cron.schedule_in_database FROM %I;', userName)); + }, + 'Failed to revoke cron access' + ); + } + + public async deprovision (config: ProvisioningConfig): Promise { + await wrapError(async () => { + await this.dropSchemaAndMetadata(config.userName(), config.schemaName()); + await this.removeLogPartitionJobs(config.userName(), config.schemaName()); + + const schemas = await this.listUserOwnedSchemas(config.userName()); + + if (schemas.length === 0) { + await this.dropDatasource(config.databaseName()); + await this.dropDatabase(config.databaseName()); + await this.revokeCronAccess(config.userName()); + await this.dropRole(config.userName()); + } + }, 'Failed to deprovision'); + } + async provisionUserApi (indexerConfig: ProvisioningConfig): Promise { // replace any with actual type const userName = indexerConfig.userName(); const databaseName = indexerConfig.databaseName(); diff --git a/runner/src/server/services/data-layer/data-layer-service.test.ts b/runner/src/server/services/data-layer/data-layer-service.test.ts index 15fc29590..bc4904514 100644 --- a/runner/src/server/services/data-layer/data-layer-service.test.ts +++ b/runner/src/server/services/data-layer/data-layer-service.test.ts @@ -1,14 +1,14 @@ import { type ServerUnaryCall, status } from '@grpc/grpc-js'; -import { createDataLayerService, type ProvisioningTask } from './data-layer-service'; -import { ProvisioningStatus } from '../../../generated/data_layer/ProvisioningStatus'; +import { createDataLayerService, type AsyncTask } from './data-layer-service'; +import { TaskStatus } from '../../../generated/data_layer/TaskStatus'; import type Provisioner from '../../../provisioner'; describe('DataLayerService', () => { - describe('CheckProvisioningTaskStatus', () => { + describe('GetTaskStatus', () => { it('should return NOT_FOUND if the task does not exist', (done) => { const call = { - request: { accountId: 'testAccount', functionName: 
'testFunction' } + request: { taskId: 'id' } } as unknown as ServerUnaryCall; const callback = (error: any): void => { @@ -17,122 +17,122 @@ describe('DataLayerService', () => { done(); }; - createDataLayerService().CheckProvisioningTaskStatus(call, callback); + createDataLayerService().GetTaskStatus(call, callback); }); it('should return PENDING if the task is pending', (done) => { const tasks = { - 'testAccount:testFunction': { pending: true, completed: false, failed: false } as unknown as ProvisioningTask + id: { pending: true, completed: false, failed: false } as unknown as AsyncTask }; const call = { - request: { accountId: 'testAccount', functionName: 'testFunction' } + request: { taskId: 'id' } } as unknown as ServerUnaryCall; const callback = (_error: any, response: any): void => { - expect(response.status).toBe(ProvisioningStatus.PENDING); + expect(response.status).toBe(TaskStatus.PENDING); done(); }; - createDataLayerService(undefined, tasks).CheckProvisioningTaskStatus(call, callback); + createDataLayerService(undefined, tasks).GetTaskStatus(call, callback); }); it('should return COMPLETE if the task is completed', (done) => { const tasks = { - 'testAccount:testFunction': { pending: false, completed: true, failed: false } as unknown as ProvisioningTask + id: { pending: false, completed: true, failed: false } as unknown as AsyncTask }; const call = { - request: { accountId: 'testAccount', functionName: 'testFunction' } + request: { taskId: 'id' } } as unknown as ServerUnaryCall; const callback = (_error: any, response: any): void => { - expect(response.status).toBe(ProvisioningStatus.COMPLETE); + expect(response.status).toBe(TaskStatus.COMPLETE); done(); }; - createDataLayerService(undefined, tasks).CheckProvisioningTaskStatus(call, callback); + createDataLayerService(undefined, tasks).GetTaskStatus(call, callback); }); it('should return FAILED if the task has failed', (done) => { const tasks = { - 'testAccount:testFunction': { pending: false, completed: 
false, failed: true } as unknown as ProvisioningTask + id: { pending: false, completed: false, failed: true } as unknown as AsyncTask }; const call = { - request: { accountId: 'testAccount', functionName: 'testFunction' } + request: { taskId: 'id' } } as unknown as ServerUnaryCall; const callback = (_error: any, response: any): void => { - expect(response.status).toBe(ProvisioningStatus.FAILED); + expect(response.status).toBe(TaskStatus.FAILED); done(); }; - createDataLayerService(undefined, tasks).CheckProvisioningTaskStatus(call, callback); + createDataLayerService(undefined, tasks).GetTaskStatus(call, callback); }); }); - describe('Provision', () => { - it('should return ALREADY_EXISTS if the task exists', (done) => { - const tasks = { - 'testAccount:testFunction': { pending: true, completed: false, failed: false } as unknown as ProvisioningTask + describe('StartProvisioningTask', () => { + it('should return the current task if it exists', (done) => { + const tasks: Record = { + '8291150845651941809f8f3db28eeb7fd8acdfeb422cb07c10178020070836b8': { pending: false, completed: true, failed: false } as unknown as AsyncTask }; const call = { request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'schema' } } as unknown as ServerUnaryCall; - const callback = (error: any): void => { - expect(error.code).toBe(status.ALREADY_EXISTS); - expect(error.details).toBe('Provisioning task already exists'); + const callback = (_error: any, response: any): void => { + expect(tasks[response.taskId]).toBeDefined(); + expect(tasks[response.taskId].completed).toBe(true); done(); }; createDataLayerService(undefined, tasks).StartProvisioningTask(call, callback); }); - it('should return Complete if the task has already completed', (done) => { + it('should start a new provisioning task', (done) => { const tasks: Record = {}; const provisioner = { - fetchUserApiProvisioningStatus: jest.fn().mockResolvedValue(true) + provisionUserApi: jest.fn().mockResolvedValue(null) } 
as unknown as Provisioner; const call = { request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'testSchema' } } as unknown as ServerUnaryCall; const callback = (_error: any, response: any): void => { - expect(response.status).toBe(ProvisioningStatus.COMPLETE); + expect(tasks[response.taskId]).toBeDefined(); + expect(tasks[response.taskId].pending).toBe(true); done(); }; createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); }); + }); - it('should start a new provisioning task and return PENDING', (done) => { - const tasks: Record = {}; - const provisioner = { - fetchUserApiProvisioningStatus: jest.fn().mockResolvedValue(false), - provisionUserApi: jest.fn().mockResolvedValue(null) - } as unknown as Provisioner; + describe('StartDeprovisioningTask', () => { + it('should return ALREADY_EXISTS if the task exists', (done) => { + const tasks = { + f92a9f97d2609849e6837b483d8210c7b308c6f615a691449087ec00db1eef06: { pending: true, completed: false, failed: false } as unknown as AsyncTask + }; const call = { - request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'testSchema' } + request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'schema' } } as unknown as ServerUnaryCall; - const callback = (_error: any, response: any): void => { - expect(response.status).toBe(ProvisioningStatus.PENDING); - expect(tasks['testAccount:testFunction']).toBeDefined(); - expect(tasks['testAccount:testFunction'].pending).toBe(true); + const callback = (error: any): void => { + expect(error.code).toBe(status.ALREADY_EXISTS); + expect(error.details).toBe('Deprovisioning task already exists'); done(); }; - createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); + createDataLayerService(undefined, tasks).StartDeprovisioningTask(call, callback); }); - it('should return INTERNAL error if checking provisioning status fails', (done) => { + it('should start a new deprovisioning task', 
(done) => { const tasks: Record = {}; const provisioner = { - fetchUserApiProvisioningStatus: jest.fn().mockRejectedValue(new Error('boom')) + deprovision: jest.fn().mockResolvedValue(null) } as unknown as Provisioner; const call = { request: { accountId: 'testAccount', functionName: 'testFunction', schema: 'testSchema' } } as unknown as ServerUnaryCall; - const callback = (error: any): void => { - expect(error.code).toBe(status.INTERNAL); - expect(error.details).toBe('boom'); + const callback = (_error: any, response: any): void => { + expect(tasks[response.taskId]).toBeDefined(); + expect(tasks[response.taskId].pending).toBe(true); done(); }; - createDataLayerService(provisioner, tasks).StartProvisioningTask(call, callback); + createDataLayerService(provisioner, tasks).StartDeprovisioningTask(call, callback); }); }); }); diff --git a/runner/src/server/services/data-layer/data-layer-service.ts b/runner/src/server/services/data-layer/data-layer-service.ts index 18312568f..3b3d46ece 100644 --- a/runner/src/server/services/data-layer/data-layer-service.ts +++ b/runner/src/server/services/data-layer/data-layer-service.ts @@ -1,21 +1,27 @@ +import crypto from 'crypto'; + import { type ServerUnaryCall, type sendUnaryData, status, StatusBuilder } from '@grpc/grpc-js'; import Provisioner from '../../../provisioner'; import { ProvisioningConfig } from '../../../indexer-config/indexer-config'; import parentLogger from '../../../logger'; -import { type CheckProvisioningTaskStatusRequest__Output } from '../../../generated/data_layer/CheckProvisioningTaskStatusRequest'; +import { type GetTaskStatusRequest__Output } from '../../../generated/data_layer/GetTaskStatusRequest'; +import { type GetTaskStatusResponse } from '../../../generated/data_layer/GetTaskStatusResponse'; import { type DataLayerHandlers } from '../../../generated/data_layer/DataLayer'; +import { type StartTaskResponse } from '../../../generated/data_layer/StartTaskResponse'; import { type 
ProvisionRequest__Output } from '../../../generated/data_layer/ProvisionRequest'; -import { type ProvisionResponse } from '../../../generated/data_layer/ProvisionResponse'; -import { ProvisioningStatus } from '../../../generated/data_layer/ProvisioningStatus'; +import { type DeprovisionRequest__Output } from '../../../generated/data_layer/DeprovisionRequest'; +import { TaskStatus } from '../../../generated/data_layer/TaskStatus'; -export class ProvisioningTask { +export class AsyncTask { public failed: boolean; public pending: boolean; public completed: boolean; - constructor (public readonly promise: Promise) { + constructor ( + public readonly promise: Promise + ) { promise.then(() => { this.completed = true; }).catch((error) => { @@ -31,19 +37,36 @@ export class ProvisioningTask { } } -type ProvisioningTasks = Record; +type AsyncTasks = Record; + +enum TaskType { + PROVISION = 'PROVISION', + DEPROVISION = 'DEPROVISION' +} + +const hash = (...args: string[]): string => { + const hash = crypto.createHash('sha256'); + hash.update(args.join(':')); + return hash.digest('hex'); +}; + +const createLogger = (config: ProvisioningConfig): typeof parentLogger => { + const logger = parentLogger.child({ + accountId: config.accountId, + functionName: config.functionName, + service: 'DataLayerService' + }); -const generateTaskId = (accountId: string, functionName: string): string => `${accountId}:${functionName}`; + return logger; +}; export function createDataLayerService ( provisioner: Provisioner = new Provisioner(), - tasks: ProvisioningTasks = {} + tasks: AsyncTasks = {} ): DataLayerHandlers { return { - CheckProvisioningTaskStatus (call: ServerUnaryCall, callback: sendUnaryData): void { - const { accountId, functionName } = call.request; - - const task = tasks[generateTaskId(accountId, functionName)]; + GetTaskStatus (call: ServerUnaryCall, callback: sendUnaryData): void { + const task = tasks[call.request.taskId]; if (!task) { const notFound = new StatusBuilder() @@ 
-56,70 +79,78 @@ export function createDataLayerService ( } if (task.completed) { - callback(null, { status: ProvisioningStatus.COMPLETE }); + callback(null, { status: TaskStatus.COMPLETE }); return; } if (task.failed) { - callback(null, { status: ProvisioningStatus.FAILED }); + callback(null, { status: TaskStatus.FAILED }); return; } - callback(null, { status: ProvisioningStatus.PENDING }); + callback(null, { status: TaskStatus.PENDING }); }, - StartProvisioningTask (call: ServerUnaryCall, callback: sendUnaryData): void { + StartProvisioningTask (call: ServerUnaryCall, callback: sendUnaryData): void { const { accountId, functionName, schema } = call.request; const provisioningConfig = new ProvisioningConfig(accountId, functionName, schema); - const logger = parentLogger.child({ - service: 'DataLayerService', - accountId: provisioningConfig.accountId, - functionName: provisioningConfig.functionName, - }); + const logger = createLogger(provisioningConfig); + + const taskId = hash(accountId, functionName, schema, TaskType.PROVISION); + + const task = tasks[taskId]; + + if (task) { + callback(null, { taskId }); + + return; + }; + + logger.info(`Starting provisioning task: ${taskId}`); + + tasks[taskId] = new AsyncTask( + provisioner + .provisionUserApi(provisioningConfig) + .then(() => { + logger.info('Successfully provisioned Data Layer'); + }) + .catch((err) => { + logger.error('Failed to provision Data Layer', err); + throw err; + }) + ); + + callback(null, { taskId }); + }, + + StartDeprovisioningTask (call: ServerUnaryCall, callback: sendUnaryData): void { + const { accountId, functionName } = call.request; - const task = tasks[generateTaskId(accountId, functionName)]; + const provisioningConfig = new ProvisioningConfig(accountId, functionName, 'todo'); + + const logger = createLogger(provisioningConfig); + + const taskId = hash(accountId, functionName, TaskType.DEPROVISION); + + const task = tasks[taskId]; if (task) { const exists = new StatusBuilder() 
.withCode(status.ALREADY_EXISTS) - .withDetails('Provisioning task already exists') + .withDetails('Deprovisioning task already exists') .build(); callback(exists); return; }; - provisioner.fetchUserApiProvisioningStatus(provisioningConfig).then((isProvisioned) => { - if (isProvisioned) { - callback(null, { status: ProvisioningStatus.COMPLETE }); - - return; - } - - logger.info('Provisioning Data Layer'); - - tasks[generateTaskId(accountId, functionName)] = new ProvisioningTask( - provisioner - .provisionUserApi(provisioningConfig) - .then(() => { - logger.info('Successfully provisioned Data Layer'); - }) - .catch((err) => { - logger.error('Failed to provision Data Layer', err); - throw err; - }) - ); - - callback(null, { status: ProvisioningStatus.PENDING }); - }).catch((error) => { - const internalError = new StatusBuilder() - .withCode(status.INTERNAL) - .withDetails(error.message) - .build(); - callback(internalError); - }); + logger.info(`Starting deprovisioning task: ${taskId}`); + + tasks[taskId] = new AsyncTask(provisioner.deprovision(provisioningConfig)); + + callback(null, { taskId }); } }; } diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index dda4511b8..3a74fbc8f 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -245,6 +245,52 @@ describe('Indexer integration', () => { const { morgs_near_test_context_db_indexer_storage: totalRows }: any = await graphqlClient.request(queryAllRows); expect(totalRows.length).toEqual(3); // Two inserts, and the overwritten upsert }); + + it('can provision and deprovision', async () => { + const testConfig1 = new IndexerConfig( + 'test:stream', + 'provisioning.near', // must be unique to prevent conflicts with other tests + 'test-provisioning1', + 0, + '', + 'CREATE TABLE blocks (height numeric)', + LogLevel.INFO + ); + + const testConfig2 = new IndexerConfig( + 'test:stream', + 'provisioning.near', // must be unique to prevent conflicts with other tests 
+ 'test-provisioning2', + 0, + '', + 'CREATE TABLE blocks (height numeric)', + LogLevel.INFO + ); + + await provisioner.provisionUserApi(testConfig1); + await provisioner.provisionUserApi(testConfig2); + + const params = await provisioner.getPostgresConnectionParameters(testConfig1.userName()); + const userPgClient = new PgClient(params); + + await expect(pgClient.query('SELECT 1 FROM pg_database WHERE datname = $1', ['provisioning_near']).then(({ rows }) => rows)).resolves.toHaveLength(1); + await expect(userPgClient.query('SELECT schema_name FROM information_schema.schemata WHERE schema_name like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(2); + await expect(pgClient.query('SELECT * FROM cron.job WHERE jobname like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(4); + await expect(hasuraClient.doesSourceExist(testConfig1.databaseName())).resolves.toBe(true); + + await provisioner.deprovision(testConfig1); + + await expect(pgClient.query('SELECT 1 FROM pg_database WHERE datname = $1', ['provisioning_near']).then(({ rows }) => rows)).resolves.toHaveLength(1); + await expect(userPgClient.query('SELECT schema_name FROM information_schema.schemata WHERE schema_name like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(1); + await expect(pgClient.query('SELECT * FROM cron.job WHERE jobname like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(2); + await expect(hasuraClient.doesSourceExist(testConfig1.databaseName())).resolves.toBe(true); + + await provisioner.deprovision(testConfig2); + + await expect(pgClient.query('SELECT 1 FROM pg_database WHERE datname = $1', ['provisioning_near']).then(({ rows }) => rows)).resolves.toHaveLength(0); + await expect(pgClient.query('SELECT * FROM cron.job WHERE jobname like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => 
rows)).resolves.toHaveLength(0); + await expect(hasuraClient.doesSourceExist(testConfig1.databaseName())).resolves.toBe(false); + }); }); async function indexerLogsQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise {