Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add verifier to check readiness #118

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions crates/examples/examples/small_network_with_default.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use configuration::NetworkConfigBuilder;
use orchestrator::{AddNodeOpts, Orchestrator};
use provider::NativeProvider;
Expand Down Expand Up @@ -37,13 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: add check to ensure if unique
network.add_node("new1", opts, None).await?;

tokio::time::sleep(Duration::from_secs(2)).await;

// Example of some opertions that you can do
// with `nodes` (e.g pause, resume, restart)

tokio::time::sleep(Duration::from_secs(10)).await;

// Get a ref to the node
let node = network.get_node("alice")?;

Expand All @@ -57,8 +51,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// node.pause().await?;
// println!("node new1 paused!");

tokio::time::sleep(Duration::from_secs(2)).await;

// node.resume().await?;
// println!("node new1 resumed!");

Expand Down
10 changes: 7 additions & 3 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod errors;
mod generators;
mod network;
mod network_helper;
mod network_spec;
mod shared;
mod spawner;
Expand Down Expand Up @@ -73,8 +74,9 @@ where
// create namespace
let ns = self.provider.create_namespace().await?;

println!("ns: {:#?}", ns.id());
println!("base_dir: {:#?}", ns.base_dir());
println!("\n\n");
println!("🧰 ns: {:#?}", ns.id());
println!("🧰 base_dir: {:#?}", ns.base_dir());

// TODO: noop for native
// Static setup
Expand Down Expand Up @@ -211,6 +213,7 @@ where
scoped_fs: &scoped_fs,
parachain: None,
bootnodes_addr: &vec![],
wait_ready: false,
};

let global_files_to_inject = vec![TransferedFile {
Expand Down Expand Up @@ -372,7 +375,8 @@ where

// - add-ons (introspector/tracing/etc)

// - verify nodes (clean metrics cache?)
// verify nodes
network_helper::verifier::verify_nodes(&network.nodes()).await?;

// - write zombie.json state file (we should defined in a way we can load later)

Expand Down
5 changes: 5 additions & 0 deletions crates/orchestrator/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl<T: FileSystem> Network<T> {
scoped_fs: &scoped_fs,
parachain: para_spec,
bootnodes_addr: &vec![],
wait_ready: true,
};

let mut global_files_to_inject = vec![TransferedFile {
Expand Down Expand Up @@ -190,6 +191,10 @@ impl<T: FileSystem> Network<T> {
}
}

pub fn nodes(&self) -> Vec<&NetworkNode> {
self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
}

// Internal API
pub(crate) fn add_running_node(&mut self, node: NetworkNode, para_id: Option<u32>) {
if let Some(para_id) = para_id {
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/network_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod verifier;
34 changes: 34 additions & 0 deletions crates/orchestrator/src/network_helper/verifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::time::Duration;

use tokio::time::timeout;

use crate::network::node::NetworkNode;

pub async fn verify_nodes(nodes: &[&NetworkNode]) -> Result<(), anyhow::Error> {
timeout(Duration::from_secs(90), check_nodes(nodes))
.await
.map_err(|_| anyhow::anyhow!("one or more nodes are not ready!"))
}

// TODO: we should inject in someway the logic to make the request
// in order to allow us to `mock` and easily test this.
pepoviola marked this conversation as resolved.
Show resolved Hide resolved
// maybe moved to the provider with a NodeStatus, and some helpers like wait_running, wait_ready, etc... ? to be discussed
async fn check_nodes(nodes: &[&NetworkNode]) {
loop {
let tasks: Vec<_> = nodes
.iter()
.map(|node| {
// TODO: move to logger
// println!("getting from {}", node.name);
reqwest::get(node.prometheus_uri.clone())
})
.collect();

let all_ready = futures::future::try_join_all(tasks).await;
if all_ready.is_ok() {
return;
}

tokio::time::sleep(Duration::from_millis(500)).await;
}
}
4 changes: 4 additions & 0 deletions crates/orchestrator/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub struct SpawnNodeCtx<'a, T: FileSystem> {
pub(crate) parachain: Option<&'a ParachainSpec>,
/// The string represenation of the bootnode addres to pass to nodes
pub(crate) bootnodes_addr: &'a Vec<String>,
/// Flag to wait node is ready or not
/// Ready state means we can query prometheus internal server
pub(crate) wait_ready: bool,
}

pub async fn spawn_node<'a, T>(
Expand Down Expand Up @@ -153,6 +156,7 @@ where
node.name
);
println!("🚀 {} : metrics link {prometheus_uri}", node.name);
println!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name);
println!("\n");
Ok(NetworkNode::new(
node.name.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/tx_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod register_para;
pub mod validator_actions;
66 changes: 66 additions & 0 deletions crates/orchestrator/src/tx_helper/register_para.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::str::FromStr;

use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};
use subxt_signer::{sr25519::Keypair, SecretUri};
use support::fs::FileSystem;

use crate::{shared::types::RegisterParachainOptions, ScopedFilesystem};


pub async fn register(
options: RegisterParachainOptions,
scoped_fs: &ScopedFilesystem<'_, impl FileSystem>,
) -> Result<(), anyhow::Error> {
println!("Registering parachain: {:?}", options);
// get the seed
let sudo: Keypair;
if let Some(possible_seed) = options.seed {
sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair.");
} else {
let uri = SecretUri::from_str("//Alice")?;
sudo = Keypair::from_uri(&uri)?;
}

let genesis_state = scoped_fs
.read_to_string(options.state_path)
.await
.expect("State Path should be ok by this point.");
let wasm_data = scoped_fs
.read_to_string(options.wasm_path)
.await
.expect("Wasm Path should be ok by this point.");

let api = OnlineClient::<SubstrateConfig>::from_url(options.node_ws_url).await?;

let schedule_para = subxt::dynamic::tx(
"ParasSudoWrapper",
"sudo_schedule_para_initialize",
vec![
Value::primitive(options.id.into()),
Value::named_composite([
(
"genesis_head",
Value::from_bytes(hex::decode(&genesis_state[2..])?),
),
(
"validation_code",
Value::from_bytes(hex::decode(&wasm_data[2..])?),
),
("para_kind", Value::bool(options.onboard_as_para)),
]),
],
);

let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![schedule_para.into_value()]);

// TODO: uncomment below and fix the sign and submit (and follow afterwards until
// finalized block) to register the parachain
let result = api
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &sudo)
.await?;

let result = result.wait_for_in_block().await?;
println!("In block: {:#?}", result.block_hash());
Ok(())
}
48 changes: 48 additions & 0 deletions crates/orchestrator/src/tx_helper/validator_actions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::str::FromStr;

use subxt::{dynamic::Value, OnlineClient, SubstrateConfig};
use subxt_signer::{sr25519::Keypair, SecretUri};


pub async fn register(
validator_ids: Vec<String>,
node_ws_url: &str,
) -> Result<(), anyhow::Error> {
println!("Registering validators: {:?}", validator_ids);
// get the seed
// let sudo: Keypair;
// if let Some(possible_seed) = options.seed {
// sudo = Keypair::from_seed(possible_seed).expect("seed should return a Keypair.");
// } else {
let uri = SecretUri::from_str("//Alice")?;
let sudo = Keypair::from_uri(&uri)?;
// }

println!("pse");
let api = OnlineClient::<SubstrateConfig>::from_url(node_ws_url).await?;
println!("pse connected");

// let bytes: Vec<Value> = validator_ids.iter().map(|id| Value::from_bytes(id)).collect();
// println!("{:?}", bytes);

let register_call = subxt::dynamic::tx(
"ValidatorManager",
"register_validators",
vec![Value::unnamed_composite(vec![Value::from_bytes(validator_ids.first().unwrap().as_bytes())])],
);

let sudo_call = subxt::dynamic::tx("Sudo", "sudo", vec![register_call.into_value()]);

println!("pse1");
// TODO: uncomment below and fix the sign and submit (and follow afterwards until
// finalized block) to register the parachain
let result = api
.tx()
.sign_and_submit_then_watch_default(&sudo_call, &sudo)
.await?;

println!("result: {:#?}", result);
let result = result.wait_for_in_block().await?;
println!("In block: {:#?}", result.block_hash());
Ok(())
}