Skip to content

Commit

Permalink
refactor: Move Data Layer provisioning from Runner to Coordinator (#805)
Browse files Browse the repository at this point in the history
This PR updates Coordinator to handle Data Layer provisioning, removing
the implicit step from Runner. Provisioning itself is still completed
within Runner, but Coordinator will trigger and monitor it.
Functionally, provisioning is the same, but there are some subtle
differences around how it is handled:
- Provisioning will now happen as soon as the Indexer is registered,
rather than when the first matched block is executed.
- Block Streams/Executors will not be started until provisioning is
successful, neither will start when either pending or failed.

A `provisioned_state` enum has been added to the Indexer state within
Redis. This is used to persist what stage of provisioning the Data Layer
is at, as well as ensuring we only provision once.

## Concerns with current implementation
Overall, I'm not happy with the implementation here, but feel it is the
best we can do given the current structure of Coordinator. As to not
block this feature I decided to go forward with this approach, and will
create a ticket to refactor/update later.

Provisioning is triggered within the "new" handler, and then polled
within the "existing" handler, which seems a little awkward. The
separated handling is necessary since no operation within the control
loop (i.e. `Synchroniser`) should block, as that would block
synchronisation for all other Indexers. So we need to trigger the
provisioning initially, and then poll the completion each subsequent
control loop.

I feel we have outgrown the current control loop, and am planning to
refactor later. Rather than have a single control loop for _all_
Indexers, I'm thinking we can have dedicated loops for each of them. We
could spawn a new task for each Indexer, which then manages its own
lifecycle. Then each Indexer is free to wait for as long as it wants,
without impacting other Indexers. This would allow us to handle the
blocking provisioning step much more elegantly.
  • Loading branch information
morgsmccauley authored Jun 19, 2024
1 parent 5462ec5 commit 973efd6
Show file tree
Hide file tree
Showing 19 changed files with 461 additions and 422 deletions.
File renamed without changes.
96 changes: 96 additions & 0 deletions coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#![cfg_attr(test, allow(dead_code))]

pub use runner::data_layer::ProvisioningStatus;

use anyhow::Context;
use runner::data_layer::data_layer_client::DataLayerClient;
use runner::data_layer::{CheckProvisioningTaskStatusRequest, ProvisionRequest};
use tonic::transport::channel::Channel;
use tonic::{Request, Status};

use crate::indexer_config::IndexerConfig;

#[cfg(not(test))]
pub use DataLayerHandlerImpl as DataLayerHandler;
#[cfg(test)]
pub use MockDataLayerHandlerImpl as DataLayerHandler;

pub struct DataLayerHandlerImpl {
client: DataLayerClient<Channel>,
}

#[cfg_attr(test, mockall::automock)]
impl DataLayerHandlerImpl {
pub fn from_env() -> anyhow::Result<Self> {
let runner_url = std::env::var("RUNNER_URL").context("RUNNER_URL is not set")?;
Self::connect(&runner_url)
}

pub fn connect(runner_url: &str) -> anyhow::Result<Self> {
let channel = Channel::from_shared(runner_url.to_string())
.context("Runner URL is invalid")?
.connect_lazy();
let client = DataLayerClient::new(channel);

Ok(Self { client })
}

pub async fn start_provisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
let request = ProvisionRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
schema: indexer_config.schema.clone(),
};

let response = self
.client
.clone()
.start_provisioning_task(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::AlreadyExists {
return Ok(ProvisioningStatus::Pending);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
};

Ok(status)
}

pub async fn check_provisioning_task_status(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
let request = CheckProvisioningTaskStatusRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
};

let response = self
.client
.clone()
.check_provisioning_task_status(Request::new(request))
.await?;

let status = match response.into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
};

Ok(status)
}
}
File renamed without changes.
3 changes: 3 additions & 0 deletions coordinator/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod block_streams;
pub mod data_layer;
pub mod executors;
188 changes: 73 additions & 115 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

use anyhow::Context;
use near_primitives::types::AccountId;
use std::str::FromStr;

use crate::indexer_config::IndexerConfig;
use crate::redis::RedisClient;
use crate::registry::IndexerRegistry;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum ProvisionedState {
Unprovisioned,
Provisioning,
Provisioned,
Failed,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OldIndexerState {
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
}
Expand All @@ -20,6 +28,7 @@ pub struct IndexerState {
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
pub provisioned_state: ProvisionedState,
}

impl IndexerState {
Expand All @@ -31,17 +40,6 @@ impl IndexerState {
}
}

impl Default for IndexerState {
fn default() -> Self {
Self {
account_id: AccountId::from_str("morgs.near").unwrap(),
function_name: String::new(),
block_stream_synced_at: None,
enabled: true,
}
}
}

#[cfg(not(test))]
pub use IndexerStateManagerImpl as IndexerStateManager;
#[cfg(test)]
Expand All @@ -57,41 +55,36 @@ impl IndexerStateManagerImpl {
Self { redis_client }
}

pub async fn migrate(&self, registry: &IndexerRegistry) -> anyhow::Result<()> {
if self.redis_client.indexer_states_set_exists().await? {
return Ok(());
}
pub async fn migrate(&self) -> anyhow::Result<()> {
let raw_states = self.redis_client.list_indexer_states().await?;

for raw_state in raw_states {
if let Ok(state) = serde_json::from_str::<IndexerState>(&raw_state) {
tracing::info!(
"{}/{} already migrated, skipping",
state.account_id,
state.function_name
);
continue;
}

tracing::info!("Migrating {}", raw_state);

tracing::info!("Migrating indexer state");

for config in registry.iter() {
let raw_state = self.redis_client.get_indexer_state(config).await?;

let state = if let Some(raw_state) = raw_state {
let old_state: OldIndexerState =
serde_json::from_str(&raw_state).context(format!(
"Failed to deserialize OldIndexerState for {}",
config.get_full_name()
))?;

IndexerState {
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
}
} else {
self.get_default_state(config)
let old_state: IndexerState = serde_json::from_str(&raw_state)?;

let state = IndexerState {
account_id: old_state.account_id,
function_name: old_state.function_name,
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
provisioned_state: ProvisionedState::Provisioned,
};

self.set_state(config, state).await.context(format!(
"Failed to set state for {}",
config.get_full_name()
))?;
self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;
}

tracing::info!("Migration complete");

Ok(())
}

Expand All @@ -101,6 +94,7 @@ impl IndexerStateManagerImpl {
function_name: indexer_config.function_name.clone(),
block_stream_synced_at: None,
enabled: true,
provisioned_state: ProvisionedState::Unprovisioned,
}
}

Expand Down Expand Up @@ -140,6 +134,38 @@ impl IndexerStateManagerImpl {
Ok(())
}

pub async fn set_provisioning(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Provisioning;

self.set_state(indexer_config, indexer_state).await?;

Ok(())
}

pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Provisioned;

self.set_state(indexer_config, indexer_state).await?;

Ok(())
}
pub async fn set_provisioning_failure(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Failed;

self.set_state(indexer_config, indexer_state).await?;

Ok(())
}

pub async fn set_enabled(
&self,
indexer_config: &IndexerConfig,
Expand Down Expand Up @@ -173,8 +199,6 @@ impl IndexerStateManagerImpl {
mod tests {
use super::*;

use std::collections::HashMap;

use mockall::predicate;
use registry_types::{Rule, StartBlock, Status};

Expand All @@ -183,7 +207,7 @@ mod tests {
let mut mock_redis_client = RedisClient::default();
mock_redis_client
.expect_list_indexer_states()
.returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true }).to_string()]))
.returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()]))
.once();
mock_redis_client
.expect_list_indexer_states()
Expand All @@ -196,72 +220,6 @@ mod tests {
assert!(indexer_manager.list().await.is_err());
}

#[tokio::test]
async fn migrate() {
let config1 = IndexerConfig::default();
let config2 = IndexerConfig {
account_id: "darunrs.near".parse().unwrap(),
function_name: "test".to_string(),
..Default::default()
};

let registry = IndexerRegistry::from(&[
(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), config1.clone())]),
),
(
"darunrs.near".parse().unwrap(),
HashMap::from([("test".to_string(), config2.clone())]),
),
]);

let mut mock_redis_client = RedisClient::default();
mock_redis_client
.expect_indexer_states_set_exists()
.returning(|| Ok(false))
.once();
mock_redis_client
.expect_get_indexer_state()
.with(predicate::eq(config1.clone()))
.returning(|_| {
Ok(Some(
serde_json::json!({ "block_stream_synced_at": 200, "enabled": false })
.to_string(),
))
})
.once();
mock_redis_client
.expect_get_indexer_state()
.with(predicate::eq(config2.clone()))
.returning(|_| Ok(None))
.once();
mock_redis_client
.expect_set_indexer_state()
.with(
predicate::eq(config1),
predicate::eq(
"{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":200,\"enabled\":false}".to_string(),
),
)
.returning(|_, _| Ok(()))
.once();
mock_redis_client
.expect_set_indexer_state()
.with(
predicate::eq(config2),
predicate::eq(
"{\"account_id\":\"darunrs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":null,\"enabled\":true}".to_string()
),
)
.returning(|_, _| Ok(()))
.once();

let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client);

indexer_manager.migrate(&registry).await.unwrap();
}

#[tokio::test]
pub async fn disable_indexer() {
let indexer_config = IndexerConfig {
Expand All @@ -284,15 +242,15 @@ mod tests {
.with(predicate::eq(indexer_config.clone()))
.returning(|_| {
Ok(Some(
serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true })
serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" })
.to_string(),
))
});
redis_client
.expect_set_indexer_state()
.with(
predicate::always(),
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false}".to_string()),
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()),
)
.returning(|_, _| Ok(()))
.once();
Expand Down
Loading

0 comments on commit 973efd6

Please sign in to comment.