Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Move Data Layer provisioning from Runner to Coordinator #805

Merged
merged 13 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#![cfg_attr(test, allow(dead_code))]

pub use runner::data_layer::ProvisioningStatus;

use anyhow::Context;
use runner::data_layer::data_layer_client::DataLayerClient;
use runner::data_layer::{CheckProvisioningTaskStatusRequest, ProvisionRequest};
use tonic::transport::channel::Channel;
use tonic::{Request, Status};

use crate::indexer_config::IndexerConfig;

#[cfg(not(test))]
pub use DataLayerHandlerImpl as DataLayerHandler;
#[cfg(test)]
pub use MockDataLayerHandlerImpl as DataLayerHandler;

pub struct DataLayerHandlerImpl {
client: DataLayerClient<Channel>,
}

#[cfg_attr(test, mockall::automock)]
impl DataLayerHandlerImpl {
pub fn from_env() -> anyhow::Result<Self> {
let runner_url = std::env::var("RUNNER_URL").context("RUNNER_URL is not set")?;
Self::connect(&runner_url)
}

pub fn connect(runner_url: &str) -> anyhow::Result<Self> {
let channel = Channel::from_shared(runner_url.to_string())
.context("Runner URL is invalid")?
.connect_lazy();
let client = DataLayerClient::new(channel);

Ok(Self { client })
}

pub async fn start_provisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
let request = ProvisionRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
schema: indexer_config.schema.clone(),
};

let response = self
.client
.clone()
.start_provisioning_task(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::AlreadyExists {
return Ok(ProvisioningStatus::Pending);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
};

Ok(status)
}

pub async fn check_provisioning_task_status(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
let request = CheckProvisioningTaskStatusRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
};

let response = self
.client
.clone()
.check_provisioning_task_status(Request::new(request))
.await?;

let status = match response.into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
};

Ok(status)
}
}
3 changes: 3 additions & 0 deletions coordinator/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod block_streams;
pub mod data_layer;
pub mod executors;
188 changes: 73 additions & 115 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

use anyhow::Context;
use near_primitives::types::AccountId;
use std::str::FromStr;

use crate::indexer_config::IndexerConfig;
use crate::redis::RedisClient;
use crate::registry::IndexerRegistry;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum ProvisionedState {
Unprovisioned,
Provisioning,
Provisioned,
Failed,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OldIndexerState {
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
}
Expand All @@ -20,6 +28,7 @@ pub struct IndexerState {
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
pub provisioned_state: ProvisionedState,
}

impl IndexerState {
Expand All @@ -31,17 +40,6 @@ impl IndexerState {
}
}

impl Default for IndexerState {
fn default() -> Self {
Self {
account_id: AccountId::from_str("morgs.near").unwrap(),
function_name: String::new(),
block_stream_synced_at: None,
enabled: true,
}
}
}

#[cfg(not(test))]
pub use IndexerStateManagerImpl as IndexerStateManager;
#[cfg(test)]
Expand All @@ -57,41 +55,36 @@ impl IndexerStateManagerImpl {
Self { redis_client }
}

pub async fn migrate(&self, registry: &IndexerRegistry) -> anyhow::Result<()> {
if self.redis_client.indexer_states_set_exists().await? {
return Ok(());
}
pub async fn migrate(&self) -> anyhow::Result<()> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migration to add provisioned_state enum, defaulting to Provisioned. There is some risk here if an Indexer hasn't actually been provisioned, but I believe it's unlikely and can handle manually if so.

let raw_states = self.redis_client.list_indexer_states().await?;

for raw_state in raw_states {
if let Ok(state) = serde_json::from_str::<IndexerState>(&raw_state) {
tracing::info!(
"{}/{} already migrated, skipping",
state.account_id,
state.function_name
);
continue;
}

tracing::info!("Migrating {}", raw_state);

tracing::info!("Migrating indexer state");

for config in registry.iter() {
let raw_state = self.redis_client.get_indexer_state(config).await?;

let state = if let Some(raw_state) = raw_state {
let old_state: OldIndexerState =
serde_json::from_str(&raw_state).context(format!(
"Failed to deserialize OldIndexerState for {}",
config.get_full_name()
))?;

IndexerState {
account_id: config.account_id.clone(),
function_name: config.function_name.clone(),
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
}
} else {
self.get_default_state(config)
let old_state: IndexerState = serde_json::from_str(&raw_state)?;

let state = IndexerState {
account_id: old_state.account_id,
function_name: old_state.function_name,
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
provisioned_state: ProvisionedState::Provisioned,
};

self.set_state(config, state).await.context(format!(
"Failed to set state for {}",
config.get_full_name()
))?;
self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;
}

tracing::info!("Migration complete");

Ok(())
}

Expand All @@ -101,6 +94,7 @@ impl IndexerStateManagerImpl {
function_name: indexer_config.function_name.clone(),
block_stream_synced_at: None,
enabled: true,
provisioned_state: ProvisionedState::Unprovisioned,
}
}

Expand Down Expand Up @@ -140,6 +134,38 @@ impl IndexerStateManagerImpl {
Ok(())
}

pub async fn set_provisioning(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Provisioning;

self.set_state(indexer_config, indexer_state).await?;

Ok(())
}

pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Provisioned;

self.set_state(indexer_config, indexer_state).await?;

Ok(())
}
pub async fn set_provisioning_failure(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Failed;

self.set_state(indexer_config, indexer_state).await?;

Ok(())
}

pub async fn set_enabled(
&self,
indexer_config: &IndexerConfig,
Expand Down Expand Up @@ -173,8 +199,6 @@ impl IndexerStateManagerImpl {
mod tests {
use super::*;

use std::collections::HashMap;

use mockall::predicate;
use registry_types::{Rule, StartBlock, Status};

Expand All @@ -183,7 +207,7 @@ mod tests {
let mut mock_redis_client = RedisClient::default();
mock_redis_client
.expect_list_indexer_states()
.returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true }).to_string()]))
.returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()]))
.once();
mock_redis_client
.expect_list_indexer_states()
Expand All @@ -196,72 +220,6 @@ mod tests {
assert!(indexer_manager.list().await.is_err());
}

#[tokio::test]
async fn migrate() {
let config1 = IndexerConfig::default();
let config2 = IndexerConfig {
account_id: "darunrs.near".parse().unwrap(),
function_name: "test".to_string(),
..Default::default()
};

let registry = IndexerRegistry::from(&[
(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), config1.clone())]),
),
(
"darunrs.near".parse().unwrap(),
HashMap::from([("test".to_string(), config2.clone())]),
),
]);

let mut mock_redis_client = RedisClient::default();
mock_redis_client
.expect_indexer_states_set_exists()
.returning(|| Ok(false))
.once();
mock_redis_client
.expect_get_indexer_state()
.with(predicate::eq(config1.clone()))
.returning(|_| {
Ok(Some(
serde_json::json!({ "block_stream_synced_at": 200, "enabled": false })
.to_string(),
))
})
.once();
mock_redis_client
.expect_get_indexer_state()
.with(predicate::eq(config2.clone()))
.returning(|_| Ok(None))
.once();
mock_redis_client
.expect_set_indexer_state()
.with(
predicate::eq(config1),
predicate::eq(
"{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":200,\"enabled\":false}".to_string(),
),
)
.returning(|_, _| Ok(()))
.once();
mock_redis_client
.expect_set_indexer_state()
.with(
predicate::eq(config2),
predicate::eq(
"{\"account_id\":\"darunrs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":null,\"enabled\":true}".to_string()
),
)
.returning(|_, _| Ok(()))
.once();

let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client);

indexer_manager.migrate(&registry).await.unwrap();
}

#[tokio::test]
pub async fn disable_indexer() {
let indexer_config = IndexerConfig {
Expand All @@ -284,15 +242,15 @@ mod tests {
.with(predicate::eq(indexer_config.clone()))
.returning(|_| {
Ok(Some(
serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true })
serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" })
.to_string(),
))
});
redis_client
.expect_set_indexer_state()
.with(
predicate::always(),
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false}".to_string()),
predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()),
)
.returning(|_, _| Ok(()))
.once();
Expand Down
Loading
Loading