Skip to content

Commit

Permalink
feat: add verifier to check readiness (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepoviola authored Oct 6, 2023
1 parent 2a221b4 commit eec89c2
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 11 deletions.
8 changes: 0 additions & 8 deletions crates/examples/examples/small_network_with_default.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use configuration::NetworkConfigBuilder;
use orchestrator::{AddNodeOpts, Orchestrator};
use provider::NativeProvider;
Expand Down Expand Up @@ -37,13 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: add check to ensure if unique
network.add_node("new1", opts, None).await?;

tokio::time::sleep(Duration::from_secs(2)).await;

// Example of some opertions that you can do
// with `nodes` (e.g pause, resume, restart)

tokio::time::sleep(Duration::from_secs(10)).await;

// Get a ref to the node
let node = network.get_node("alice")?;

Expand All @@ -57,8 +51,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// node.pause().await?;
// println!("node new1 paused!");

tokio::time::sleep(Duration::from_secs(2)).await;

// node.resume().await?;
// println!("node new1 resumed!");

Expand Down
10 changes: 7 additions & 3 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod errors;
mod generators;
mod network;
mod network_helper;
mod network_spec;
mod shared;
mod spawner;
Expand Down Expand Up @@ -73,8 +74,9 @@ where
// create namespace
let ns = self.provider.create_namespace().await?;

println!("ns: {:#?}", ns.id());
println!("base_dir: {:#?}", ns.base_dir());
println!("\n\n");
println!("🧰 ns: {:#?}", ns.id());
println!("🧰 base_dir: {:#?}", ns.base_dir());

// TODO: noop for native
// Static setup
Expand Down Expand Up @@ -211,6 +213,7 @@ where
scoped_fs: &scoped_fs,
parachain: None,
bootnodes_addr: &vec![],
wait_ready: false,
};

let global_files_to_inject = vec![TransferedFile {
Expand Down Expand Up @@ -372,7 +375,8 @@ where

// - add-ons (introspector/tracing/etc)

// - verify nodes (clean metrics cache?)
// verify nodes
network_helper::verifier::verify_nodes(&network.nodes()).await?;

// - write zombie.json state file (we should defined in a way we can load later)

Expand Down
5 changes: 5 additions & 0 deletions crates/orchestrator/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl<T: FileSystem> Network<T> {
scoped_fs: &scoped_fs,
parachain: para_spec,
bootnodes_addr: &vec![],
wait_ready: true,
};

let mut global_files_to_inject = vec![TransferedFile {
Expand Down Expand Up @@ -190,6 +191,10 @@ impl<T: FileSystem> Network<T> {
}
}

pub fn nodes(&self) -> Vec<&NetworkNode> {
self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
}

// Internal API
pub(crate) fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
if let Some(para_id) = para_id {
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/network_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod verifier;
34 changes: 34 additions & 0 deletions crates/orchestrator/src/network_helper/verifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::time::Duration;

use tokio::time::timeout;

use crate::network::node::NetworkNode;

pub async fn verify_nodes(nodes: &[&NetworkNode]) -> Result<(), anyhow::Error> {
timeout(Duration::from_secs(90), check_nodes(nodes))
.await
.map_err(|_| anyhow::anyhow!("one or more nodes are not ready!"))
}

// TODO: we should inject in someway the logic to make the request
// in order to allow us to `mock` and easily test this.
// maybe moved to the provider with a NodeStatus, and some helpers like wait_running, wait_ready, etc... ? to be discussed
async fn check_nodes(nodes: &[&NetworkNode]) {
loop {
let tasks: Vec<_> = nodes
.iter()
.map(|node| {
// TODO: move to logger
// println!("getting from {}", node.name);
reqwest::get(node.prometheus_uri.clone())
})
.collect();

let all_ready = futures::future::try_join_all(tasks).await;
if all_ready.is_ok() {
return;
}

tokio::time::sleep(Duration::from_millis(500)).await;
}
}
4 changes: 4 additions & 0 deletions crates/orchestrator/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub struct SpawnNodeCtx<'a, T: FileSystem> {
pub(crate) parachain: Option<&'a ParachainSpec>,
/// The string represenation of the bootnode addres to pass to nodes
pub(crate) bootnodes_addr: &'a Vec<String>,
/// Flag to wait node is ready or not
/// Ready state means we can query prometheus internal server
pub(crate) wait_ready: bool,
}

pub async fn spawn_node<'a, T>(
Expand Down Expand Up @@ -153,6 +156,7 @@ where
node.name
);
println!("🚀 {} : metrics link {prometheus_uri}", node.name);
println!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name);
println!("\n");
Ok(NetworkNode::new(
node.name.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tx_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod register_para;
pub mod validator_actions;
66 changes: 66 additions & 0 deletions crates/orchestrator/src/tx_helper/register_para.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::str::FromStr;

use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};
use subxt_signer::{sr25519::Keypair, SecretUri};
use support::fs::FileSystem;

use crate::{shared::types::RegisterParachainOptions, ScopedFilesystem};


pub async fn register(
options: RegisterParachainOptions,
scoped_fs: &ScopedFilesystem<'_, impl FileSystem>,
) -> Result<(), anyhow::Error> {
println!("Registering parachain: {:?}", options);
// get the seed
let sudo: Keypair;
if let Some(possible_seed) = options.seed {
sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair.");
} else {
let uri = SecretUri::from_str("//Alice")?;
sudo = Keypair::from_uri(&uri)?;
}

let genesis_state = scoped_fs
.read_to_string(options.state_path)
.await
.expect("State Path should be ok by this point.");
let wasm_data = scoped_fs
.read_to_string(options.wasm_path)
.await
.expect("Wasm Path should be ok by this point.");

let api = OnlineClient::<SubstrateConfig>::from_url(options.node_ws_url).await?;

let schedule_para = subxt::dynamic::tx(
"ParasSudoWrapper",
"sudo_schedule_para_initialize",
vec![
Value::primitive(options.id.into()),
Value::named_composite([
(
"genesis_head",
Value::from_bytes(hex::decode(&genesis_state[2..])?),
),
(
"validation_code",
Value::from_bytes(hex::decode(&wasm_data[2..])?),
),
("para_kind", Value::bool(options.onboard_as_para)),
]),
],
);

let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![schedule_para.into_value()]);

// TODO: uncomment below and fix the sign and submit (and follow afterwards until
// finalized block) to register the parachain
let result = api
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &sudo)
.await?;

let result = result.wait_for_in_block().await?;
println!("In block: {:#?}", result.block_hash());
Ok(())
}
48 changes: 48 additions & 0 deletions crates/orchestrator/src/tx_helper/validator_actions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::str::FromStr;

use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};
use subxt_signer::{sr25519::Keypair, SecretUri};


pub async fn register(
validator_ids: Vec<String>,
node_ws_url: &str,
) -> Result<(), anyhow::Error> {
println!("Registering validators: {:?}", validator_ids);
// get the seed
// let sudo: Keypair;
// if let Some(possible_seed) = options.seed {
// sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair.");
// } else {
let uri = SecretUri::from_str("//Alice")?;
let sudo = Keypair::from_uri(&uri)?;
// }

println!("pse");
let api = OnlineClient::<SubstrateConfig>::from_url(node_ws_url).await?;
println!("pse connected");

// let bytes: Vec<Value> = validator_ids.iter().map(|id| Value::from_bytes(id)).collect();
// println!("{:?}", bytes);

let register_call = subxt::dynamic::tx(
"ValidatorManager",
"register_validators",
vec![Value::unnamed_composite(vec![Value::from_bytes(validator_ids.first().unwrap().as_bytes())])],
);

let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![register_call.into_value()]);

println!("pse1");
// TODO: uncomment below and fix the sign and submit (and follow afterwards until
// finalized block) to register the parachain
let result = api
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &sudo)
.await?;

println!("result: {:#?}", result);
let result = result.wait_for_in_block().await?;
println!("In block: {:#?}", result.block_hash());
Ok(())
}

0 comments on commit eec89c2

Please sign in to comment.