Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor to support zombie-bite #236

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pjs-rs = { version = "0.1.2", optional = true }
uuid = { workspace = true }
regex = { workspace = true }
glob-match = { workspace = true }
async-trait = { workspace = true }

# Zombienet deps
configuration = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/orchestrator/src/generators/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ impl ChainSpec {
self.raw_path.as_deref()
}

pub fn set_asset_location(&mut self, location: AssetLocation) {
self.asset_location = Some(location)
}

pub async fn read_chain_id<'a, T>(
&self,
scoped_fs: &ScopedFilesystem<'a, T>,
Expand Down
109 changes: 44 additions & 65 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub mod errors;
mod generators;
pub mod network;
mod network_helper;
pub mod network_helper;
mod network_spec;
#[cfg(feature = "pjs")]
pub mod pjs_helper;
Expand All @@ -21,7 +21,9 @@ use configuration::{NetworkConfig, RegistrationStrategy};
use errors::OrchestratorError;
use generators::errors::GeneratorError;
use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
use network_spec::{node::NodeSpec, parachain::ParachainSpec, NetworkSpec};
// re-exported
pub use network_spec::NetworkSpec;
use network_spec::{node::NodeSpec, parachain::ParachainSpec};
use provider::{
types::{ProviderCapabilities, TransferedFile},
DynProvider,
Expand All @@ -31,7 +33,6 @@ use tokio::time::timeout;
use tracing::{debug, info, trace};

use crate::{
generators::chain_spec::ParaGenesisConfig,
shared::{constants::P2P_PORT, types::RegisterParachainOptions},
spawner::SpawnNodeCtx,
};
Expand Down Expand Up @@ -70,6 +71,20 @@ where
res?
}

pub async fn spawn_from_spec(
&self,
network_spec: NetworkSpec,
) -> Result<Network<T>, OrchestratorError> {
let global_timeout = network_spec.global_settings.network_spawn_timeout();
let res = timeout(
Duration::from_secs(global_timeout as u64),
self.spawn_inner(network_spec),
)
.await
.map_err(|_| OrchestratorError::GlobalTimeOut(global_timeout));
res?
}

async fn spawn_inner(
&self,
mut network_spec: NetworkSpec,
Expand Down Expand Up @@ -118,41 +133,12 @@ where
.chain_spec
.read_chain_id(&scoped_fs)
.await?;
let relay_chain_name = network_spec.relaychain.chain.as_str();
// TODO: if we don't need to register this para we can skip it
for para in network_spec.parachains.iter_mut() {
let chain_spec_raw_path = para
.build_chain_spec(&relay_chain_id, &ns, &scoped_fs)
.await?;
debug!("parachain chain-spec built!");

// TODO: this need to be abstracted in a single call to generate_files.
if network_spec.global_settings.base_dir().is_some() {
scoped_fs.create_dir_all(para.id.to_string()).await?;
} else {
scoped_fs.create_dir(para.id.to_string()).await?;
};

// create wasm/state
para.genesis_state
.build(
chain_spec_raw_path.clone(),
format!("{}/genesis-state", para.id),
&ns,
&scoped_fs,
)
.await?;
debug!("parachain genesis state built!");
para.genesis_wasm
.build(
chain_spec_raw_path,
format!("{}/genesis-wasm", para.id),
&ns,
&scoped_fs,
)
.await?;
debug!("parachain genesis wasm built!");
}
let relay_chain_name = network_spec.relaychain.chain.as_str().to_owned();
let base_dir_exists = network_spec.global_settings.base_dir().is_some();
network_spec
.build_parachain_artifacts(ns.clone(), &scoped_fs, &relay_chain_id, base_dir_exists)
.await?;

// Gather the parachains to register in genesis and the ones to register with extrinsic
let (para_to_register_in_genesis, para_to_register_with_extrinsic): (
Expand All @@ -168,20 +154,7 @@ where

let mut para_artifacts = vec![];
for para in para_to_register_in_genesis {
let genesis_config = ParaGenesisConfig {
state_path: para.genesis_state.artifact_path().ok_or(
OrchestratorError::InvariantError(
"artifact path for state must be set at this point",
),
)?,
wasm_path: para.genesis_wasm.artifact_path().ok_or(
OrchestratorError::InvariantError(
"artifact path for wasm must be set at this point",
),
)?,
id: para.id,
as_parachain: para.onboard_as_parachain,
};
let genesis_config = para.get_genesis_config()?;
para_artifacts.push(genesis_config)
}

Expand Down Expand Up @@ -210,7 +183,7 @@ where
let mut ctx = SpawnNodeCtx {
chain_id: &relay_chain_id,
parachain_id: None,
chain: relay_chain_name,
chain: relay_chain_name.as_str(),
role: ZombieRole::Node,
ns: &ns,
scoped_fs: &scoped_fs,
Expand Down Expand Up @@ -477,7 +450,7 @@ pub struct ScopedFilesystem<'a, FS: FileSystem> {
}

impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
fn new(fs: &'a FS, base_dir: &'a str) -> Self {
pub fn new(fs: &'a FS, base_dir: &'a str) -> Self {
Self { fs, base_dir }
}

Expand All @@ -497,11 +470,13 @@ impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
}

async fn read_to_string(&self, file: impl AsRef<Path>) -> Result<String, FileSystemError> {
let full_path = PathBuf::from(format!(
"{}/{}",
self.base_dir,
file.as_ref().to_string_lossy()
));
let file = file.as_ref();

let full_path = if file.is_absolute() {
file.to_owned()
} else {
PathBuf::from(format!("{}/{}", self.base_dir, file.to_string_lossy()))
};
let content = self.fs.read_to_string(full_path).await?;
Ok(content)
}
Expand Down Expand Up @@ -529,12 +504,15 @@ impl<'a, FS: FileSystem> ScopedFilesystem<'a, FS> {
path: impl AsRef<Path>,
contents: impl AsRef<[u8]> + Send,
) -> Result<(), FileSystemError> {
let path = PathBuf::from(format!(
"{}/{}",
self.base_dir,
path.as_ref().to_string_lossy()
));
self.fs.write(path, contents).await.map_err(Into::into)
let path = path.as_ref();

let full_path = if path.is_absolute() {
path.to_owned()
} else {
PathBuf::from(format!("{}/{}", self.base_dir, path.to_string_lossy()))
};

self.fs.write(full_path, contents).await.map_err(Into::into)
}
}

Expand All @@ -548,8 +526,9 @@ pub enum ZombieRole {
Companion,
}

// re-export
// re-exports
pub use network::{AddCollatorOptions, AddNodeOptions};
pub use network_helper::metrics;
#[cfg(feature = "pjs")]
pub use pjs_helper::PjsResult;

Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/network_helper.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod metrics;
pub mod verifier;
62 changes: 62 additions & 0 deletions crates/orchestrator/src/network_helper/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::collections::HashMap;

use async_trait::async_trait;
use reqwest::Url;

#[async_trait]
pub trait MetricsHelper {
async fn metric(&self, metric_name: &str) -> Result<f64, anyhow::Error>;
async fn metric_with_url(
metric: impl AsRef<str> + Send,
endpoint: impl Into<Url> + Send,
) -> Result<f64, anyhow::Error>;
}

pub struct Metrics {
endpoint: Url,
}

impl Metrics {
fn new(endpoint: impl Into<Url>) -> Self {
Self {
endpoint: endpoint.into(),
}
}

async fn fetch_metrics(
endpoint: impl AsRef<str>,
) -> Result<HashMap<String, f64>, anyhow::Error> {
let response = reqwest::get(endpoint.as_ref()).await?;
Ok(prom_metrics_parser::parse(&response.text().await?)?)
}

fn get_metric(
metrics_map: HashMap<String, f64>,
metric_name: &str,
) -> Result<f64, anyhow::Error> {
let treat_not_found_as_zero = true;
if let Some(val) = metrics_map.get(metric_name) {
Ok(*val)
} else if treat_not_found_as_zero {
Ok(0_f64)
} else {
Err(anyhow::anyhow!("MetricNotFound: {metric_name}"))
}
}
}

#[async_trait]
impl MetricsHelper for Metrics {
async fn metric(&self, metric_name: &str) -> Result<f64, anyhow::Error> {
let metrics_map = Metrics::fetch_metrics(self.endpoint.as_str()).await?;
Metrics::get_metric(metrics_map, metric_name)
}

async fn metric_with_url(
metric_name: impl AsRef<str> + Send,
endpoint: impl Into<Url> + Send,
) -> Result<f64, anyhow::Error> {
let metrics_map = Metrics::fetch_metrics(endpoint.into()).await?;
Metrics::get_metric(metrics_map, metric_name.as_ref())
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/network_helper/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::trace;

use crate::network::node::NetworkNode;

pub async fn verify_nodes(nodes: &[&NetworkNode]) -> Result<(), anyhow::Error> {
pub(crate) async fn verify_nodes(nodes: &[&NetworkNode]) -> Result<(), anyhow::Error> {
timeout(Duration::from_secs(90), check_nodes(nodes))
.await
.map_err(|_| anyhow::anyhow!("one or more nodes are not ready!"))
Expand Down
67 changes: 64 additions & 3 deletions crates/orchestrator/src/network_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::{

use configuration::{GlobalSettings, HrmpChannelConfig, NetworkConfig};
use futures::future::try_join_all;
use provider::{ProviderError, ProviderNamespace};
use support::constants::THIS_IS_A_BUG;
use provider::{DynNamespace, ProviderError, ProviderNamespace};
use support::{constants::THIS_IS_A_BUG, fs::FileSystem};
use tracing::debug;

use crate::errors::OrchestratorError;
use crate::{errors::OrchestratorError, ScopedFilesystem};

pub mod node;
pub mod parachain;
Expand Down Expand Up @@ -123,6 +123,67 @@ impl NetworkSpec {
Ok(output)
}

pub fn relaychain(&self) -> &RelaychainSpec {
&self.relaychain
}

pub fn relaychain_mut(&mut self) -> &mut RelaychainSpec {
&mut self.relaychain
}

pub fn parachains_iter(&self) -> impl Iterator<Item = &ParachainSpec> {
self.parachains.iter()
}

pub fn parachains_iter_mut(&mut self) -> impl Iterator<Item = &mut ParachainSpec> {
self.parachains.iter_mut()
}

pub fn set_global_settings(&mut self, global_settings: GlobalSettings) {
self.global_settings = global_settings;
}

pub async fn build_parachain_artifacts<'a, T: FileSystem>(
&mut self,
ns: DynNamespace,
scoped_fs: &ScopedFilesystem<'a, T>,
relaychain_id: &str,
base_dir_exists: bool,
) -> Result<(), anyhow::Error> {
for para in self.parachains.iter_mut() {
let chain_spec_raw_path = para.build_chain_spec(relaychain_id, &ns, scoped_fs).await?;
debug!("parachain chain-spec built!");

if base_dir_exists {
scoped_fs.create_dir_all(para.id.to_string()).await?;
} else {
scoped_fs.create_dir(para.id.to_string()).await?;
};

// create wasm/state
para.genesis_state
.build(
chain_spec_raw_path.clone(),
format!("{}/genesis-state", para.id),
&ns,
scoped_fs,
)
.await?;
debug!("parachain genesis state built!");
para.genesis_wasm
.build(
chain_spec_raw_path,
format!("{}/genesis-wasm", para.id),
&ns,
scoped_fs,
)
.await?;
debug!("parachain genesis wasm built!");
}

Ok(())
}

// collect mutable references to all nodes from relaychain and parachains
fn collect_network_nodes(&mut self) -> Vec<&mut NodeSpec> {
vec![
Expand Down
Loading
Loading