diff --git a/crates/examples/examples/small_network_with_default.rs b/crates/examples/examples/small_network_with_default.rs index a9d7ab850..7b9afef50 100644 --- a/crates/examples/examples/small_network_with_default.rs +++ b/crates/examples/examples/small_network_with_default.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use configuration::NetworkConfigBuilder; use orchestrator::{AddNodeOpts, Orchestrator}; use provider::NativeProvider; @@ -37,13 +35,9 @@ async fn main() -> Result<(), Box> { // TODO: add check to ensure if unique network.add_node("new1", opts, None).await?; - tokio::time::sleep(Duration::from_secs(2)).await; - // Example of some opertions that you can do // with `nodes` (e.g pause, resume, restart) - tokio::time::sleep(Duration::from_secs(10)).await; - // Get a ref to the node let node = network.get_node("alice")?; @@ -57,8 +51,6 @@ async fn main() -> Result<(), Box> { // node.pause().await?; // println!("node new1 paused!"); - tokio::time::sleep(Duration::from_secs(2)).await; - // node.resume().await?; // println!("node new1 resumed!"); diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index a01c8b207..df49613f8 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -4,6 +4,7 @@ mod errors; mod generators; mod network; +mod network_helper; mod network_spec; mod shared; mod spawner; @@ -73,8 +74,9 @@ where // create namespace let ns = self.provider.create_namespace().await?; - println!("ns: {:#?}", ns.id()); - println!("base_dir: {:#?}", ns.base_dir()); + println!("\n\n"); + println!("🧰 ns: {:#?}", ns.id()); + println!("🧰 base_dir: {:#?}", ns.base_dir()); // TODO: noop for native // Static setup @@ -211,6 +213,7 @@ where scoped_fs: &scoped_fs, parachain: None, bootnodes_addr: &vec![], + wait_ready: false, }; let global_files_to_inject = vec![TransferedFile { @@ -372,7 +375,8 @@ where // - add-ons (introspector/tracing/etc) - // - verify nodes (clean metrics cache?) + // verify nodes + network_helper::verifier::verify_nodes(&network.nodes()).await?; // - write zombie.json state file (we should defined in a way we can load later) diff --git a/crates/orchestrator/src/network.rs b/crates/orchestrator/src/network.rs index 3654ab112..4b1b1d115 100644 --- a/crates/orchestrator/src/network.rs +++ b/crates/orchestrator/src/network.rs @@ -142,6 +142,7 @@ impl Network { scoped_fs: &scoped_fs, parachain: para_spec, bootnodes_addr: &vec![], + wait_ready: true, }; let mut global_files_to_inject = vec![TransferedFile { @@ -190,6 +191,10 @@ impl Network { } } + pub fn nodes(&self) -> Vec<&NetworkNode> { + self.nodes_by_name.values().collect::>() + } + // Internal API pub(crate) fn add_running_node(&mut self, node: NetworkNode, para_id: Option) { if let Some(para_id) = para_id { diff --git a/crates/orchestrator/src/network_helper.rs b/crates/orchestrator/src/network_helper.rs new file mode 100644 index 000000000..9a0722027 --- /dev/null +++ b/crates/orchestrator/src/network_helper.rs @@ -0,0 +1 @@ +pub mod verifier; diff --git a/crates/orchestrator/src/network_helper/verifier.rs b/crates/orchestrator/src/network_helper/verifier.rs new file mode 100644 index 000000000..12d41cb41 --- /dev/null +++ b/crates/orchestrator/src/network_helper/verifier.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +use tokio::time::timeout; + +use crate::network::node::NetworkNode; + +pub async fn verify_nodes(nodes: &[&NetworkNode]) -> Result<(), anyhow::Error> { + timeout(Duration::from_secs(90), check_nodes(nodes)) + .await + .map_err(|_| anyhow::anyhow!("one or more nodes are not ready!")) +} + +// TODO: we should inject in someway the logic to make the request +// in order to allow us to `mock` and easily test this. +// maybe moved to the provider with a NodeStatus, and some helpers like wait_running, wait_ready, etc... ? to be discussed +async fn check_nodes(nodes: &[&NetworkNode]) { + loop { + let tasks: Vec<_> = nodes + .iter() + .map(|node| { + // TODO: move to logger + // println!("getting from {}", node.name); + reqwest::get(node.prometheus_uri.clone()) + }) + .collect(); + + let all_ready = futures::future::try_join_all(tasks).await; + if all_ready.is_ok() { + return; + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } +} diff --git a/crates/orchestrator/src/spawner.rs b/crates/orchestrator/src/spawner.rs index 61b28b9c0..a0271d340 100644 --- a/crates/orchestrator/src/spawner.rs +++ b/crates/orchestrator/src/spawner.rs @@ -32,6 +32,9 @@ pub struct SpawnNodeCtx<'a, T: FileSystem> { pub(crate) parachain: Option<&'a ParachainSpec>, /// The string represenation of the bootnode addres to pass to nodes pub(crate) bootnodes_addr: &'a Vec, + /// Flag to wait node is ready or not + /// Ready state means we can query prometheus internal server + pub(crate) wait_ready: bool, } pub async fn spawn_node<'a, T>( @@ -153,6 +156,7 @@ where node.name ); println!("🚀 {} : metrics link {prometheus_uri}", node.name); + println!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name); println!("\n"); Ok(NetworkNode::new( node.name.clone(), diff --git a/crates/orchestrator/src/tx_helper.rs b/crates/orchestrator/src/tx_helper.rs new file mode 100644 index 000000000..02707e1d7 --- /dev/null +++ b/crates/orchestrator/src/tx_helper.rs @@ -0,0 +1,2 @@ +pub mod register_para; +pub mod validator_actions; \ No newline at end of file diff --git a/crates/orchestrator/src/tx_helper/register_para.rs b/crates/orchestrator/src/tx_helper/register_para.rs new file mode 100644 index 000000000..55aaf6da5 --- /dev/null +++ b/crates/orchestrator/src/tx_helper/register_para.rs @@ -0,0 +1,66 @@ +use std::str::FromStr; + +use subxt::{dynamic::Value, OnlineClient, SubstrateConfig}; +use subxt_signer::{sr25519::Keypair, SecretUri}; +use support::fs::FileSystem; + +use crate::{shared::types::RegisterParachainOptions, ScopedFilesystem}; + + +pub async fn register( + options: RegisterParachainOptions, + scoped_fs: &ScopedFilesystem<'_, impl FileSystem>, +) -> Result<(), anyhow::Error> { + println!("Registering parachain: {:?}", options); + // get the seed + let sudo: Keypair; + if let Some(possible_seed) = options.seed { + sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair."); + } else { + let uri = SecretUri::from_str("//Alice")?; + sudo = Keypair::from_uri(&uri)?; + } + + let genesis_state = scoped_fs + .read_to_string(options.state_path) + .await + .expect("State Path should be ok by this point."); + let wasm_data = scoped_fs + .read_to_string(options.wasm_path) + .await + .expect("Wasm Path should be ok by this point."); + + let api = OnlineClient::::from_url(options.node_ws_url).await?; + + let schedule_para = subxt::dynamic::tx( + "ParasSudoWrapper", + "sudo_schedule_para_initialize", + vec![ + Value::primitive(options.id.into()), + Value::named_composite([ + ( + "genesis_head", + Value::from_bytes(hex::decode(&genesis_state[2..])?), + ), + ( + "validation_code", + Value::from_bytes(hex::decode(&wasm_data[2..])?), + ), + ("para_kind", Value::bool(options.onboard_as_para)), + ]), + ], + ); + + let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![schedule_para.into_value()]); + + // TODO: uncomment below and fix the sign and submit (and follow afterwards until + // finalized block) to register the parachain + let result = api + .tx() + .sign_and_submit_then_watch_default(&sudo_call, &sudo) + .await?; + + let result = result.wait_for_in_block().await?; + println!("In block: {:#?}", result.block_hash()); + Ok(()) +} \ No newline at end of file diff --git a/crates/orchestrator/src/tx_helper/validator_actions.rs b/crates/orchestrator/src/tx_helper/validator_actions.rs new file mode 100644 index 000000000..ca01d8aed --- /dev/null +++ b/crates/orchestrator/src/tx_helper/validator_actions.rs @@ -0,0 +1,48 @@ +use std::str::FromStr; + +use subxt::{dynamic::Value, OnlineClient, SubstrateConfig}; +use subxt_signer::{sr25519::Keypair, SecretUri}; + + +pub async fn register( + validator_ids: Vec, + node_ws_url: &str, +) -> Result<(), anyhow::Error> { + println!("Registering validators: {:?}", validator_ids); + // get the seed + // let sudo: Keypair; + // if let Some(possible_seed) = options.seed { + // sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair."); + // } else { + let uri = SecretUri::from_str("//Alice")?; + let sudo = Keypair::from_uri(&uri)?; + // } + + println!("pse"); + let api = OnlineClient::::from_url(node_ws_url).await?; + println!("pse connected"); + + // let bytes: Vec = validator_ids.iter().map(|id| Value::from_bytes(id)).collect(); + // println!("{:?}", bytes); + + let register_call = subxt::dynamic::tx( + "ValidatorManager", + "register_validators", + vec![Value::unnamed_composite(vec![Value::from_bytes(validator_ids.first().unwrap().as_bytes())])], + ); + + let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![register_call.into_value()]); + + println!("pse1"); + // TODO: uncomment below and fix the sign and submit (and follow afterwards until + // finalized block) to register the parachain + let result = api + .tx() + .sign_and_submit_then_watch_default(&sudo_call, &sudo) + .await?; + + println!("result: {:#?}", result); + let result = result.wait_for_in_block().await?; + println!("In block: {:#?}", result.block_hash()); + Ok(()) +} \ No newline at end of file