diff --git a/TODO.md b/TODO.md
deleted file mode 100644
index 24838d516..000000000
--- a/TODO.md
+++ /dev/null
@@ -1,7 +0,0 @@
-- Implement `DataLayerService` gRPC, to provide control over provisioning, ignoring de-provisioning for now
-- Remove implicit provisioning from execution
-- Refactor Coordinator executor/block_stream synchronization so that it is a single stateful process?
-- Integrate `DataLayerService` in to Coordinator and add synchronization step to manage this
-- Add gRPC method to deprovision data layer
-- Refactor Coordinator synchronization so that we capture indexer "removals"
-- Add de-provisioning to Coordinator synchronization
diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs
index 8d3fd9643..a8d1eddb9 100644
--- a/coordinator/src/indexer_state.rs
+++ b/coordinator/src/indexer_state.rs
@@ -18,8 +18,6 @@ struct OldIndexerState {
     block_stream_synced_at: Option,
 }
 
-// NOTE We'll need to add more fields here - is there a way to gracefully handle non-existant
-// fields during serde deserialization? it's annoying to always have to migrate this
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub struct IndexerState {
     pub block_stream_synced_at: Option,
@@ -44,7 +42,6 @@ pub struct IndexerStateManagerImpl {
     redis_client: RedisClient,
 }
 
-// NOTE we probably need a "list" method, which means storing all state ids in a Redis set
 #[cfg_attr(test, mockall::automock)]
 impl IndexerStateManagerImpl {
     pub fn new(redis_client: RedisClient) -> Self {
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index bdf732071..f19d5e5e8 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -68,26 +68,11 @@ async fn main() -> anyhow::Result<()> {
         .migrate_state_if_needed(&indexer_registry)
         .await?;
 
-    // NOTE Rather than filtering them here, we can pass `IndexerState` to the sync methods,
-    // and let them decide what to do. That would be a bit cleaner?
-    //
-    // This will also allow us to determine when an Indexer has been deleted, rather than
-    // implicitly relying on the existance of executors/block_streams. This is going to be
-    // important for deprovisioning.
     let indexer_registry = indexer_state_manager
         .filter_disabled_indexers(&indexer_registry)
         .await?;
 
     tokio::try_join!(
-        // NOTE this may need to be regactored in to a combined "synchronise" function.
-        // The addition of DataLayer provisioning makes the process a bit more stateful, i.e.
-        // we need to do provisioning first, wait till it completes, and can then kick off
-        // executor/block_stream sync processes
-        //
-        // It's probably still helpful to encapsulate the block_stream/executor sync methods,
-        // as they are quite involved, but call them from an overall synchronise method
-        //
-        // We'll need to store the `ProvisioningStatus` in Redis, so we know when to poll
         synchronise_executors(&indexer_registry, &executors_handler),
         synchronise_block_streams(
             &indexer_registry,
diff --git a/runner/protos/data-layer.proto b/runner/protos/data-layer.proto
index 17bf72673..b3ce42a36 100644
--- a/runner/protos/data-layer.proto
+++ b/runner/protos/data-layer.proto
@@ -2,19 +2,7 @@ syntax = "proto3";
 
 package data_layer;
 
-// NOTE this will eventually be expanded to handle more granular operations
-// such as truncating the database, or perhaps running migrations
 service DataLayer {
-    // NOTE As this process can take a while, we need to handle this asynchronously.
-    // Therefore, this will trigger the provisioning process and return immediately.
-    // The client can then poll the CheckProvisioningStatus method to determine when the
-    // provisioning process has completed.
-    //
-    // Maybe we should call this TriggerProvisioning instead of Provision?
-    //
-    // Need to figure out how this process will actually be kicked off asynchronously,
-    // can we just fire a promise or do we need worker threads?
-
     // Provisions the data layer (PostgreSQL + Hasura)
     rpc Provision (ProvisionRequest) returns (ProvisionResponse);
 
@@ -23,8 +11,6 @@ service DataLayer {
 }
 
 message ProvisionRequest {
-    // TODO This is only a partial `IndexerConfig`, which may pose an issue as
-    // all the provisioning methods expect a full `IndexerConfig`
     string account_id = 1;
     string function_name = 2;
     string schema = 3;
diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts
index 18f9f3497..344deae86 100644
--- a/runner/src/indexer/indexer.ts
+++ b/runner/src/indexer/indexer.ts
@@ -48,9 +48,6 @@ const defaultConfig: Config = {
   hasuraEndpoint: process.env.HASURA_ENDPOINT,
 };
 
-// NOTE We need to remove provisioning from here,
-// I'm thinking we remove knowledge of it entirely? and just inject he db parameters
-// in from `StreamHandler`? Maybe a good opportunity to rename `StreamHandler` in to something more appropriate
 export default class Indexer {
   DEFAULT_HASURA_ROLE: string;
   IS_FIRST_EXECUTION: boolean = true;
diff --git a/runner/src/server/services/data-layer/data-layer-service.test.ts b/runner/src/server/services/data-layer/data-layer-service.test.ts
index 8689ca157..a92d1d049 100644
--- a/runner/src/server/services/data-layer/data-layer-service.test.ts
+++ b/runner/src/server/services/data-layer/data-layer-service.test.ts
@@ -1,6 +1,6 @@
 import { type ServerUnaryCall, status } from '@grpc/grpc-js';
 
-import { createDataLayerService, type ProvisioningTask } from './data-layer-service'; // Adjust this path accordingly
+import { createDataLayerService, type ProvisioningTask } from './data-layer-service';
 import { ProvisioningStatus } from '../../../generated/ProvisioningStatus';
 import type Provisioner from '../../../provisioner';
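The removed proto comments describe the intended pattern: `Provision` triggers provisioning and returns immediately, and the client then polls `CheckProvisioningStatus` until the work completes. As a rough illustration of the "just fire a promise" option raised in those comments, here is a minimal TypeScript sketch of how the runner could track in-flight provisioning per indexer. The enum values, task shape, tracker class, and key format below are assumptions for illustration only and are not taken from this change.

```ts
// Illustrative stand-ins only: the real `ProvisioningStatus` is generated from
// data-layer.proto and the real `ProvisioningTask` lives in data-layer-service.ts;
// neither shape is shown in this diff.
enum ProvisioningStatus {
  PENDING = 1,
  COMPLETE = 2,
  FAILED = 3,
}

interface ProvisioningTask {
  status: ProvisioningStatus;
  error?: string;
}

class ProvisioningTracker {
  private readonly tasks = new Map<string, ProvisioningTask>();

  // Fire-and-forget: start the provisioning promise and record its outcome so a
  // Provision handler could respond immediately without awaiting completion.
  start (key: string, provision: () => Promise<void>): void {
    const task: ProvisioningTask = { status: ProvisioningStatus.PENDING };
    this.tasks.set(key, task);

    provision()
      .then(() => { task.status = ProvisioningStatus.COMPLETE; })
      .catch((error: Error) => {
        task.status = ProvisioningStatus.FAILED;
        task.error = error.message;
      });
  }

  // What a CheckProvisioningStatus handler would look up when the Coordinator polls.
  status (key: string): ProvisioningTask | undefined {
    return this.tasks.get(key);
  }
}

// Hypothetical usage, keyed the same way as ProvisionRequest (account_id + function_name):
const tracker = new ProvisioningTracker();
tracker.start('example.near/example_indexer', async () => {
  // e.g. call into the existing Provisioner here
});
console.log(tracker.status('example.near/example_indexer')?.status); // PENDING until the promise settles
```

An in-memory map like this only works while a single runner instance owns provisioning; the removed Coordinator note suggests the `ProvisioningStatus` may also need to be persisted in Redis so the Coordinator knows when to poll.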