diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8feb59fdf..b13312c4b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -37,8 +37,12 @@ jobs:
- name: Build
run: cargo build
- - name: Tests
- run: cargo test
+ - name: Tests (except provider crate)
+ run: cargo test --workspace --exclude provider
+
+ - name: Tests (provider crate)
+ # native provider tests assert on the number of spawned processes, so they must run on a single test thread
+ run: cargo test -p provider -- --test-threads 1
coverage:
name: Zombienet SDK - coverage
@@ -60,7 +64,8 @@ jobs:
uses: taiki-e/install-action@cargo-llvm-cov
- name: Collect coverage data
- run: cargo llvm-cov nextest --lcov --output-path lcov.info
+ # native provider tests assert on the number of spawned processes, so they must run on a single test thread
+ run: cargo llvm-cov nextest -j 1 --lcov --output-path lcov.info
- name: Report code coverage
uses: Nef10/lcov-reporter-action@v0.4.0
diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml
index a40eee4ad..d22fb55ce 100644
--- a/.github/workflows/documentation.yml
+++ b/.github/workflows/documentation.yml
@@ -35,13 +35,15 @@ jobs:
cargo doc --no-deps
echo "" > target/doc/index.html
+
+
- name: Move docs
run: |
mkdir -p ./doc
mv ./target/doc/* ./doc
git config user.email "github-action@users.noreply.github.com"
git config user.name "GitHub Action"
- git config user.password ${{ secrets.GH_PAGES_TOKEN }}
+ git config user.password "${{ secrets.GH_PAGES_TOKEN }}"
git checkout --orphan gh-pages
mkdir to_delete
shopt -s extglob
diff --git a/Cargo.toml b/Cargo.toml
index d56376d75..5d53e6cd9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,3 +23,6 @@ regex = "1.8"
lazy_static = "1.4"
multiaddr = "0.18"
url = "2.3"
+uuid = "1.4"
+nix = "0.27"
+procfs = "0.15"
diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml
index ad20f731e..56d70d80a 100644
--- a/crates/configuration/Cargo.toml
+++ b/crates/configuration/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2021"
regex = { workspace = true }
lazy_static = { workspace = true }
multiaddr = { workspace = true }
-url = { workspace = true }
+url = { workspace = true, features = ["serde"] }
thiserror = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true, features = ["derive"] }
diff --git a/crates/configuration/src/global_settings.rs b/crates/configuration/src/global_settings.rs
index bc1ef853d..607951e89 100644
--- a/crates/configuration/src/global_settings.rs
+++ b/crates/configuration/src/global_settings.rs
@@ -1,7 +1,7 @@
use std::{error::Error, fmt::Display, net::IpAddr, str::FromStr};
use multiaddr::Multiaddr;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use crate::shared::{
errors::{ConfigError, FieldError},
@@ -10,9 +10,9 @@ use crate::shared::{
};
/// Global settings applied to an entire network.
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GlobalSettings {
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
bootnodes_addresses: Vec<Multiaddr>,
// TODO: parse both case in zombienet node version to avoid renamed ?
#[serde(rename = "timeout")]
@@ -44,7 +44,6 @@ impl GlobalSettings {
}
/// A global settings builder, used to build [`GlobalSettings`] declaratively with fields validation.
-#[derive(Debug)]
pub struct GlobalSettingsBuilder {
config: GlobalSettings,
errors: Vec<anyhow::Error>,
diff --git a/crates/configuration/src/hrmp_channel.rs b/crates/configuration/src/hrmp_channel.rs
index 19139371c..228ff2ef4 100644
--- a/crates/configuration/src/hrmp_channel.rs
+++ b/crates/configuration/src/hrmp_channel.rs
@@ -1,11 +1,11 @@
use std::marker::PhantomData;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use crate::shared::{macros::states, types::ParaId};
/// HRMP channel configuration, with fine-grained configuration options.
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HrmpChannelConfig {
sender: ParaId,
recipient: ParaId,
@@ -42,7 +42,6 @@ states! {
}
/// HRMP channel configuration builder, used to build an [`HrmpChannelConfig`] declaratively with fields validation.
-#[derive(Debug)]
pub struct HrmpChannelConfigBuilder {
config: HrmpChannelConfig,
_state: PhantomData,
diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs
index 624ac6909..3908b3691 100644
--- a/crates/configuration/src/lib.rs
+++ b/crates/configuration/src/lib.rs
@@ -1,3 +1,4 @@
+#![allow(clippy::expect_fun_call)]
mod global_settings;
mod hrmp_channel;
mod network;
@@ -11,3 +12,5 @@ pub use hrmp_channel::{HrmpChannelConfig, HrmpChannelConfigBuilder};
pub use network::{NetworkConfig, NetworkConfigBuilder};
pub use parachain::{ParachainConfig, ParachainConfigBuilder};
pub use relaychain::{RelaychainConfig, RelaychainConfigBuilder};
+// re-export shared
+pub use shared::{node::NodeConfig, types};
diff --git a/crates/configuration/src/network.rs b/crates/configuration/src/network.rs
index b58976829..f5a6160c3 100644
--- a/crates/configuration/src/network.rs
+++ b/crates/configuration/src/network.rs
@@ -1,25 +1,35 @@
-use std::{cell::RefCell, marker::PhantomData, rc::Rc};
+use std::{cell::RefCell, fs, marker::PhantomData, rc::Rc};
+use anyhow::anyhow;
use regex::Regex;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use crate::{
global_settings::{GlobalSettings, GlobalSettingsBuilder},
hrmp_channel::{self, HrmpChannelConfig, HrmpChannelConfigBuilder},
parachain::{self, ParachainConfig, ParachainConfigBuilder},
relaychain::{self, RelaychainConfig, RelaychainConfigBuilder},
- shared::{helpers::merge_errors_vecs, macros::states, types::ValidationContext},
+ shared::{
+ constants::{
+ NO_ERR_DEF_BUILDER, RELAY_NOT_NONE, RW_FAILED, THIS_IS_A_BUG, VALIDATION_CHECK,
+ VALID_REGEX,
+ },
+ helpers::merge_errors_vecs,
+ macros::states,
+ node::NodeConfig,
+ types::{Arg, AssetLocation, Chain, Command, Image, ValidationContext},
+ },
};
/// A network configuration, composed of a relaychain, parachains and HRMP channels.
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NetworkConfig {
#[serde(rename = "settings")]
global_settings: GlobalSettings,
relaychain: Option<RelaychainConfig>,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
parachains: Vec<ParachainConfig>,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
hrmp_channels: Vec<HrmpChannelConfig>,
}
@@ -33,7 +43,7 @@ impl NetworkConfig {
pub fn relaychain(&self) -> &RelaychainConfig {
self.relaychain
.as_ref()
- .expect("typestate should ensure the relaychain isn't None at this point, this is a bug please report it: https://github.com/paritytech/zombienet-sdk/issues")
+ .expect(&format!("{}, {}", RELAY_NOT_NONE, THIS_IS_A_BUG))
}
/// The parachains of the network.
@@ -48,11 +58,110 @@ impl NetworkConfig {
pub fn dump_to_toml(&self) -> Result<String, toml::ser::Error> {
// This regex replaces the quote-enclosed "U128%<number>" markers with raw integers, because u128 is not supported for TOML serialization/deserialization.
- let re = Regex::new(r#""U128%(?\d+)""#).expect("regex should be valid, this is a bug please report it: https://github.com/paritytech/zombienet-sdk/issues");
+ let re = Regex::new(r#""U128%(?\d+)""#)
+ .expect(&format!("{} {}", VALID_REGEX, THIS_IS_A_BUG));
let toml_string = toml::to_string_pretty(&self)?;
Ok(re.replace_all(&toml_string, "$u128_value").to_string())
}
+
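+ /// Load a [`NetworkConfig`] from a TOML file, applying the relaychain defaults to its nodes and validating chain, image and command values.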
+ pub fn load_from_toml(path: &str) -> Result<NetworkConfig, anyhow::Error> {
+ let file_str = fs::read_to_string(path).expect(&format!("{} {}", RW_FAILED, THIS_IS_A_BUG));
+ let re: Regex = Regex::new(r"(?<field_name>(initial_)?balance)\s+=\s+(?<u128_value>\d+)")
+ .expect(&format!("{} {}", VALID_REGEX, THIS_IS_A_BUG));
+
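+ // Balances appear in the TOML as bare integers; wrap them in quotes so they deserialize through U128's string-based visitor (TOML has no native u128 support).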
+ let mut network_config: NetworkConfig = toml::from_str(
+ re.replace_all(&file_str, "$field_name = \"$u128_value\"")
+ .as_ref(),
+ )?;
+
+ // The expect calls on the relaychain below are safe because we just ensured it isn't None
+ if network_config.relaychain.is_none() {
+ Err(anyhow!("Relay chain does not exist."))?
+ }
+
+ // retrieve the relaychain defaults so they can be applied to nodes that don't override them
+ let relaychain_default_command: Option<Command> =
+ network_config.relaychain().default_command().cloned();
+
+ let relaychain_default_image: Option<Image> =
+ network_config.relaychain().default_image().cloned();
+
+ let relaychain_default_db_snapshot: Option<AssetLocation> =
+ network_config.relaychain().default_db_snapshot().cloned();
+
+ let default_args: Vec<Arg> = network_config
+ .relaychain()
+ .default_args()
+ .into_iter()
+ .cloned()
+ .collect();
+
+ let mut nodes: Vec<NodeConfig> = network_config
+ .relaychain()
+ .nodes()
+ .into_iter()
+ .cloned()
+ .collect();
+
+ // Validation checks for relay
+ TryInto::<Chain>::try_into(network_config.relaychain().chain().as_str())?;
+ if relaychain_default_image.is_some() {
+ TryInto::<Image>::try_into(relaychain_default_image.clone().expect(VALIDATION_CHECK))?;
+ }
+ if relaychain_default_command.is_some() {
+ TryInto::<Command>::try_into(
+ relaychain_default_command.clone().expect(VALIDATION_CHECK),
+ )?;
+ }
+
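+ // Fill in command, image, db snapshot and args from the relaychain defaults for nodes that don't set their own.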
+ for node in nodes.iter_mut() {
+ if relaychain_default_command.is_some() {
+ // we modify only nodes which don't already have a command
+ if node.command.is_none() {
+ node.command = relaychain_default_command.clone();
+ }
+ }
+
+ if relaychain_default_image.is_some() && node.image.is_none() {
+ node.image = relaychain_default_image.clone();
+ }
+
+ if relaychain_default_db_snapshot.is_some() && node.db_snapshot.is_none() {
+ node.db_snapshot = relaychain_default_db_snapshot.clone();
+ }
+
+ if !default_args.is_empty() && node.args().is_empty() {
+ node.set_args(default_args.clone());
+ }
+ }
+
+ network_config
+ .relaychain
+ .as_mut()
+ .expect(&format!("{}, {}", NO_ERR_DEF_BUILDER, THIS_IS_A_BUG))
+ .set_nodes(nodes);
+
+ // Validation checks for parachains
+ network_config.parachains().iter().for_each(|parachain| {
+ let _ = TryInto::<Chain>::try_into(
+ parachain
+ .chain()
+ .ok_or("chain name must exist")
+ .unwrap()
+ .as_str(),
+ );
+
+ if parachain.default_image().is_some() {
+ let _ = TryInto::<Image>::try_into(parachain.default_image().unwrap().as_str());
+ }
+ if parachain.default_command().is_some() {
+ let _ = TryInto::<Command>::try_into(parachain.default_command().unwrap().as_str());
+ }
+ });
+
+ Ok(network_config)
+ }
}
states! {
@@ -132,7 +241,6 @@ states! {
///
/// assert!(network_config.is_ok())
/// ```
-#[derive(Debug)]
pub struct NetworkConfigBuilder {
config: NetworkConfig,
validation_context: Rc<RefCell<ValidationContext>>,
@@ -144,9 +252,9 @@ impl Default for NetworkConfigBuilder {
fn default() -> Self {
Self {
config: NetworkConfig {
- global_settings: GlobalSettingsBuilder::new().build().expect(
- "should have no errors for default builder. this is a bug, please report it",
- ),
+ global_settings: GlobalSettingsBuilder::new()
+ .build()
+ .expect(&format!("{}, {}", NO_ERR_DEF_BUILDER, THIS_IS_A_BUG)),
relaychain: None,
parachains: vec![],
hrmp_channels: vec![],
@@ -852,4 +960,436 @@ mod tests {
fs::read_to_string("./testing/snapshots/0002-overridden-defaults.toml").unwrap();
assert_eq!(got, expected);
}
+
+ #[test]
+ fn the_toml_config_should_be_imported_and_match_a_network() {
+ let load_from_toml =
+ NetworkConfig::load_from_toml("./testing/snapshots/0000-small-network.toml").unwrap();
+
+ let expected = NetworkConfigBuilder::new()
+ .with_relaychain(|relaychain| {
+ relaychain
+ .with_chain("rococo-local")
+ .with_default_command("polkadot")
+ .with_default_image("docker.io/parity/polkadot:latest")
+ .with_default_args(vec![("-lparachain", "debug").into()])
+ .with_node(|node| {
+ node.with_name("alice")
+ .validator(true)
+ .invulnerable(true)
+ .bootnode(false)
+ .with_initial_balance(2000000000000)
+ })
+ .with_node(|node| {
+ node.with_name("bob")
+ .with_args(vec![("--database", "paritydb-experimental").into()])
+ .validator(true)
+ .invulnerable(false)
+ .bootnode(true)
+ .with_initial_balance(2000000000000)
+ })
+ })
+ .build()
+ .unwrap();
+
+ // We need to assert parts of the network config separately because the expected config
+ // carries the chain default context (used when dumping to TOML), while the loaded one doesn't.
+ assert_eq!(
+ expected.relaychain().chain(),
+ load_from_toml.relaychain().chain()
+ );
+ assert_eq!(
+ expected.relaychain().default_args(),
+ load_from_toml.relaychain().default_args()
+ );
+ assert_eq!(
+ expected.relaychain().default_command(),
+ load_from_toml.relaychain().default_command()
+ );
+ assert_eq!(
+ expected.relaychain().default_image(),
+ load_from_toml.relaychain().default_image()
+ );
+
+ // Check the nodes without the Chain Default Context
+ expected
+ .relaychain()
+ .nodes()
+ .iter()
+ .zip(load_from_toml.relaychain().nodes().iter())
+ .for_each(|(expected_node, loaded_node)| {
+ assert_eq!(expected_node.name(), loaded_node.name());
+ assert_eq!(expected_node.command(), loaded_node.command());
+ assert_eq!(expected_node.args(), loaded_node.args());
+ assert_eq!(
+ expected_node.is_invulnerable(),
+ loaded_node.is_invulnerable()
+ );
+ assert_eq!(expected_node.is_validator(), loaded_node.is_validator());
+ assert_eq!(expected_node.is_bootnode(), loaded_node.is_bootnode());
+ assert_eq!(
+ expected_node.initial_balance(),
+ loaded_node.initial_balance()
+ );
+ });
+ }
+
+ #[test]
+ fn the_toml_config_should_be_imported_and_match_a_network_with_parachains() {
+ let load_from_toml =
+ NetworkConfig::load_from_toml("./testing/snapshots/0001-big-network.toml").unwrap();
+
+ let expected = NetworkConfigBuilder::new()
+ .with_relaychain(|relaychain| {
+ relaychain
+ .with_chain("polkadot")
+ .with_default_command("polkadot")
+ .with_default_image("docker.io/parity/polkadot:latest")
+ .with_default_resources(|resources| {
+ resources
+ .with_request_cpu(100000)
+ .with_request_memory("500M")
+ .with_limit_cpu("10Gi")
+ .with_limit_memory("4000M")
+ })
+ .with_node(|node| {
+ node.with_name("alice")
+ .with_initial_balance(1_000_000_000)
+ .validator(true)
+ .bootnode(true)
+ .invulnerable(true)
+ })
+ .with_node(|node| {
+ node.with_name("bob")
+ .validator(true)
+ .invulnerable(true)
+ .bootnode(true)
+ })
+ })
+ .with_parachain(|parachain| {
+ parachain
+ .with_id(1000)
+ .with_chain("myparachain")
+ .with_chain_spec_path("/path/to/my/chain/spec.json")
+ .with_registration_strategy(RegistrationStrategy::UsingExtrinsic)
+ .onboard_as_parachain(false)
+ .with_default_db_snapshot("https://storage.com/path/to/db_snapshot.tgz")
+ .with_collator(|collator| {
+ collator
+ .with_name("john")
+ .bootnode(true)
+ .validator(true)
+ .invulnerable(true)
+ .with_initial_balance(5_000_000_000)
+ })
+ .with_collator(|collator| {
+ collator
+ .with_name("charles")
+ .bootnode(true)
+ .invulnerable(true)
+ .with_initial_balance(0)
+ })
+ .with_collator(|collator| {
+ collator
+ .with_name("frank")
+ .validator(true)
+ .bootnode(true)
+ .with_initial_balance(1_000_000_000)
+ })
+ })
+ .with_parachain(|parachain| {
+ parachain
+ .with_id(2000)
+ .with_chain("myotherparachain")
+ .with_chain_spec_path("/path/to/my/other/chain/spec.json")
+ .with_collator(|collator| {
+ collator
+ .with_name("mike")
+ .bootnode(true)
+ .validator(true)
+ .invulnerable(true)
+ .with_initial_balance(5_000_000_000)
+ })
+ .with_collator(|collator| {
+ collator
+ .with_name("georges")
+ .bootnode(true)
+ .invulnerable(true)
+ .with_initial_balance(0)
+ })
+ .with_collator(|collator| {
+ collator
+ .with_name("victor")
+ .validator(true)
+ .bootnode(true)
+ .with_initial_balance(1_000_000_000)
+ })
+ })
+ .with_hrmp_channel(|hrmp_channel| {
+ hrmp_channel
+ .with_sender(1000)
+ .with_recipient(2000)
+ .with_max_capacity(150)
+ .with_max_message_size(5000)
+ })
+ .with_hrmp_channel(|hrmp_channel| {
+ hrmp_channel
+ .with_sender(2000)
+ .with_recipient(1000)
+ .with_max_capacity(200)
+ .with_max_message_size(8000)
+ })
+ .build()
+ .unwrap();
+
+ // Check the relay chain
+ assert_eq!(
+ expected.relaychain().default_resources(),
+ load_from_toml.relaychain().default_resources()
+ );
+
+ // Check the nodes without the Chain Default Context
+ expected
+ .relaychain()
+ .nodes()
+ .iter()
+ .zip(load_from_toml.relaychain().nodes().iter())
+ .for_each(|(expected_node, loaded_node)| {
+ assert_eq!(expected_node.name(), loaded_node.name());
+ assert_eq!(expected_node.command(), loaded_node.command());
+ assert_eq!(expected_node.args(), loaded_node.args());
+ assert_eq!(expected_node.is_validator(), loaded_node.is_validator());
+ assert_eq!(expected_node.is_bootnode(), loaded_node.is_bootnode());
+ assert_eq!(
+ expected_node.initial_balance(),
+ loaded_node.initial_balance()
+ );
+ assert_eq!(
+ expected_node.is_invulnerable(),
+ loaded_node.is_invulnerable()
+ );
+ });
+
+ expected
+ .parachains()
+ .iter()
+ .zip(load_from_toml.parachains().iter())
+ .for_each(|(expected_parachain, loaded_parachain)| {
+ assert_eq!(expected_parachain.id(), loaded_parachain.id());
+ assert_eq!(expected_parachain.chain(), loaded_parachain.chain());
+ assert_eq!(
+ expected_parachain.chain_spec_path(),
+ loaded_parachain.chain_spec_path()
+ );
+ assert_eq!(
+ expected_parachain.registration_strategy(),
+ loaded_parachain.registration_strategy()
+ );
+ assert_eq!(
+ expected_parachain.onboard_as_parachain(),
+ loaded_parachain.onboard_as_parachain()
+ );
+ assert_eq!(
+ expected_parachain.default_db_snapshot(),
+ loaded_parachain.default_db_snapshot()
+ );
+ assert_eq!(
+ expected_parachain.default_command(),
+ loaded_parachain.default_command()
+ );
+ assert_eq!(
+ expected_parachain.default_image(),
+ loaded_parachain.default_image()
+ );
+ assert_eq!(
+ expected_parachain.collators().len(),
+ loaded_parachain.collators().len()
+ );
+ expected_parachain
+ .collators()
+ .iter()
+ .zip(loaded_parachain.collators().iter())
+ .for_each(|(expected_collator, loaded_collator)| {
+ assert_eq!(expected_collator.name(), loaded_collator.name());
+ assert_eq!(expected_collator.command(), loaded_collator.command());
+ assert_eq!(expected_collator.image(), loaded_collator.image());
+ assert_eq!(
+ expected_collator.is_validator(),
+ loaded_collator.is_validator()
+ );
+ assert_eq!(
+ expected_collator.is_bootnode(),
+ loaded_collator.is_bootnode()
+ );
+ assert_eq!(
+ expected_collator.is_invulnerable(),
+ loaded_collator.is_invulnerable()
+ );
+ assert_eq!(
+ expected_collator.initial_balance(),
+ loaded_collator.initial_balance()
+ );
+ });
+ });
+
+ expected
+ .hrmp_channels()
+ .iter()
+ .zip(load_from_toml.hrmp_channels().iter())
+ .for_each(|(expected_hrmp_channel, loaded_hrmp_channel)| {
+ assert_eq!(expected_hrmp_channel.sender(), loaded_hrmp_channel.sender());
+ assert_eq!(
+ expected_hrmp_channel.recipient(),
+ loaded_hrmp_channel.recipient()
+ );
+ assert_eq!(
+ expected_hrmp_channel.max_capacity(),
+ loaded_hrmp_channel.max_capacity()
+ );
+ assert_eq!(
+ expected_hrmp_channel.max_message_size(),
+ loaded_hrmp_channel.max_message_size()
+ );
+ });
+ }
+
+ #[test]
+ fn the_toml_config_should_be_imported_and_match_a_network_with_overriden_defaults() {
+ let load_from_toml =
+ NetworkConfig::load_from_toml("./testing/snapshots/0002-overridden-defaults.toml")
+ .unwrap();
+
+ let expected = NetworkConfigBuilder::new()
+ .with_relaychain(|relaychain| {
+ relaychain
+ .with_chain("polkadot")
+ .with_default_command("polkadot")
+ .with_default_image("docker.io/parity/polkadot:latest")
+ .with_default_args(vec![("-name", "value").into(), "--flag".into()])
+ .with_default_db_snapshot("https://storage.com/path/to/db_snapshot.tgz")
+ .with_default_resources(|resources| {
+ resources
+ .with_request_cpu(100000)
+ .with_request_memory("500M")
+ .with_limit_cpu("10Gi")
+ .with_limit_memory("4000M")
+ })
+ .with_node(|node| {
+ node.with_name("alice")
+ .with_initial_balance(1_000_000_000)
+ .validator(true)
+ .bootnode(true)
+ .invulnerable(true)
+ })
+ .with_node(|node| {
+ node.with_name("bob")
+ .validator(true)
+ .invulnerable(true)
+ .bootnode(true)
+ .with_image("mycustomimage:latest")
+ .with_command("my-custom-command")
+ .with_db_snapshot("https://storage.com/path/to/other/db_snapshot.tgz")
+ .with_resources(|resources| {
+ resources
+ .with_request_cpu(1000)
+ .with_request_memory("250Mi")
+ .with_limit_cpu("5Gi")
+ .with_limit_memory("2Gi")
+ })
+ .with_args(vec![("-myothername", "value").into()])
+ })
+ })
+ .with_parachain(|parachain| {
+ parachain
+ .with_id(1000)
+ .with_chain("myparachain")
+ .with_chain_spec_path("/path/to/my/chain/spec.json")
+ .with_default_db_snapshot("https://storage.com/path/to/other_snapshot.tgz")
+ .with_default_command("my-default-command")
+ .with_default_image("mydefaultimage:latest")
+ .with_collator(|collator| {
+ collator
+ .with_name("john")
+ .bootnode(true)
+ .validator(true)
+ .invulnerable(true)
+ .with_initial_balance(5_000_000_000)
+ .with_command("my-non-default-command")
+ .with_image("anotherimage:latest")
+ })
+ .with_collator(|collator| {
+ collator
+ .with_name("charles")
+ .bootnode(true)
+ .invulnerable(true)
+ .with_initial_balance(0)
+ })
+ })
+ .build()
+ .unwrap();
+
+ expected
+ .parachains()
+ .iter()
+ .zip(load_from_toml.parachains().iter())
+ .for_each(|(expected_parachain, loaded_parachain)| {
+ assert_eq!(expected_parachain.id(), loaded_parachain.id());
+ assert_eq!(expected_parachain.chain(), loaded_parachain.chain());
+ assert_eq!(
+ expected_parachain.chain_spec_path(),
+ loaded_parachain.chain_spec_path()
+ );
+ assert_eq!(
+ expected_parachain.registration_strategy(),
+ loaded_parachain.registration_strategy()
+ );
+ assert_eq!(
+ expected_parachain.onboard_as_parachain(),
+ loaded_parachain.onboard_as_parachain()
+ );
+ assert_eq!(
+ expected_parachain.default_db_snapshot(),
+ loaded_parachain.default_db_snapshot()
+ );
+ assert_eq!(
+ expected_parachain.default_command(),
+ loaded_parachain.default_command()
+ );
+ assert_eq!(
+ expected_parachain.default_image(),
+ loaded_parachain.default_image()
+ );
+ assert_eq!(
+ expected_parachain.collators().len(),
+ loaded_parachain.collators().len()
+ );
+ expected_parachain
+ .collators()
+ .iter()
+ .zip(loaded_parachain.collators().iter())
+ .for_each(|(expected_collator, loaded_collator)| {
+ assert_eq!(expected_collator.name(), loaded_collator.name());
+ assert_eq!(expected_collator.command(), loaded_collator.command());
+ assert_eq!(expected_collator.image(), loaded_collator.image());
+ assert_eq!(
+ expected_collator.is_validator(),
+ loaded_collator.is_validator()
+ );
+ assert_eq!(
+ expected_collator.is_bootnode(),
+ loaded_collator.is_bootnode()
+ );
+ assert_eq!(
+ expected_collator.is_invulnerable(),
+ loaded_collator.is_invulnerable()
+ );
+ assert_eq!(
+ expected_collator.initial_balance(),
+ loaded_collator.initial_balance()
+ );
+ });
+ });
+ }
}
diff --git a/crates/configuration/src/parachain.rs b/crates/configuration/src/parachain.rs
index 2bb2d205c..4bc0907a8 100644
--- a/crates/configuration/src/parachain.rs
+++ b/crates/configuration/src/parachain.rs
@@ -1,17 +1,24 @@
use std::{cell::RefCell, error::Error, fmt::Display, marker::PhantomData, rc::Rc};
use multiaddr::Multiaddr;
-use serde::{ser::SerializeStruct, Serialize};
-
-use crate::shared::{
- errors::{ConfigError, FieldError},
- helpers::{ensure_parachain_id_unique, merge_errors, merge_errors_vecs},
- macros::states,
- node::{self, NodeConfig, NodeConfigBuilder},
- resources::{Resources, ResourcesBuilder},
- types::{
- Arg, AssetLocation, Chain, ChainDefaultContext, Command, Image, ValidationContext, U128,
+use serde::{
+ de::{self, Visitor},
+ ser::SerializeStruct,
+ Deserialize, Serialize,
+};
+
+use crate::{
+ shared::{
+ errors::{ConfigError, FieldError},
+ helpers::{ensure_parachain_id_unique, merge_errors, merge_errors_vecs},
+ macros::states,
+ node::{self, NodeConfig, NodeConfigBuilder},
+ resources::{Resources, ResourcesBuilder},
+ types::{
+ Arg, AssetLocation, Chain, ChainDefaultContext, Command, Image, ValidationContext, U128,
+ },
},
+ utils::default_as_true,
};
#[derive(Debug, Clone, PartialEq)]
@@ -36,14 +43,67 @@ impl Serialize for RegistrationStrategy {
}
}
+struct RegistrationStrategyVisitor;
+
+impl<'de> Visitor<'de> for RegistrationStrategyVisitor {
+ type Value = RegistrationStrategy;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("struct RegistrationStrategy")
+ }
+
+ fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+ where
+ A: serde::de::MapAccess<'de>,
+ {
+ let mut add_to_genesis = false;
+ let mut register_para = false;
+
+ while let Some(key) = map.next_key::<String>()? {
+ match key.as_str() {
+ "add_to_genesis" => add_to_genesis = map.next_value()?,
+ "register_para" => register_para = map.next_value()?,
+ _ => {
+ return Err(de::Error::unknown_field(
+ &key,
+ &["add_to_genesis", "register_para"],
+ ))
+ },
+ }
+ }
+
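+ // The registration strategy is flattened into two booleans during serialization; exactly one of them must be true here.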
+ match (add_to_genesis, register_para) {
+ (true, false) => Ok(RegistrationStrategy::InGenesis),
+ (false, true) => Ok(RegistrationStrategy::UsingExtrinsic),
+ _ => Err(de::Error::missing_field("add_to_genesis or register_para")),
+ }
+ }
+}
+
+impl<'de> Deserialize<'de> for RegistrationStrategy {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ deserializer.deserialize_struct(
+ "RegistrationStrategy",
+ &["add_to_genesis", "register_para"],
+ RegistrationStrategyVisitor,
+ )
+ }
+}
+
/// A parachain configuration, composed of collators and fine-grained configuration options.
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ParachainConfig {
id: u32,
chain: Option<Chain>,
#[serde(flatten)]
registration_strategy: Option<RegistrationStrategy>,
- #[serde(skip_serializing_if = "super::utils::is_true")]
+ #[serde(
+ skip_serializing_if = "super::utils::is_true",
+ default = "default_as_true"
+ )]
onboard_as_parachain: bool,
#[serde(rename = "balance")]
initial_balance: U128,
@@ -51,7 +111,7 @@ pub struct ParachainConfig {
default_image: Option<Image>,
default_resources: Option<Resources>,
default_db_snapshot: Option<AssetLocation>,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
default_args: Vec<Arg>,
genesis_wasm_path: Option<AssetLocation>,
genesis_wasm_generator: Option<Command>,
@@ -60,9 +120,9 @@ pub struct ParachainConfig {
chain_spec_path: Option<AssetLocation>,
#[serde(rename = "cumulus_based")]
is_cumulus_based: bool,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
bootnodes_addresses: Vec<Multiaddr>,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
collators: Vec<NodeConfig>,
}
@@ -165,7 +225,6 @@ states! {
}
/// A parachain configuration builder, used to build a [`ParachainConfig`] declaratively with fields validation.
-#[derive(Debug)]
pub struct ParachainConfigBuilder {
config: ParachainConfig,
validation_context: Rc<RefCell<ValidationContext>>,
@@ -624,7 +683,7 @@ impl ParachainConfigBuilder {
#[cfg(test)]
mod tests {
use super::*;
- use crate::NetworkConfigBuilder;
+ use crate::{NetworkConfig, NetworkConfigBuilder};
#[test]
fn parachain_config_builder_should_succeeds_and_returns_a_new_parachain_config() {
@@ -680,6 +739,7 @@ mod tests {
assert_eq!(collator2.command().unwrap().as_str(), "command2");
assert!(collator2.is_validator());
assert_eq!(parachain_config.chain().unwrap().as_str(), "mychainname");
+
assert_eq!(
parachain_config.registration_strategy().unwrap(),
&RegistrationStrategy::UsingExtrinsic
@@ -1037,6 +1097,36 @@ mod tests {
});
}
+ #[test]
+ fn import_toml_registration_strategy_should_deserialize() {
+ let load_from_toml =
+ NetworkConfig::load_from_toml("./testing/snapshots/0001-big-network.toml").unwrap();
+
+ for parachain in load_from_toml.parachains().iter() {
+ if parachain.id() == 1000 {
+ assert_eq!(
+ parachain.registration_strategy(),
+ Some(&RegistrationStrategy::UsingExtrinsic)
+ );
+ }
+ if parachain.id() == 2000 {
+ assert_eq!(
+ parachain.registration_strategy(),
+ Some(&RegistrationStrategy::InGenesis)
+ );
+ }
+ }
+
+ let load_from_toml_small = NetworkConfig::load_from_toml(
+ "./testing/snapshots/0003-small-network_w_parachain.toml",
+ )
+ .unwrap();
+
+ let parachain = load_from_toml_small.parachains()[0];
+
+ assert_eq!(parachain.registration_strategy(), None);
+ }
+
#[test]
fn onboard_as_parachain_should_default_to_true() {
let config = ParachainConfigBuilder::new(Default::default())
diff --git a/crates/configuration/src/relaychain.rs b/crates/configuration/src/relaychain.rs
index bb302904a..b087ddcf7 100644
--- a/crates/configuration/src/relaychain.rs
+++ b/crates/configuration/src/relaychain.rs
@@ -1,8 +1,9 @@
use std::{cell::RefCell, error::Error, fmt::Debug, marker::PhantomData, rc::Rc};
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use crate::shared::{
+ constants::{DEFAULT_TYPESTATE, THIS_IS_A_BUG},
errors::{ConfigError, FieldError},
helpers::{merge_errors, merge_errors_vecs},
macros::states,
@@ -12,19 +13,19 @@ use crate::shared::{
};
/// A relay chain configuration, composed of nodes and fine-grained configuration options.
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RelaychainConfig {
chain: Chain,
default_command: Option<Command>,
default_image: Option<Image>,
default_resources: Option<Resources>,
default_db_snapshot: Option<AssetLocation>,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
default_args: Vec<Arg>,
chain_spec_path: Option<AssetLocation>,
random_nominators_count: Option<u32>,
max_nominations: Option<u8>,
- #[serde(skip_serializing_if = "std::vec::Vec::is_empty")]
+ #[serde(skip_serializing_if = "std::vec::Vec::is_empty", default)]
nodes: Vec<NodeConfig>,
}
@@ -78,6 +79,10 @@ impl RelaychainConfig {
pub fn nodes(&self) -> Vec<&NodeConfig> {
self.nodes.iter().collect::<Vec<&NodeConfig>>()
}
+
+ pub(crate) fn set_nodes(&mut self, nodes: Vec<NodeConfig>) {
+ self.nodes = nodes;
+ }
}
states! {
@@ -87,7 +92,6 @@ states! {
}
/// A relay chain configuration builder, used to build a [`RelaychainConfig`] declaratively with fields validation.
-#[derive(Debug)]
pub struct RelaychainConfigBuilder {
config: RelaychainConfig,
validation_context: Rc<RefCell<ValidationContext>>,
@@ -101,7 +105,7 @@ impl Default for RelaychainConfigBuilder {
config: RelaychainConfig {
chain: "default"
.try_into()
- .expect("'default' overriding should be ensured by typestate. this is a bug, please report it: https://github.com/paritytech/zombienet-sdk/issues"),
+ .expect(&format!("{} {}", DEFAULT_TYPESTATE, THIS_IS_A_BUG)),
default_command: None,
default_image: None,
default_resources: None,
diff --git a/crates/configuration/src/shared.rs b/crates/configuration/src/shared.rs
index bb1d7bf19..36b71c43e 100644
--- a/crates/configuration/src/shared.rs
+++ b/crates/configuration/src/shared.rs
@@ -1,3 +1,4 @@
+pub mod constants;
pub mod errors;
pub mod helpers;
pub mod macros;
diff --git a/crates/configuration/src/shared/constants.rs b/crates/configuration/src/shared/constants.rs
new file mode 100644
index 000000000..9d5f302be
--- /dev/null
+++ b/crates/configuration/src/shared/constants.rs
@@ -0,0 +1,14 @@
+pub const VALID_REGEX: &str = "regex should be valid ";
+pub const BORROWABLE: &str = "must be borrowable as mutable ";
+pub const RELAY_NOT_NONE: &str = "typestate should ensure the relaychain isn't None at this point ";
+pub const SHOULD_COMPILE: &str = "should compile with success ";
+pub const INFAILABLE: &str = "infaillible ";
+pub const NO_ERR_DEF_BUILDER: &str = "should have no errors for default builder ";
+pub const RW_FAILED: &str = "should be able to read/write - failed ";
+pub const DEFAULT_TYPESTATE: &str = "'default' overriding should be ensured by typestate ";
+pub const VALIDATION_CHECK: &str = "validation failed ";
+
+pub const PREFIX_CANT_BE_NONE: &str = "name prefix can't be None if a value exists ";
+
+pub const THIS_IS_A_BUG: &str =
+ "- this is a bug please report it: https://github.com/paritytech/zombienet-sdk/issues";
diff --git a/crates/configuration/src/shared/helpers.rs b/crates/configuration/src/shared/helpers.rs
index b70565b22..d47e4bec7 100644
--- a/crates/configuration/src/shared/helpers.rs
+++ b/crates/configuration/src/shared/helpers.rs
@@ -1,6 +1,7 @@
use std::{cell::RefCell, rc::Rc};
use super::{
+ constants::{BORROWABLE, THIS_IS_A_BUG},
errors::ValidationError,
types::{ParaId, Port, ValidationContext},
};
@@ -31,7 +32,7 @@ pub fn ensure_node_name_unique(
) -> Result<(), anyhow::Error> {
let mut context = validation_context
.try_borrow_mut()
- .expect("must be borrowable as mutable, this is a bug please report it: https://github.com/paritytech/zombienet-sdk/issues");
+ .expect(&format!("{}, {}", BORROWABLE, THIS_IS_A_BUG));
if !context.used_nodes_names.contains(&node_name) {
context.used_nodes_names.push(node_name);
@@ -47,7 +48,7 @@ pub fn ensure_port_unique(
) -> Result<(), anyhow::Error> {
let mut context = validation_context
.try_borrow_mut()
- .expect("must be borrowable as mutable, this is a bug please report it: https://github.com/paritytech/zombienet-sdk/issues");
+ .expect(&format!("{}, {}", BORROWABLE, THIS_IS_A_BUG));
if !context.used_ports.contains(&port) {
context.used_ports.push(port);
diff --git a/crates/configuration/src/shared/node.rs b/crates/configuration/src/shared/node.rs
index cb8e65f2e..d1e0bb747 100644
--- a/crates/configuration/src/shared/node.rs
+++ b/crates/configuration/src/shared/node.rs
@@ -1,7 +1,7 @@
use std::{cell::RefCell, error::Error, fmt::Display, marker::PhantomData, rc::Rc};
use multiaddr::Multiaddr;
-use serde::{ser::SerializeStruct, Serialize};
+use serde::{ser::SerializeStruct, Deserialize, Serialize};
use super::{
errors::FieldError,
@@ -33,7 +33,7 @@ use crate::shared::{
/// }
/// )
/// ```
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EnvVar {
/// The name of the environment variable.
pub name: String,
@@ -52,27 +52,36 @@ impl From<(&str, &str)> for EnvVar {
}
/// A node configuration, with fine-grained configuration options.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct NodeConfig {
name: String,
- image: Option<Image>,
- command: Option<Command>,
+ pub(crate) image: Option<Image>,
+ pub(crate) command: Option<Command>,
+ #[serde(default)]
args: Vec<Arg>,
- is_validator: bool,
- is_invulnerable: bool,
- is_bootnode: bool,
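+ // The TOML files use short keys (validator, invulnerable, bootnode, balance); these serde aliases map them onto the config field names.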
+ #[serde(alias = "validator")]
+ pub(crate) is_validator: bool,
+ #[serde(alias = "invulnerable")]
+ pub(crate) is_invulnerable: bool,
+ #[serde(alias = "bootnode")]
+ pub(crate) is_bootnode: bool,
+ #[serde(alias = "balance")]
+ #[serde(default)]
initial_balance: U128,
+ #[serde(default)]
env: Vec<EnvVar>,
+ #[serde(default)]
bootnodes_addresses: Vec<Multiaddr>,
- resources: Option<Resources>,
+ pub(crate) resources: Option<Resources>,
ws_port: Option<Port>,
rpc_port: Option<Port>,
prometheus_port: Option<Port>,
p2p_port: Option<Port>,
p2p_cert_hash: Option<String>,
- db_snapshot: Option<AssetLocation>,
+ pub(crate) db_snapshot: Option<AssetLocation>,
+ #[serde(default)]
// used to skip serialization of fields with defaults to avoid duplication
- chain_context: ChainDefaultContext,
+ pub(crate) chain_context: ChainDefaultContext,
}
impl Serialize for NodeConfig {
@@ -162,6 +171,11 @@ impl NodeConfig {
self.args.iter().collect()
}
+ /// Set the arguments to use for the node.
+ pub(crate) fn set_args(&mut self, args: Vec<Arg>) {
+ self.args = args;
+ }
+
/// Whether the node is a validator.
pub fn is_validator(&self) -> bool {
self.is_validator
@@ -234,7 +248,6 @@ states! {
}
/// A node configuration builder, used to build a [`NodeConfig`] declaratively with fields validation.
-#[derive(Debug)]
pub struct NodeConfigBuilder {
config: NodeConfig,
validation_context: Rc<RefCell<ValidationContext>>,
diff --git a/crates/configuration/src/shared/resources.rs b/crates/configuration/src/shared/resources.rs
index 46f3a4aab..8537b188d 100644
--- a/crates/configuration/src/shared/resources.rs
+++ b/crates/configuration/src/shared/resources.rs
@@ -2,12 +2,17 @@ use std::error::Error;
use lazy_static::lazy_static;
use regex::Regex;
-use serde::{ser::SerializeStruct, Serialize};
+use serde::{
+ de::{self},
+ ser::SerializeStruct,
+ Deserialize, Serialize,
+};
use super::{
errors::{ConversionError, FieldError},
helpers::merge_errors,
};
+use crate::shared::constants::{SHOULD_COMPILE, THIS_IS_A_BUG};
/// A resource quantity used to define limits (k8s/podman only).
/// It can be constructed from a `&str` or u64, if it fails, it returns a [`ConversionError`].
@@ -28,7 +33,7 @@ use super::{
/// assert_eq!(quantity3.as_str(), "1Gi");
/// assert_eq!(quantity4.as_str(), "10000");
/// ```
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ResourceQuantity(String);
impl ResourceQuantity {
@@ -43,7 +48,7 @@ impl TryFrom<&str> for ResourceQuantity {
fn try_from(value: &str) -> Result<Self, Self::Error> {
lazy_static! {
static ref RE: Regex = Regex::new(r"^\d+(.\d+)?(m|K|M|G|T|P|E|Ki|Mi|Gi|Ti|Pi|Ei)?$")
- .expect("should compile with success. this is a bug, please report it: https://github.com/paritytech/zombienet-sdk/issues");
+ .expect(&format!("{}, {}", SHOULD_COMPILE, THIS_IS_A_BUG));
}
if !RE.is_match(value) {
@@ -72,6 +77,12 @@ pub struct Resources {
limit_cpu: Option<ResourceQuantity>,
}
+#[derive(Serialize, Deserialize)]
+struct ResourcesField {
+ memory: Option<ResourceQuantity>,
+ cpu: Option<ResourceQuantity>,
+}
+
impl Serialize for Resources {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -79,12 +90,6 @@ impl Serialize for Resources {
{
let mut state = serializer.serialize_struct("Resources", 2)?;
- #[derive(Serialize)]
- struct ResourcesField {
- memory: Option<ResourceQuantity>,
- cpu: Option<ResourceQuantity>,
- }
-
if self.request_memory.is_some() || self.request_cpu.is_some() {
state.serialize_field(
"requests",
@@ -113,6 +118,52 @@ impl Serialize for Resources {
}
}
+struct ResourcesVisitor;
+
+impl<'de> de::Visitor<'de> for ResourcesVisitor {
+ type Value = Resources;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("a resources object")
+ }
+
+ fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+ where
+ A: de::MapAccess<'de>,
+ {
+ let mut resources: Resources = Resources::default();
+
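+ // The TOML form nests optional cpu/memory under "requests" and "limits"; copy them onto the flat request_*/limit_* fields.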
+ while let Some((key, value)) = map.next_entry::<String, ResourcesField>()? {
+ match key.as_str() {
+ "requests" => {
+ resources.request_memory = value.memory;
+ resources.request_cpu = value.cpu;
+ },
+ "limits" => {
+ resources.limit_memory = value.memory;
+ resources.limit_cpu = value.cpu;
+ },
+ _ => {
+ return Err(de::Error::unknown_field(
+ &key,
+ &["requests", "limits", "cpu", "memory"],
+ ))
+ },
+ }
+ }
+ Ok(resources)
+ }
+}
+
+impl<'de> Deserialize<'de> for Resources {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ deserializer.deserialize_any(ResourcesVisitor)
+ }
+}
+
impl Resources {
/// Memory limit applied to requests.
pub fn request_memory(&self) -> Option<&ResourceQuantity> {
@@ -249,6 +300,7 @@ impl ResourcesBuilder {
#[allow(non_snake_case)]
mod tests {
use super::*;
+ use crate::NetworkConfig;
macro_rules! impl_resources_quantity_unit_test {
($val:literal) => {{
@@ -350,6 +402,18 @@ mod tests {
assert_eq!(resources.limit_memory().unwrap().as_str(), "2G");
}
+ #[test]
+ fn resources_config_toml_import_should_succeeds_and_returns_a_resources_config() {
+ let load_from_toml =
+ NetworkConfig::load_from_toml("./testing/snapshots/0001-big-network.toml").unwrap();
+
+ let resources = load_from_toml.relaychain().default_resources().unwrap();
+ assert_eq!(resources.request_memory().unwrap().as_str(), "500M");
+ assert_eq!(resources.request_cpu().unwrap().as_str(), "100000");
+ assert_eq!(resources.limit_cpu().unwrap().as_str(), "10Gi");
+ assert_eq!(resources.limit_memory().unwrap().as_str(), "4000M");
+ }
+
#[test]
fn resources_config_builder_should_fails_and_returns_an_error_if_couldnt_parse_request_memory()
{
diff --git a/crates/configuration/src/shared/types.rs b/crates/configuration/src/shared/types.rs
index 347870415..fc2baa245 100644
--- a/crates/configuration/src/shared/types.rs
+++ b/crates/configuration/src/shared/types.rs
@@ -1,11 +1,17 @@
-use std::{fmt::Display, path::PathBuf, str::FromStr};
+use std::{
+ error::Error,
+ fmt::{self, Display},
+ path::PathBuf,
+ str::FromStr,
+};
use lazy_static::lazy_static;
use regex::Regex;
-use serde::Serialize;
+use serde::{de, Deserialize, Deserializer, Serialize};
use url::Url;
use super::{errors::ConversionError, resources::Resources};
+use crate::shared::constants::{INFAILABLE, PREFIX_CANT_BE_NONE, SHOULD_COMPILE, THIS_IS_A_BUG};
/// An alias for a duration in seconds.
pub type Duration = u32;
@@ -18,7 +24,7 @@ pub type ParaId = u32;
/// Custom type wrapping u128 to add custom Serialization/Deserialization logic because it's not supported
/// issue tracking the problem:
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Default, Debug, Clone, PartialEq)]
pub struct U128(pub(crate) u128);
impl From for U128 {
@@ -27,6 +33,14 @@ impl From for U128 {
}
}
+impl TryFrom<&str> for U128 {
+ type Error = Box<dyn Error>;
+
+ fn try_from(value: &str) -> Result<Self, Self::Error> {
+ Ok(Self(value.to_string().parse::<u128>()?))
+ }
+}
+
impl Serialize for U128 {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -38,6 +52,32 @@ impl Serialize for U128 {
}
}
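+// U128 values arrive as quoted numeric strings (load_from_toml wraps bare balances in quotes), so deserialization parses the string into a u128.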
+struct U128Visitor;
+
+impl<'de> de::Visitor<'de> for U128Visitor {
+ type Value = U128;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("an integer between 0 and 2^128 − 1.")
+ }
+
+ fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ v.try_into().map_err(de::Error::custom)
+ }
+}
+
+impl<'de> Deserialize<'de> for U128 {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ deserializer.deserialize_str(U128Visitor)
+ }
+}
+
/// A chain name.
/// It can be constructed for an `&str`, if it fails, it will returns a [`ConversionError`].
///
@@ -53,7 +93,7 @@ impl Serialize for U128 {
/// assert_eq!(kusama.as_str(), "kusama");
/// assert_eq!(myparachain.as_str(), "myparachain");
/// ```
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Chain(String);
impl TryFrom<&str> for Chain {
@@ -95,7 +135,7 @@ impl Chain {
/// assert_eq!(image3.as_str(), "myrepo.com/name:version");
/// assert_eq!(image4.as_str(), "10.15.43.155/name:version");
/// ```
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Image(String);
impl TryFrom<&str> for Image {
@@ -110,7 +150,7 @@ impl TryFrom<&str> for Image {
static ref RE: Regex = Regex::new(&format!(
"^({IP_PART}|{HOSTNAME_PART}/)?{TAG_NAME_PART}(:{TAG_VERSION_PART})?$",
))
- .expect("should compile with success. this is a bug, please report it: https://github.com/paritytech/zombienet-sdk/issues");
+ .expect(&format!("{}, {}", SHOULD_COMPILE, THIS_IS_A_BUG));
};
if !RE.is_match(value) {
@@ -143,7 +183,7 @@ impl Image {
/// assert_eq!(command1.as_str(), "mycommand");
/// assert_eq!(command2.as_str(), "myothercommand");
/// ```
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Command(String);
impl TryFrom<&str> for Command {
@@ -208,7 +248,7 @@ impl From<&str> for AssetLocation {
}
Self::FilePath(
- PathBuf::from_str(value).expect("infaillible. this is a bug, please report it"),
+ PathBuf::from_str(value).expect(&format!("{}, {}", INFAILABLE, THIS_IS_A_BUG)),
)
}
}
@@ -231,6 +271,32 @@ impl Serialize for AssetLocation {
}
}
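+// An asset location is deserialized from a plain string: URLs become AssetLocation::Url, anything else a local AssetLocation::FilePath.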
+struct AssetLocationVisitor;
+
+impl<'de> de::Visitor<'de> for AssetLocationVisitor {
+ type Value = AssetLocation;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("a string")
+ }
+
+ fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ Ok(AssetLocation::from(v))
+ }
+}
+
+impl<'de> Deserialize<'de> for AssetLocation {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ deserializer.deserialize_any(AssetLocationVisitor)
+ }
+}
+
/// A CLI argument passed to an executed command, can be an option with an assigned value or a simple flag to enable/disable a feature.
/// A flag arg can be constructed from a `&str` and a option arg can be constructed from a `(&str, &str)`.
///
@@ -276,6 +342,54 @@ impl Serialize for Arg {
}
}
+struct ArgVisitor;
+
+impl<'de> de::Visitor<'de> for ArgVisitor {
+ type Value = Arg;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("a string")
+ }
+
+ fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
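+ // Parses a CLI argument string into Arg::Flag (bare flag) or Arg::Option (name plus value separated by '=' or a space); name_prefix keeps the leading dashes with the name.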
+ let re = Regex::new("^(?<name_prefix>(?<prefix>-{1,2})(?<name>[a-zA-Z]+(-[a-zA-Z]+)*))((?<separator>=| )(?<value>.+))?$").unwrap();
+ let captures = re.captures(v);
+
+ if let Some(captures) = captures {
+ if let Some(value) = captures.name("value") {
+ return Ok(Arg::Option(
+ captures
+ .name("name_prefix")
+ .expect(&format!("{} {}", PREFIX_CANT_BE_NONE, THIS_IS_A_BUG))
+ .as_str()
+ .to_string(),
+ value.as_str().to_string(),
+ ));
+ }
+
+ if let Some(name_prefix) = captures.name("name_prefix") {
+ return Ok(Arg::Flag(name_prefix.as_str().to_string()));
+ }
+ }
+
+ Err(de::Error::custom(
+ "the provided argument is invalid and doesn't match Arg::Option or Arg::Flag",
+ ))
+ }
+}
+
+impl<'de> Deserialize<'de> for Arg {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ deserializer.deserialize_any(ArgVisitor)
+ }
+}
+
#[derive(Debug, Default, Clone)]
pub struct ValidationContext {
pub used_ports: Vec<Port>,
@@ -283,12 +397,13 @@ pub struct ValidationContext {
pub used_parachain_ids: Vec<ParaId>,
}
-#[derive(Default, Debug, Clone, PartialEq)]
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
pub struct ChainDefaultContext {
pub(crate) default_command: Option<Command>,
pub(crate) default_image: Option<Image>,
pub(crate) default_resources: Option<Resources>,
pub(crate) default_db_snapshot: Option<AssetLocation>,
+ #[serde(default)]
pub(crate) default_args: Vec<Arg>,
}
diff --git a/crates/configuration/src/utils.rs b/crates/configuration/src/utils.rs
index 5c5a184d3..ced19103e 100644
--- a/crates/configuration/src/utils.rs
+++ b/crates/configuration/src/utils.rs
@@ -1,3 +1,7 @@
-pub fn is_true(value: &bool) -> bool {
+pub(crate) fn is_true(value: &bool) -> bool {
*value
}
+
+pub(crate) fn default_as_true() -> bool {
+ true
+}
diff --git a/crates/configuration/testing/snapshots/0003-small-network_w_parachain.toml b/crates/configuration/testing/snapshots/0003-small-network_w_parachain.toml
new file mode 100644
index 000000000..b998a14c0
--- /dev/null
+++ b/crates/configuration/testing/snapshots/0003-small-network_w_parachain.toml
@@ -0,0 +1,40 @@
+[settings]
+timeout = 1000
+node_spawn_timeout = 300
+
+[relaychain]
+chain = "rococo-local"
+default_command = "polkadot"
+default_image = "docker.io/parity/polkadot:latest"
+default_args = ["-lparachain=debug"]
+
+[[relaychain.nodes]]
+name = "alice"
+validator = true
+invulnerable = true
+bootnode = false
+balance = 2000000000000
+
+[[relaychain.nodes]]
+name = "bob"
+args = ["--database=paritydb-experimental"]
+validator = true
+invulnerable = false
+bootnode = true
+balance = 2000000000000
+
+[[parachains]]
+id = 1000
+chain = "myparachain"
+onboard_as_parachain = false
+balance = 2000000000000
+default_db_snapshot = "https://storage.com/path/to/db_snapshot.tgz"
+chain_spec_path = "/path/to/my/chain/spec.json"
+cumulus_based = true
+
+[[parachains.collators]]
+name = "john"
+validator = true
+invulnerable = true
+bootnode = true
+balance = 5000000000
diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml
index bbc268302..7001258ed 100644
--- a/crates/provider/Cargo.toml
+++ b/crates/provider/Cargo.toml
@@ -8,11 +8,23 @@ edition = "2021"
[dependencies]
support = { path = "../support" }
configuration = { path = "../configuration" }
-async-trait = {workspace = true }
-futures = {workspace = true }
+async-trait = { workspace = true }
+futures = { workspace = true }
#napi = { version="2.12.7", features=["async"]}
#napi-derive = "2.12.5"
serde = { workspace = true, features = ["derive"] }
-serde_json = {workspace = true}
-tokio = { workspace = true, features = ["process", "macros", "fs", "time", "rt"] }
-thiserror = {workspace = true}
+serde_json = { workspace = true }
+tokio = { workspace = true, features = [
+ "process",
+ "macros",
+ "fs",
+ "time",
+ "rt",
+] }
+thiserror = { workspace = true }
+anyhow = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
+nix = { workspace = true, features = ["signal"] }
+
+[dev-dependencies]
+procfs = { workspace = true }
diff --git a/crates/provider/src/errors.rs b/crates/provider/src/errors.rs
deleted file mode 100644
index 9bd1e34f0..000000000
--- a/crates/provider/src/errors.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-//! Zombienet Provider error definitions.
-
-macro_rules! from_error {
- ($type:ty, $target:ident, $targetvar:expr) => {
- impl From<$type> for $target {
- fn from(s: $type) -> Self {
- $targetvar(s.into())
- }
- }
- };
-}
-
-#[derive(Debug, thiserror::Error)]
-#[allow(missing_docs)]
-pub enum ProviderError {
- #[error("Invalid network configuration field {0}")]
- InvalidConfig(String),
- #[error("Can recover node: {0} info, field: {1}")]
- MissingNodeInfo(String, String),
- #[error("Duplicated node name: {0}")]
- DuplicatedNodeName(String),
- #[error("Error running cmd: {0}")]
- RunCommandError(String),
- #[error("Error spawning node: {0}")]
- ErrorSpawningNode(String),
- #[error("Node die/stale, logs: {0}")]
- NodeNotReady(String),
- // FSErrors are implemented in the associated type
- #[error(transparent)]
- FSError(Box),
- // From serde errors
- #[error("Serialization error")]
- SerializationError(serde_json::Error),
- #[error("IO error: {0}")]
- IOError(std::io::Error),
- #[error("Invalid script_path: {0}")]
- InvalidScriptPath(String),
-}
-
-from_error!(
- serde_json::Error,
- ProviderError,
- ProviderError::SerializationError
-);
-from_error!(std::io::Error, ProviderError, ProviderError::IOError);
diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs
index 636ee240f..b6f3e2098 100644
--- a/crates/provider/src/lib.rs
+++ b/crates/provider/src/lib.rs
@@ -1,94 +1,132 @@
-mod errors;
-mod native;
-mod shared;
+pub mod native;
+pub mod shared;
-use std::{net::IpAddr, path::PathBuf};
+use std::{
+ collections::HashMap, net::IpAddr, path::PathBuf, process::ExitStatus, sync::Arc,
+ time::Duration,
+};
use async_trait::async_trait;
-use errors::ProviderError;
-use shared::types::{FileMap, NativeRunCommandOptions, PodDef, Port, RunCommandResponse};
+use shared::types::{
+ GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
+ RunScriptOptions, SpawnNodeOptions,
+};
+use support::fs::FileSystemError;
+
+use crate::shared::types::Port;
+
+#[derive(Debug, thiserror::Error)]
+#[allow(missing_docs)]
+pub enum ProviderError {
+ #[error("Failed to spawn node '{0}': {1}")]
+ NodeSpawningFailed(String, anyhow::Error),
+
+ #[error("Error running command '{0}': {1}")]
+ RunCommandError(String, anyhow::Error),
+
+ #[error("Duplicated node name: {0}")]
+ DuplicatedNodeName(String),
+
+ #[error(transparent)]
+ FileSystemError(#[from] FileSystemError),
+
+ #[error("Invalid script path for {0}")]
+ InvalidScriptPath(anyhow::Error),
+
+ #[error("Script with path {0} not found")]
+ ScriptNotFound(PathBuf),
+
+ #[error("File generation failed: {0}")]
+ FileGenerationFailed(anyhow::Error),
+
+ #[error("Failed to retrieve process ID for node '{0}'")]
+ ProcessIdRetrievalFailed(String),
+
+ #[error("Failed to pause node '{0}'")]
+ PauseNodeFailed(String),
+
+ #[error("Failed to resume node '{0}'")]
+ ResumeNodeFaied(String),
+
+ #[error("Failed to kill node '{0}'")]
+ KillNodeFailed(String),
+}
#[async_trait]
pub trait Provider {
- async fn create_namespace(&mut self) -> Result<(), ProviderError>;
- async fn get_node_ip(&self) -> Result<IpAddr, ProviderError>;
- async fn get_port_mapping(
- &mut self,
- port: Port,
- pod_name: String,
- ) -> Result<Port, ProviderError>;
- async fn get_node_info(&mut self, pod_name: String) -> Result<(IpAddr, Port), ProviderError>;
+ fn capabilities(&self) -> &ProviderCapabilities;
+
+ async fn namespaces(&self) -> HashMap<String, DynNamespace>;
+
+ async fn create_namespace(&self) -> Result<DynNamespace, ProviderError>;
+}
+
+pub type DynProvider = Arc<dyn Provider>;
+
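+// A namespace groups the nodes spawned by a provider and is backed by its own base directory.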
+#[async_trait]
+pub trait ProviderNamespace {
+ fn id(&self) -> &str;
+
+ fn base_dir(&self) -> &PathBuf;
+
+ async fn nodes(&self) -> HashMap<String, DynNode>;
+
+ async fn spawn_node(&self, options: SpawnNodeOptions) -> Result<DynNode, ProviderError>;
+
+ async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError>;
+
+ async fn destroy(&self) -> Result<(), ProviderError>;
+
+ async fn static_setup(&self) -> Result<(), ProviderError>;
+}
+
+pub type DynNamespace = Arc<dyn ProviderNamespace>;
+
+type ExecutionResult = Result<String, (ExitStatus, String)>;
+
+#[async_trait]
+pub trait ProviderNode {
+ fn name(&self) -> &str;
+
+ fn base_dir(&self) -> &PathBuf;
+
+ fn config_dir(&self) -> &PathBuf;
+
+ fn data_dir(&self) -> &PathBuf;
+
+ fn scripts_dir(&self) -> &PathBuf;
+
+ fn log_path(&self) -> &PathBuf;
+
+ async fn endpoint(&self) -> Result<(IpAddr, Port), ProviderError>;
+
+ async fn mapped_port(&self, port: Port) -> Result<Port, ProviderError>;
+
+ async fn logs(&self) -> Result<String, ProviderError>;
+
+ async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError>;
+
async fn run_command(
&self,
- args: Vec<String>,
- opts: NativeRunCommandOptions,
- ) -> Result<RunCommandResponse, ProviderError>;
- async fn run_script(
- &mut self,
- identifier: String,
- script_path: String,
- args: Vec<String>,
- ) -> Result<RunCommandResponse, ProviderError>;
- async fn spawn_from_def(
- &mut self,
- pod_def: PodDef,
- files_to_copy: Vec<FileMap>,
- keystore: String,
- chain_spec_id: String,
- db_snapshot: String,
- ) -> Result<(), ProviderError>;
- async fn copy_file_from_pod(
- &mut self,
- pod_file_path: PathBuf,
- local_file_path: PathBuf,
- ) -> Result<(), ProviderError>;
- async fn create_resource(
- &mut self,
- resource_def: PodDef,
- scoped: bool,
- wait_ready: bool,
- ) -> Result<(), ProviderError>;
- async fn wait_node_ready(&mut self, node_name: String) -> Result<(), ProviderError>;
- async fn get_node_logs(&mut self, node_name: String) -> Result<String, ProviderError>;
- async fn dump_logs(&mut self, path: String, pod_name: String) -> Result<(), ProviderError>;
- fn get_pause_args(&mut self, name: String) -> Vec<String>;
- fn get_resume_args(&mut self, name: String) -> Vec<String>;
- async fn restart_node(&mut self, name: String, timeout: u64) -> Result<bool, ProviderError>;
- async fn get_help_info(&mut self) -> Result<(), ProviderError>;
- async fn destroy_namespace(&mut self) -> Result<(), ProviderError>;
- async fn get_logs_command(&mut self, name: String) -> Result<String, ProviderError>;
- async fn put_local_magic_file(
+ options: RunCommandOptions,
+ ) -> Result<ExecutionResult, ProviderError>;
+
+ async fn run_script(&self, options: RunScriptOptions)
+ -> Result<ExecutionResult, ProviderError>;
+
+ async fn copy_file_from_node(
&self,
- _name: String,
- _container: Option<String>,
- ) -> Result<(), ProviderError> {
- Ok(())
- }
- fn is_pod_monitor_available() -> Result<bool, ProviderError> {
- Ok(false)
- }
- async fn spawn_introspector() -> Result<(), ProviderError> {
- Ok(())
- }
-
- async fn static_setup() -> Result<(), ProviderError> {
- Ok(())
- }
- async fn create_static_resource() -> Result<(), ProviderError> {
- Ok(())
- }
- async fn create_pod_monitor() -> Result<(), ProviderError> {
- Ok(())
- }
- async fn setup_cleaner() -> Result<(), ProviderError> {
- Ok(())
- }
-
- #[allow(clippy::diverging_sub_expression)]
- async fn upsert_cron_job() -> Result<(), ProviderError> {
- unimplemented!();
- }
+ remote_src: PathBuf,
+ local_dest: PathBuf,
+ ) -> Result<(), ProviderError>;
+
+ async fn pause(&self) -> Result<(), ProviderError>;
+
+ async fn resume(&self) -> Result<(), ProviderError>;
+
+ async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError>;
+
+ async fn destroy(&self) -> Result<(), ProviderError>;
}
-// re-exports
-pub use native::NativeProvider;
+pub type DynNode = Arc<dyn ProviderNode>;
diff --git a/crates/provider/src/native.rs b/crates/provider/src/native.rs
index 817a455f9..b73c4605b 100644
--- a/crates/provider/src/native.rs
+++ b/crates/provider/src/native.rs
@@ -1,833 +1,1526 @@
use std::{
self,
- collections::{
- hash_map::Entry::{Occupied, Vacant},
- HashMap,
- },
+ collections::HashMap,
fmt::Debug,
+ io::Error,
net::IpAddr,
- path::{Path, PathBuf},
+ path::PathBuf,
+ process::Stdio,
+ sync::{Arc, Weak},
};
+use anyhow::anyhow;
use async_trait::async_trait;
-use serde::Serialize;
-use support::{fs::FileSystem, net::download_file};
+use configuration::types::Port;
+use futures::{future::try_join_all, try_join};
+use nix::{
+ sys::signal::{kill, Signal},
+ unistd::Pid,
+};
+use support::fs::FileSystem;
use tokio::{
- process::Command,
+ io::{AsyncRead, AsyncReadExt, BufReader},
+ process::{Child, Command},
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+ },
+ task::JoinHandle,
time::{sleep, Duration},
};
+use uuid::Uuid;
-use super::Provider;
use crate::{
- errors::ProviderError,
- shared::{
- constants::{DEFAULT_DATA_DIR, DEFAULT_REMOTE_DIR, LOCALHOST, P2P_PORT},
- types::{
- FileMap, NativeRunCommandOptions, PodDef, Port, Process, RunCommandResponse, ZombieRole,
- },
- },
+ shared::constants::{NODE_CONFIG_DIR, NODE_DATA_DIR, NODE_SCRIPTS_DIR},
+ DynNamespace, DynNode, ExecutionResult, GenerateFileCommand, GenerateFilesOptions, Provider,
+ ProviderCapabilities, ProviderError, ProviderNamespace, ProviderNode, RunCommandOptions,
+ RunScriptOptions, SpawnNodeOptions,
};
-#[derive(Debug, Serialize, Clone, PartialEq)]
-pub struct NativeProvider<T: FileSystem + Send + Sync> {
- // Namespace of the client (isolation directory)
- namespace: String,
- // Path where configuration relies, all the `files` are accessed relative to this.
- config_path: String,
- // Variable that shows if debug is activated
- is_debug: bool,
- // The timeout for start the node
- timeout: u32,
- // Command to use, e.g "bash"
- command: String,
- // Temporary directory, root directory for the network
- tmp_dir: String,
- local_magic_file_path: String,
- remote_dir: String,
- data_dir: String,
- process_map: HashMap<String, Process>,
- filesystem: T,
+
+#[derive(Debug, Clone)]
+pub struct NativeProvider<FS: FileSystem + Send + Sync + Clone> {
+ capabilities: ProviderCapabilities,
+ tmp_dir: PathBuf,
+ filesystem: FS,
+ inner: Arc<RwLock<NativeProviderInner<FS>>>,
}
-impl<T: FileSystem + Send + Sync> NativeProvider<T> {
- /// Zombienet `native` provider allows to run the nodes as a local process in the local environment
- /// params:
- /// namespace: Namespace of the clien
- /// config_path: Path where configuration relies
- /// tmp_dir: Temporary directory where files will be placed
- /// filesystem: Filesystem to use (std::fs::FileSystem, mock etc.)
- pub fn new(
- namespace: impl Into<String>,
- config_path: impl Into<String>,
- tmp_dir: impl Into<String>,
- filesystem: T,
- ) -> Self {
- let tmp_dir: String = tmp_dir.into();
- let process_map: HashMap<String, Process> = HashMap::new();
-
- Self {
- namespace: namespace.into(),
- config_path: config_path.into(),
- is_debug: true,
- timeout: 60, // seconds
- local_magic_file_path: format!("{}/finished.txt", &tmp_dir),
- remote_dir: format!("{}{}", &tmp_dir, DEFAULT_REMOTE_DIR),
- data_dir: format!("{}{}", &tmp_dir, DEFAULT_DATA_DIR),
- command: "bash".into(),
- tmp_dir,
- process_map,
+#[derive(Debug)]
+struct NativeProviderInner {
+ namespaces: HashMap<String, NativeNamespace<FS>>,
+}
+
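+// Weak back-reference to the provider state, so a namespace can remove itself on destroy without creating an Arc cycle.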
+#[derive(Debug, Clone)]
+struct WeakNativeProvider<FS: FileSystem + Send + Sync + Clone> {
+ inner: Weak<RwLock<NativeProviderInner<FS>>>,
+}
+
+impl<FS: FileSystem + Send + Sync + Clone> NativeProvider<FS> {
+ pub fn new(filesystem: FS) -> Self {
+ NativeProvider {
+ capabilities: ProviderCapabilities::new(),
+ tmp_dir: std::env::temp_dir(),
filesystem,
+ inner: Arc::new(RwLock::new(NativeProviderInner {
+ namespaces: Default::default(),
+ })),
}
}
+
+ pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
+ self.tmp_dir = tmp_dir.into();
+ self
+ }
}
#[async_trait]
-impl<T: FileSystem + Send + Sync> Provider for NativeProvider<T> {
- async fn create_namespace(&mut self) -> Result<(), ProviderError> {
- // Native provider don't have the `namespace` isolation.
- // but we create the `remoteDir` to place files
- self.filesystem
- .create_dir(&self.remote_dir)
+impl<FS: FileSystem + Send + Sync + Clone + 'static> Provider for NativeProvider<FS> {
+ fn capabilities(&self) -> &ProviderCapabilities {
+ &self.capabilities
+ }
+
+ async fn namespaces(&self) -> HashMap<String, DynNamespace> {
+ self.inner
+ .read()
.await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
- Ok(())
+ .namespaces
+ .clone()
+ .into_iter()
+ .map(|(id, namespace)| (id, Arc::new(namespace) as DynNamespace))
+ .collect()
}
- async fn get_port_mapping(
- &mut self,
- port: Port,
- pod_name: String,
- ) -> Result<Port, ProviderError> {
- let r = match self.process_map.get(&pod_name) {
- Some(process) => match process.port_mapping.get(&port) {
- Some(port) => Ok(*port),
- None => Err(ProviderError::MissingNodeInfo(pod_name, "port".into())),
+ async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
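+ // namespaces are isolated from each other through a unique directory created under the provider tmp dir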
+ let id = format!("zombie_{}", Uuid::new_v4());
+ let mut inner = self.inner.write().await;
+
+ let base_dir = PathBuf::from(format!("{}/{}", self.tmp_dir.to_string_lossy(), &id));
+ self.filesystem.create_dir(&base_dir).await?;
+
+ let namespace = NativeNamespace {
+ id: id.clone(),
+ base_dir,
+ filesystem: self.filesystem.clone(),
+ provider: WeakNativeProvider {
+ inner: Arc::downgrade(&self.inner),
},
- None => Err(ProviderError::MissingNodeInfo(pod_name, "process".into())),
+ inner: Arc::new(RwLock::new(NativeNamespaceInner {
+ nodes: Default::default(),
+ })),
};
- return r;
+ inner.namespaces.insert(id, namespace.clone());
+
+ Ok(Arc::new(namespace))
}
+}
+
+#[derive(Debug, Clone)]
+pub struct NativeNamespace<FS: FileSystem + Send + Sync + Clone> {
+ id: String,
+ base_dir: PathBuf,
+ inner: Arc<RwLock<NativeNamespaceInner<FS>>>,
+ filesystem: FS,
+ provider: WeakNativeProvider<FS>,
+}
+
+#[derive(Debug)]
+struct NativeNamespaceInner<FS: FileSystem + Send + Sync + Clone> {
+ nodes: HashMap<String, NativeNode<FS>>,
+}
- async fn get_node_info(&mut self, pod_name: String) -> Result<(IpAddr, Port), ProviderError> {
- let host_port = self.get_port_mapping(P2P_PORT, pod_name).await?;
- Ok((LOCALHOST, host_port))
+#[derive(Debug, Clone)]
+struct WeakNativeNamespace<FS: FileSystem + Send + Sync + Clone> {
+ inner: Weak<RwLock<NativeNamespaceInner<FS>>>,
+}
+
+#[async_trait]
+impl<FS: FileSystem + Send + Sync + Clone + 'static> ProviderNamespace for NativeNamespace<FS> {
+ fn id(&self) -> &str {
+ &self.id
}
- async fn get_node_ip(&self) -> Result<IpAddr, ProviderError> {
- Ok(LOCALHOST)
+ fn base_dir(&self) -> &PathBuf {
+ &self.base_dir
}
- async fn run_command(
- &self,
- mut args: Vec<String>,
- opts: NativeRunCommandOptions,
- ) -> Result<RunCommandResponse, ProviderError> {
- if let Some(arg) = args.get(0) {
- if arg == "bash" {
- args.remove(0);
- }
+ async fn nodes(&self) -> HashMap<String, DynNode> {
+ self.inner
+ .read()
+ .await
+ .nodes
+ .clone()
+ .into_iter()
+ .map(|(id, node)| (id, Arc::new(node) as DynNode))
+ .collect()
+ }
+
+ async fn spawn_node(&self, options: SpawnNodeOptions) -> Result<DynNode, ProviderError> {
+ let mut inner = self.inner.write().await;
+
+ if inner.nodes.contains_key(&options.name) {
+ return Err(ProviderError::DuplicatedNodeName(options.name));
}
- // -c is already used in the process::Command to execute the command thus
- // needs to be removed in case provided
- if let Some(arg) = args.get(0) {
- if arg == "-c" {
- args.remove(0);
- }
+ // create node directories and filepaths
+ let base_dir_raw = format!("{}/{}", &self.base_dir.to_string_lossy(), &options.name);
+ let base_dir = PathBuf::from(&base_dir_raw);
+ let log_path = PathBuf::from(format!("{}/{}.log", base_dir_raw, &options.name));
+ let config_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_CONFIG_DIR));
+ let data_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_DATA_DIR));
+ let scripts_dir = PathBuf::from(format!("{}{}", base_dir_raw, NODE_SCRIPTS_DIR));
+ self.filesystem.create_dir(&base_dir).await?;
+ try_join!(
+ self.filesystem.create_dir(&config_dir),
+ self.filesystem.create_dir(&data_dir),
+ self.filesystem.create_dir(&scripts_dir),
+ )?;
+
+ // copy injected files
+ let mut futures = vec![];
+ for file in options.injected_files {
+ futures.push(self.filesystem.copy(
+ file.local_path,
+ format!("{}{}", base_dir_raw, file.remote_path.to_string_lossy()),
+ ));
}
+ try_join_all(futures).await?;
+
+ let (process, stdout_reading_handle, stderr_reading_handle, log_writing_handle) =
+ create_process_with_log_tasks(
+ &options.name,
+ &options.command,
+ &options.args,
+ &options.env,
+ &log_path,
+ self.filesystem.clone(),
+ )?;
+
+ // create node structure holding state
+ let node = NativeNode {
+ name: options.name.clone(),
+ command: options.command,
+ args: options.args,
+ env: options.env,
+ base_dir,
+ config_dir,
+ data_dir,
+ scripts_dir,
+ log_path,
+ filesystem: self.filesystem.clone(),
+ namespace: WeakNativeNamespace {
+ inner: Arc::downgrade(&self.inner),
+ },
+ inner: Arc::new(RwLock::new(NativeNodeInner {
+ process,
+ stdout_reading_handle,
+ stderr_reading_handle,
+ log_writing_handle,
+ })),
+ };
- let result = Command::new(&self.command)
- .arg("-c")
- .arg(args.join(" "))
- .output()
- .await?;
+ // store node inside namespace
+ inner.nodes.insert(options.name, node.clone());
- if !result.status.success() && !opts.is_failure_allowed {
- return Err(ProviderError::RunCommandError(args.join(" ")));
- } else {
- // cmd success or we allow to fail
- // in either case we return Ok
- Ok(RunCommandResponse {
- exit_code: result.status,
- std_out: String::from_utf8_lossy(&result.stdout).into(),
- std_err: if result.stderr.is_empty() {
- None
- } else {
- Some(String::from_utf8_lossy(&result.stderr).into())
- },
+ Ok(Arc::new(node))
+ }
+
+ async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
+ // we spawn a node doing nothing but looping so we can execute our commands
+ let temp_node = self
+ .spawn_node(SpawnNodeOptions {
+ name: format!("temp_{}", Uuid::new_v4()),
+ command: "bash".to_string(),
+ args: vec!["-c".to_string(), "while :; do sleep 1; done".to_string()],
+ env: vec![],
+ injected_files: options.injected_files,
})
+ .await?;
+
+ for GenerateFileCommand {
+ command,
+ args,
+ env,
+ local_output_path,
+ } in options.commands
+ {
+ match temp_node
+ .run_command(RunCommandOptions { command, args, env })
+ .await
+ .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?
+ {
+ Ok(contents) => self
+ .filesystem
+ .write(
+ format!(
+ "{}{}",
+ self.base_dir.to_string_lossy(),
+ local_output_path.to_string_lossy()
+ ),
+ contents,
+ )
+ .await
+ .map_err(|err| ProviderError::FileGenerationFailed(err.into()))?,
+ Err((_, msg)) => Err(ProviderError::FileGenerationFailed(anyhow!("{msg}")))?,
+ };
}
+
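+ // the temporary node is no longer needed once all files have been generated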
+ temp_node.destroy().await
}
- // TODO: Add test
- async fn run_script(
- &mut self,
- identifier: String,
- script_path: String,
- args: Vec<String>,
- ) -> Result<RunCommandResponse, ProviderError> {
- let script_filename = Path::new(&script_path)
- .file_name()
- .ok_or(ProviderError::InvalidScriptPath(script_path.clone()))?
- .to_str()
- .ok_or(ProviderError::InvalidScriptPath(script_path.clone()))?;
- let script_path_in_pod = format!("{}/{}/{}", self.tmp_dir, identifier, script_filename);
+ async fn static_setup(&self) -> Result<(), ProviderError> {
+ // no static setup exists for native provider
+ todo!()
+ }
- // upload the script
- self.filesystem
- .copy(&script_path, &script_path_in_pod)
- .await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
-
- // set as executable
- self.run_command(
- vec![
- "chmod".to_owned(),
- "+x".to_owned(),
- script_path_in_pod.clone(),
- ],
- NativeRunCommandOptions::default(),
- )
- .await?;
-
- let command = format!(
- "cd {}/{} && {} {}",
- self.tmp_dir,
- identifier,
- script_path_in_pod,
- args.join(" ")
- );
- let result = self
- .run_command(vec![command], NativeRunCommandOptions::default())
- .await?;
+ async fn destroy(&self) -> Result<(), ProviderError> {
+ // we need to clone nodes (behind an Arc, so cheaply) to avoid deadlock between the inner.write lock and the node.destroy
+ // method acquiring a lock on the namespace to remove the node from the nodes hashmap.
+ let nodes: Vec<NativeNode<FS>> = self.inner.write().await.nodes.values().cloned().collect();
+ for node in nodes.iter() {
+ node.destroy().await?;
+ }
- Ok(RunCommandResponse {
- exit_code: result.exit_code,
- std_out: result.std_out,
- std_err: result.std_err,
- })
+ // remove namespace from provider
+ if let Some(provider) = self.provider.inner.upgrade() {
+ provider.write().await.namespaces.remove(&self.id);
+ }
+
+ Ok(())
}
+}
- // TODO: Add test
- async fn spawn_from_def(
- &mut self,
- pod_def: PodDef,
- files_to_copy: Vec<FileMap>,
- keystore: String,
- chain_spec_id: String,
- // TODO: add logic to download the snapshot
- db_snapshot: String,
- ) -> Result<(), ProviderError> {
- let name = pod_def.metadata.name.clone();
- // TODO: log::debug!(format!("{}", serde_json::to_string(&pod_def)));
-
- // keep this in the client.
- self.process_map.entry(name.clone()).and_modify(|p| {
- p.logs = format!("{}/{}.log", self.tmp_dir, name);
- p.port_mapping = pod_def
- .spec
- .ports
- .iter()
- .map(|item| (item.container_port, item.host_port))
- .collect();
- });
-
- // TODO: check how we will log with tables
- // let logTable = new CreateLogTable({
- // colWidths: [25, 100],
- // });
-
- // const logs = [
- // [decorators.cyan("Pod"), decorators.green(name)],
- // [decorators.cyan("Status"), decorators.green("Launching")],
- // [
- // decorators.cyan("Command"),
- // decorators.white(podDef.spec.command.join(" ")),
- // ],
- // ];
- // if (dbSnapshot) {
- // logs.push([decorators.cyan("DB Snapshot"), decorators.green(dbSnapshot)]);
- // }
- // logTable.pushToPrint(logs);
-
- // we need to get the snapshot from a public access
- // and extract to /data
- let _ = self
- .filesystem
- .create_dir(pod_def.spec.data_path.clone())
- .await;
+#[derive(Debug, Clone)]
+struct NativeNode<FS: FileSystem + Send + Sync + Clone> {
+ name: String,
+ command: String,
+ args: Vec<String>,
+ env: Vec<(String, String)>,
+ base_dir: PathBuf,
+ config_dir: PathBuf,
+ data_dir: PathBuf,
+ scripts_dir: PathBuf,
+ log_path: PathBuf,
+ inner: Arc<RwLock<NativeNodeInner>>,
+ filesystem: FS,
+ namespace: WeakNativeNamespace<FS>,
+}
- let _ = download_file(db_snapshot, format!("{}/db.tgz", pod_def.spec.data_path)).await;
- let command = format!("cd {}/.. && tar -xzvf data/db.tgz", pod_def.spec.data_path);
+#[derive(Debug)]
+struct NativeNodeInner {
+ process: Child,
+ stdout_reading_handle: JoinHandle<()>,
+ stderr_reading_handle: JoinHandle<()>,
+ log_writing_handle: JoinHandle<()>,
+}
- self.run_command(vec![command], NativeRunCommandOptions::default())
- .await?;
+#[async_trait]
+impl<FS: FileSystem + Send + Sync + Clone + 'static> ProviderNode for NativeNode<FS> {
+ fn name(&self) -> &str {
+ &self.name
+ }
- if !keystore.is_empty() {
- // initialize keystore
- let keystore_remote_dir = format!(
- "{}/chains/{}/keystore",
- pod_def.spec.data_path, chain_spec_id
- );
+ fn base_dir(&self) -> &PathBuf {
+ &self.base_dir
+ }
- let _ = self
- .filesystem
- .create_dir(keystore_remote_dir.clone())
- .await;
+ fn config_dir(&self) -> &PathBuf {
+ &self.config_dir
+ }
- let _ = self.filesystem.copy(&keystore, &keystore_remote_dir).await;
- }
+ fn data_dir(&self) -> &PathBuf {
+ &self.data_dir
+ }
- let files_to_copy_iter = files_to_copy.iter();
+ fn scripts_dir(&self) -> &PathBuf {
+ &self.scripts_dir
+ }
- for file in files_to_copy_iter {
- // log::debug!(format!("file.local_file_path: {}", file.local_file_path));
- // log::debug!(format!("file.remote_file_path: {}", file.remote_file_path));
+ fn log_path(&self) -> &PathBuf {
+ &self.log_path
+ }
- // log::debug!(format!("self.remote_dir: {}", self.remote_dir);
- // log::debug!(format!("self.data_dir: {}", self.data_dir);
+ async fn endpoint(&self) -> Result<(IpAddr, Port), ProviderError> {
+ todo!();
+ }
- let remote_file_path_str: String = file
- .clone()
- .remote_file_path
- .into_os_string()
- .into_string()
- .unwrap();
+ async fn mapped_port(&self, _port: Port) -> Result<Port, ProviderError> {
+ todo!()
+ }
- let resolved_remote_file_path = if remote_file_path_str.contains(&self.remote_dir) {
- format!(
- "{}/{}",
- &pod_def.spec.cfg_path,
- remote_file_path_str.replace(&self.remote_dir, "")
- )
- } else {
- format!(
- "{}/{}",
- &pod_def.spec.data_path,
- remote_file_path_str.replace(&self.data_dir, "")
- )
- };
+ async fn logs(&self) -> Result<String, ProviderError> {
+ Ok(self.filesystem.read_to_string(&self.log_path).await?)
+ }
- let _ = self
- .filesystem
- .copy(
- file.clone()
- .local_file_path
- .into_os_string()
- .into_string()
- .unwrap(),
- resolved_remote_file_path,
- )
- .await;
+ async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError> {
+ Ok(self.filesystem.copy(&self.log_path, local_dest).await?)
+ }
+
+ async fn run_command(
+ &self,
+ options: RunCommandOptions,
+ ) -> Result<ExecutionResult, ProviderError> {
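+ // a failure to launch the command is a ProviderError, while a non-zero exit status is returned as Ok(Err((status, stderr)))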
+ let result = Command::new(options.command.clone())
+ .args(options.args)
+ .envs(options.env)
+ .output()
+ .await
+ .map_err(|err| ProviderError::RunCommandError(options.command, err.into()))?;
+
+ if result.status.success() {
+ Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
+ } else {
+ Ok(Err((
+ result.status,
+ String::from_utf8_lossy(&result.stderr).to_string(),
+ )))
+ }
+ }
+
+ async fn run_script(
+ &self,
+ options: RunScriptOptions,
+ ) -> Result<ExecutionResult, ProviderError> {
+ let local_script_path = PathBuf::from(&options.local_script_path);
+
+ if !local_script_path
+ .try_exists()
+ .map_err(|err| ProviderError::InvalidScriptPath(err.into()))?
+ {
+ return Err(ProviderError::ScriptNotFound(local_script_path));
}
- self.create_resource(pod_def, false, true).await?;
+ // extract file name and build remote file path
+ let script_file_name = local_script_path
+ .file_name()
+ .map(|file_name| file_name.to_string_lossy().to_string())
+ .ok_or(ProviderError::InvalidScriptPath(anyhow!(
+ "Can't retrieve filename from script with path: {:?}",
+ options.local_script_path
+ )))?;
+ let remote_script_path = format!(
+ "{}/{}",
+ self.scripts_dir.to_string_lossy(),
+ script_file_name
+ );
- // TODO: check how we will log with tables
- // logTable = new CreateLogTable({
- // colWidths: [40, 80],
- // });
- // logTable.pushToPrint([
- // [decorators.cyan("Pod"), decorators.green(name)],
- // [decorators.cyan("Status"), decorators.green("Ready")],
- // ]);
- Ok(())
+ // copy and set script's execute permission
+ self.filesystem
+ .copy(local_script_path, &remote_script_path)
+ .await?;
+ self.filesystem.set_mode(&remote_script_path, 0o744).await?;
+
+ // execute script
+ self.run_command(RunCommandOptions {
+ command: remote_script_path,
+ args: options.args,
+ env: options.env,
+ })
+ .await
}
- async fn copy_file_from_pod(
- &mut self,
- pod_file_path: PathBuf,
- local_file_path: PathBuf,
+ async fn copy_file_from_node(
+ &self,
+ remote_src: PathBuf,
+ local_dest: PathBuf,
) -> Result<(), ProviderError> {
- // TODO: log::debug!(format!("cp {} {}", pod_file_path, local_file_path));
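+ // remote_src is resolved relative to the node base directory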
+ let remote_file_path = format!(
+ "{}{}",
+ self.base_dir.to_string_lossy(),
+ remote_src.to_string_lossy()
+ );
+ self.filesystem.copy(remote_file_path, local_dest).await?;
- self.filesystem
- .copy(&pod_file_path, &local_file_path)
- .await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
Ok(())
}
- async fn create_resource(
- &mut self,
- mut resource_def: PodDef,
- _scoped: bool,
- wait_ready: bool,
- ) -> Result<(), ProviderError> {
- let name: String = resource_def.metadata.name.clone();
- let local_file_path: String = format!("{}/{}.yaml", &self.tmp_dir, name);
- let content: String = serde_json::to_string(&resource_def)?;
+ async fn pause(&self) -> Result<(), ProviderError> {
+ let inner = self.inner.write().await;
+ let pid = retrieve_pid_from_process(&inner.process, &self.name)?;
- self.filesystem
- .write(&local_file_path, content)
- .await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
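+ // SIGSTOP suspends the process without terminating it, resume() sends SIGCONT to continue it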
+ kill(pid, Signal::SIGSTOP)
+ .map_err(|_| ProviderError::PauseNodeFailed(self.name.clone()))?;
- if resource_def.spec.command.get(0) == Some(&"bash".into()) {
- resource_def.spec.command.remove(0);
- }
+ Ok(())
+ }
- if resource_def.metadata.labels.zombie_role == ZombieRole::Temp {
- // for temp we run some short living cmds
- self.run_command(
- resource_def.spec.command,
- NativeRunCommandOptions {
- is_failure_allowed: Some(true).is_some(),
- },
- )
- .await?;
- } else {
- // Allow others are spawned.
- let logs = format!("{}/{}.log", self.tmp_dir, name);
- let file_handler = self
- .filesystem
- .create(logs.clone())
- .await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
-
- let final_command = resource_def.spec.command.join(" ");
- let child_process = std::process::Command::new(&self.command)
- .arg("-c")
- .arg(final_command.clone())
- .stdout(file_handler)
- // TODO: redirect stderr to the same stdout
- //.stderr()
- .spawn()?;
-
- // TODO: log::debug!(node_process.id());
- // nodeProcess.stdout.pipe(log);
- // nodeProcess.stderr.pipe(log);
-
- match self.process_map.entry(name.clone()) {
- Occupied(_) => return Err(ProviderError::DuplicatedNodeName(name)),
- Vacant(slot) => {
- slot.insert(Process {
- pid: child_process.id(),
- logs,
- port_mapping: resource_def.spec.ports.iter().fold(
- HashMap::new(),
- |mut memo: HashMap<Port, Port>, item| {
- memo.insert(item.container_port, item.host_port);
- memo
- },
- ),
- command: final_command,
- });
- },
- }
+ async fn resume(&self) -> Result<(), ProviderError> {
+ let inner = self.inner.write().await;
+ let pid = retrieve_pid_from_process(&inner.process, &self.name)?;
+
+ kill(pid, Signal::SIGCONT)
+ .map_err(|_| ProviderError::ResumeNodeFaied(self.name.clone()))?;
- if wait_ready {
- self.wait_node_ready(name).await?;
- }
- }
Ok(())
}
- // TODO: Add test
- async fn destroy_namespace(&mut self) -> Result<(), ProviderError> {
- // get pids to kill all related process
- let pids: Vec<String> = self
- .process_map
- .iter()
- .filter(|(_, process)| process.pid != 0)
- .map(|(_, process)| process.pid.to_string())
- .collect();
+ async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
+ if let Some(duration) = after {
+ sleep(duration).await;
+ }
- // TODO: use a crate (or even std) to get this info instead of relying on bash
- let result = self
- .run_command(
- [format!(
- "ps ax| awk '{{print $1}}'| grep -E '{}'",
- pids.join("|")
- )]
- .to_vec(),
- NativeRunCommandOptions {
- is_failure_allowed: true,
- },
- )
+ let mut inner = self.inner.write().await;
+
+ // abort all task handlers and kill process
+ inner.log_writing_handle.abort();
+ inner.stdout_reading_handle.abort();
+ inner.stderr_reading_handle.abort();
+ inner
+ .process
+ .kill()
.await
- .unwrap();
+ .map_err(|_| ProviderError::KillNodeFailed(self.name.clone()))?;
+
+ // re-spawn process with tasks for logs
+ let (process, stdout_reading_handle, stderr_reading_handle, log_writing_handle) =
+ create_process_with_log_tasks(
+ &self.name,
+ &self.command,
+ &self.args,
+ &self.env,
+ &self.log_path,
+ self.filesystem.clone(),
+ )?;
+
+ // update node process and handlers
+ inner.process = process;
+ inner.stdout_reading_handle = stdout_reading_handle;
+ inner.stderr_reading_handle = stderr_reading_handle;
+ inner.log_writing_handle = log_writing_handle;
- if result.exit_code.code().unwrap() == 0 {
- let pids_to_kill: Vec<String> = result
- .std_out
- .split(|c| c == '\n')
- .map(|s| s.into())
- .collect();
-
- let _ = self
- .run_command(
- [format!("kill -9 {}", pids_to_kill.join(" "))].to_vec(),
- NativeRunCommandOptions {
- is_failure_allowed: true,
- },
- )
- .await?;
- }
Ok(())
}
- // TODO: Add test
- async fn get_node_logs(&mut self, name: String) -> Result<String, ProviderError> {
- // For now in native let's just return all the logs
- let result = self
- .filesystem
- .read_file(&format!("{}/{}.log", self.tmp_dir, name))
+ async fn destroy(&self) -> Result<(), ProviderError> {
+ let mut inner = self.inner.write().await;
+
+ inner.log_writing_handle.abort();
+ inner.stdout_reading_handle.abort();
+ inner.stderr_reading_handle.abort();
+ inner
+ .process
+ .kill()
.await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
- return Ok(result);
- }
+ .map_err(|_| ProviderError::KillNodeFailed(self.name.clone()))?;
+
+ if let Some(namespace) = self.namespace.inner.upgrade() {
+ namespace.write().await.nodes.remove(&self.name);
+ }
- async fn dump_logs(&mut self, path: String, pod_name: String) -> Result<(), ProviderError> {
- let dst_file_name: String = format!("{}/logs/{}.log", path, pod_name);
- let _ = self
- .filesystem
- .copy(
- &format!("{}/{}.log", self.tmp_dir, pod_name),
- &dst_file_name,
- )
- .await;
Ok(())
}
+}
- async fn wait_node_ready(&mut self, node_name: String) -> Result<(), ProviderError> {
- // check if the process is alive after 1 seconds
- sleep(Duration::from_millis(1000)).await;
-
- let Some(process_node) = self.process_map.get(&node_name) else {
- return Err(ProviderError::MissingNodeInfo(node_name, "process".into()));
- };
+fn retrieve_pid_from_process(process: &Child, node_name: &str) -> Result<Pid, ProviderError> {
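+ // tokio's Child exposes the PID as an Option<u32>, convert it into a nix Pid so signals can be sent to the process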
+ Ok(Pid::from_raw(
+ process
+ .id()
+ .ok_or(ProviderError::ProcessIdRetrievalFailed(
+ node_name.to_string(),
+ ))?
+ .try_into()
+ .map_err(|_| ProviderError::ProcessIdRetrievalFailed(node_name.to_string()))?,
+ ))
+}
- let result = self
- .run_command(
- vec![format!("ps {}", process_node.pid)],
- NativeRunCommandOptions {
- is_failure_allowed: true,
+fn create_stream_polling_task(
+ stream: impl AsyncRead + Unpin + Send + 'static,
+ tx: Sender<Result<Vec<u8>, Error>>,
+) -> JoinHandle<()> {
+ tokio::spawn(async move {
+ let mut reader = BufReader::new(stream);
+ let mut buffer = vec![0u8; 1024];
+
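+ // read the stream in fixed-size chunks and forward them through the channel until EOF (Ok(0)) or a read error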
+ loop {
+ match reader.read(&mut buffer).await {
+ Ok(0) => {
+ let _ = tx.send(Ok(Vec::new())).await;
+ break;
},
- )
- .await?;
+ Ok(n) => {
+ let _ = tx.send(Ok(buffer[..n].to_vec())).await;
+ },
+ Err(e) => {
+ let _ = tx.send(Err(e)).await;
+ break;
+ },
+ }
+ }
+ })
+}
- if result.exit_code.code().unwrap() > 0 {
- let lines: String = self.get_node_logs(node_name).await?;
- // TODO: check how we will log with tables
- // TODO: Log with a log table
- // const logTable = new CreateLogTable({
- // colWidths: [20, 100],
- // });
- // logTable.pushToPrint([
- // [decorators.cyan("Pod"), decorators.green(nodeName)],
- // [
- // decorators.cyan("Status"),
- // decorators.reverse(decorators.red("Error")),
- // ],
- // [
- // decorators.cyan("Message"),
- // decorators.white(`Process: ${pid}, for node: ${nodeName} dies.`),
- // ],
- // [decorators.cyan("Output"), decorators.white(lines)],
- // ]);
-
- return Err(ProviderError::NodeNotReady(lines));
+fn create_log_writing_task(
+ mut rx: Receiver<Result<Vec<u8>, Error>>,
+ filesystem: impl FileSystem + Send + Sync + 'static,
+ log_path: PathBuf,
+) -> JoinHandle<()> {
+ tokio::spawn(async move {
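+ // drain the chunks produced by the stdout/stderr polling tasks and append them to the node log file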
+ loop {
+ sleep(Duration::from_millis(250)).await;
+ while let Some(Ok(data)) = rx.recv().await {
+ // TODO: find a better way instead of ignoring error ?
+ let _ = filesystem.append(&log_path, data).await;
+ }
}
+ })
+}
- // Process pid is
- // check log lines grow between 2/6/12 secs
- let lines_intial: RunCommandResponse = self
- .run_command(
- vec![format!("wc -l {}", process_node.logs)],
- NativeRunCommandOptions::default(),
- )
- .await?;
+type CreateProcessOutput = (Child, JoinHandle<()>, JoinHandle<()>, JoinHandle<()>);
+
+fn create_process_with_log_tasks(
+ name: &str,
+ command: &str,
+ args: &Vec<String>,
+ env: &Vec<(String, String)>,
+ log_path: &PathBuf,
+ filesystem: impl FileSystem + Send + Sync + 'static,
+) -> Result<CreateProcessOutput, ProviderError> {
+ // create process
+ let mut process = Command::new(command)
+ .args(args)
+ .envs(env.to_owned())
+ .stdin(Stdio::null())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .map_err(|err| ProviderError::NodeSpawningFailed(name.to_string(), err.into()))?;
+ let stdout = process.stdout.take().expect("infallible, stdout is piped");
+ let stderr = process.stderr.take().expect("infallible, stderr is piped");
+
+ // create additional long-running tasks for logs
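+ // stdout and stderr share a single channel, so both streams end up interleaved in the same log file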
+ let (stdout_tx, rx) = mpsc::channel(10);
+ let stderr_tx = stdout_tx.clone();
+ let stdout_reading_handle = create_stream_polling_task(stdout, stdout_tx);
+ let stderr_reading_handle = create_stream_polling_task(stderr, stderr_tx);
+ let log_writing_handle = create_log_writing_task(rx, filesystem, log_path.to_owned());
+
+ Ok((
+ process,
+ stdout_reading_handle,
+ stderr_reading_handle,
+ log_writing_handle,
+ ))
+}
- for i in [2000, 6000, 12000] {
- sleep(Duration::from_millis(i)).await;
- let lines_now = self
- .run_command(
- vec![format!("wc -l {}", process_node.logs)],
- NativeRunCommandOptions::default(),
- )
- .await?;
- if lines_now.std_out > lines_intial.std_out {
- return Ok(());
- };
- }
+#[cfg(test)]
+mod tests {
+ use std::{ffi::OsString, fs, str::FromStr};
+
+ use procfs::process::Process;
+ use support::fs::in_memory::{InMemoryFile, InMemoryFileSystem};
+ use tokio::time::timeout;
+
+ use super::*;
+ use crate::shared::types::TransferedFile;
+
+ #[test]
+ fn provider_capabilities_method_should_return_provider_capabilities() {
+ let fs = InMemoryFileSystem::default();
+ let provider = NativeProvider::new(fs);
+
+ let capabilities = provider.capabilities();
- let error_string = format!(
- "Log lines of process: {} ( node: {} ) doesn't grow, please check logs at {}",
- process_node.pid, node_name, process_node.logs
+ assert_eq!(
+ capabilities,
+ &ProviderCapabilities {
+ requires_image: false
+ }
);
+ }
- Err(ProviderError::NodeNotReady(error_string))
+ #[tokio::test]
+ async fn provider_tmp_dir_method_should_set_the_temporary_for_provider() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (
+ OsString::from_str("/someotherdir").unwrap(),
+ InMemoryFile::dir(),
+ ),
+ ]));
+ let provider = NativeProvider::new(fs.clone()).tmp_dir("/someotherdir");
+
+ // we create a namespace to ensure the tmp dir is used to store the namespace
+ let namespace = provider.create_namespace().await.unwrap();
+
+ assert!(namespace.base_dir().starts_with("/someotherdir"))
}
- // TODO: Add test
- fn get_pause_args(&mut self, name: String) -> Vec {
- let command = format!("kill -STOP {}", self.process_map[&name].pid);
- vec![command]
+ #[tokio::test]
+ async fn provider_create_namespace_method_should_create_a_new_namespace_and_returns_it() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // ensure namespace directory is created
+ assert!(fs
+ .files
+ .read()
+ .await
+ .contains_key(namespace.base_dir().as_os_str()));
+
+ // ensure namespace is added to provider namespaces
+ assert_eq!(provider.namespaces().await.len(), 1);
+
+ // ensure the only provider namespace is the same one as the one we just created
+ assert!(provider.namespaces().await.get(namespace.id()).is_some());
}
- // TODO: Add test
- fn get_resume_args(&mut self, name: String) -> Vec {
- let command = format!("kill -CONT {}", self.process_map[&name].pid);
- vec![command]
+ #[tokio::test]
+ async fn provider_namespaces_method_should_return_empty_namespaces_map_if_the_provider_has_no_namespaces(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+
+ assert_eq!(provider.namespaces().await.len(), 0);
}
- async fn restart_node(&mut self, name: String, timeout: u64) -> Result {
- let command = format!("kill -9 {}", self.process_map[&name].pid);
- let result = self
- .run_command(
- vec![command],
- NativeRunCommandOptions {
- is_failure_allowed: true,
- },
+ #[tokio::test]
+ async fn provider_namespaces_method_should_return_filled_namespaces_map_if_the_provider_has_one_namespace(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+
+ let namespace = provider.create_namespace().await.unwrap();
+
+ assert_eq!(provider.namespaces().await.len(), 1);
+ assert!(provider.namespaces().await.get(namespace.id()).is_some());
+ }
+
+ #[tokio::test]
+ async fn provider_namespaces_method_should_return_filled_namespaces_map_if_the_provider_has_two_namespaces(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+
+ let namespace1 = provider.create_namespace().await.unwrap();
+ let namespace2 = provider.create_namespace().await.unwrap();
+
+ assert_eq!(provider.namespaces().await.len(), 2);
+ assert!(provider.namespaces().await.get(namespace1.id()).is_some());
+ assert!(provider.namespaces().await.get(namespace2.id()).is_some());
+ }
+
+ #[tokio::test]
+ async fn namespace_spawn_node_method_should_creates_a_new_node_correctly() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ (
+ OsString::from_str("/file1").unwrap(),
+ InMemoryFile::file("My file 1"),
+ ),
+ (
+ OsString::from_str("/file2").unwrap(),
+ InMemoryFile::file("My file 2"),
+ ),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ let node = namespace
+ .spawn_node(
+ SpawnNodeOptions::new("mynode", "./testing/dummy_node")
+ .args(vec![
+ "-flag1",
+ "--flag2",
+ "--option1=value1",
+ "-option2=value2",
+ "--option3 value3",
+ "-option4 value4",
+ ])
+ .env(vec![
+ ("MY_VAR_1", "MY_VALUE_1"),
+ ("MY_VAR_2", "MY_VALUE_2"),
+ ("MY_VAR_3", "MY_VALUE_3"),
+ ])
+ .injected_files(vec![
+ TransferedFile::new("/file1", "/cfg/file1"),
+ TransferedFile::new("/file2", "/data/file2"),
+ ]),
)
- .await?;
+ .await
+ .unwrap();
- if result.exit_code.code().unwrap() > 0 {
- return Ok(false);
- }
+ // ensure node directories are created
+ assert!(fs
+ .files
+ .read()
+ .await
+ .contains_key(node.base_dir().as_os_str()));
+ assert!(fs
+ .files
+ .read()
+ .await
+ .contains_key(node.config_dir().as_os_str()));
+ assert!(fs
+ .files
+ .read()
+ .await
+ .contains_key(node.data_dir().as_os_str()));
+ assert!(fs
+ .files
+ .read()
+ .await
+ .contains_key(node.scripts_dir().as_os_str()));
- sleep(Duration::from_millis(timeout * 1000)).await;
+ // ensure injected files are present
+ assert_eq!(
+ fs.files
+ .read()
+ .await
+ .get(
+ &OsString::from_str(&format!("{}/file1", node.config_dir().to_string_lossy()))
+ .unwrap()
+ )
+ .unwrap()
+ .contents()
+ .unwrap(),
+ "My file 1"
+ );
+ assert_eq!(
+ fs.files
+ .read()
+ .await
+ .get(
+ &OsString::from_str(&format!("{}/file2", node.data_dir().to_string_lossy()))
+ .unwrap()
+ )
+ .unwrap()
+ .contents()
+ .unwrap(),
+ "My file 2"
+ );
- let logs = self.process_map[&name].logs.clone();
+ // retrieve running process
+ let processes = get_processes_by_name("dummy_node").await;
+
+ // ensure only one dummy process exists
+ assert_eq!(processes.len(), 1);
+ let node_process = processes.first().unwrap();
+
+ // ensure process has correct state
+ assert!(matches!(
+ node_process.stat().unwrap().state().unwrap(),
+ // process can be running or sleeping because we sleep between echo calls
+ procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping
+ ));
+
+ // ensure process is passed correct args
+ let node_args = node_process.cmdline().unwrap();
+ assert!(node_args.contains(&"-flag1".to_string()));
+ assert!(node_args.contains(&"--flag2".to_string()));
+ assert!(node_args.contains(&"--option1=value1".to_string()));
+ assert!(node_args.contains(&"-option2=value2".to_string()));
+ assert!(node_args.contains(&"--option3 value3".to_string()));
+ assert!(node_args.contains(&"-option4 value4".to_string()));
+
+ // ensure process has correct environment
+ let node_env = node_process.environ().unwrap();
+ assert_eq!(
+ node_env
+ .get(&OsString::from_str("MY_VAR_1").unwrap())
+ .unwrap(),
+ "MY_VALUE_1"
+ );
+ assert_eq!(
+ node_env
+ .get(&OsString::from_str("MY_VAR_2").unwrap())
+ .unwrap(),
+ "MY_VALUE_2"
+ );
+ assert_eq!(
+ node_env
+ .get(&OsString::from_str("MY_VAR_3").unwrap())
+ .unwrap(),
+ "MY_VALUE_3"
+ );
+
+ // ensure log file is created and logs are written and keep being written for some time
+ timeout(Duration::from_secs(30), async {
+ let mut expected_logs_line_count = 2;
+
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ if let Some(file) = fs.files.read().await.get(node.log_path().as_os_str()) {
+ if let Some(contents) = file.contents() {
+ if contents.lines().count() >= expected_logs_line_count {
+ if expected_logs_line_count >= 6 {
+ return;
+ } else {
+ expected_logs_line_count += 2;
+ }
+ }
+ }
+ }
+ }
+ })
+ .await
+ .unwrap();
- // log::debug!("Command: {}", self.process_map[&name].cmd.join(" "));
+ // ensure node is present in namespace
+ assert_eq!(namespace.nodes().await.len(), 1);
+ assert!(namespace.nodes().await.get(node.name()).is_some());
+ }
- let file_handler = self
- .filesystem
- .create(logs.clone())
+ #[tokio::test]
+ async fn namespace_spawn_node_method_should_returns_an_error_if_a_node_already_exists_with_this_name(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
.await
- .map_err(|e| ProviderError::FSError(Box::new(e)))?;
- let final_command = self.process_map[&name].command.clone();
-
- let child_process = std::process::Command::new(&self.command)
- .arg("-c")
- .arg(final_command.clone())
- // TODO: set env
- .stdout(file_handler)
- // TODO: redirect stderr to the same stdout
- //.stderr()
- .spawn()?;
-
- match self.process_map.entry(name.clone()) {
- Occupied(_) => return Err(ProviderError::DuplicatedNodeName(name)),
- Vacant(slot) => {
- slot.insert(Process {
- pid: child_process.id(),
- // TODO: complete this field
- logs,
- // TODO: complete this field
- port_mapping: HashMap::default(),
- command: final_command,
- });
- },
- }
- self.wait_node_ready(name).await?;
+ .unwrap();
- Ok(true)
- }
+ let result = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await;
- async fn get_logs_command(&mut self, name: String) -> Result {
- Ok(format!("tail -f {}/{}.log", self.tmp_dir, name))
+ // we must match here because Arc doesn't implement Debug, so unwrap_err is not an option
+ match result {
+ Ok(_) => panic!("expected result to be an error"),
+ Err(err) => assert_eq!(err.to_string(), "Duplicated node name: mynode"),
+ };
}
- // TODO: Add test
- async fn get_help_info(&mut self) -> Result<(), ProviderError> {
- let _ = self
- .run_command(
- vec!["--help".to_owned()],
- NativeRunCommandOptions::default(),
- )
- .await?;
+ #[tokio::test]
+ async fn namespace_generate_files_method_should_create_files_at_the_correct_locations_using_given_commands(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ namespace
+ .generate_files(GenerateFilesOptions::new(vec![
+ GenerateFileCommand::new("echo", "/myfile1").args(vec!["My file 1"]),
+ GenerateFileCommand::new("sh", "/myfile2")
+ .args(vec!["-c", "echo -n $MY_CONTENT"])
+ .env(vec![("MY_CONTENT", "My file 2")]),
+ ]))
+ .await
+ .unwrap();
- Ok(())
+ // ensure files have been generated correctly to right location
+ assert_eq!(
+ fs.files
+ .read()
+ .await
+ .get(
+ &OsString::from_str(&format!(
+ "{}/myfile1",
+ namespace.base_dir().to_string_lossy()
+ ))
+ .unwrap()
+ )
+ .unwrap()
+ .contents()
+ .unwrap(),
+ "My file 1\n"
+ );
+ assert_eq!(
+ fs.files
+ .read()
+ .await
+ .get(
+ &OsString::from_str(&format!(
+ "{}/myfile2",
+ namespace.base_dir().to_string_lossy()
+ ))
+ .unwrap()
+ )
+ .unwrap()
+ .contents()
+ .unwrap(),
+ "My file 2"
+ );
+
+ // ensure temporary node has been destroyed
+ assert_eq!(namespace.nodes().await.len(), 0);
}
-}
-#[cfg(test)]
-mod tests {
- use std::{os::unix::process::ExitStatusExt, process::ExitStatus};
+ #[tokio::test]
+ async fn namespace_destroy_should_destroy_all_namespace_nodes_and_namespace_itself() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn 2 dummy nodes to populate namespace
+ namespace
+ .spawn_node(SpawnNodeOptions::new("mynode1", "./testing/dummy_node"))
+ .await
+ .unwrap();
+ namespace
+ .spawn_node(SpawnNodeOptions::new("mynode2", "./testing/dummy_node"))
+ .await
+ .unwrap();
- use support::fs::mock::{MockError, MockFilesystem, Operation};
+ // ensure nodes are present
+ assert_eq!(namespace.nodes().await.len(), 2);
- use super::*;
- use crate::shared::types::{PodLabels, PodMetadata, PodSpec};
+ namespace.destroy().await.unwrap();
- #[test]
- fn new_native_provider() {
- let native_provider: NativeProvider<MockFilesystem> =
- NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
+ // ensure nodes are destroyed
+ assert_eq!(namespace.nodes().await.len(), 0);
- assert_eq!(native_provider.namespace, "something");
- assert_eq!(native_provider.config_path, "./");
- assert!(native_provider.is_debug);
- assert_eq!(native_provider.timeout, 60);
- assert_eq!(native_provider.tmp_dir, "/tmp");
- assert_eq!(native_provider.command, "bash");
- assert_eq!(native_provider.local_magic_file_path, "/tmp/finished.txt");
- assert_eq!(native_provider.remote_dir, "/tmp/cfg");
- assert_eq!(native_provider.data_dir, "/tmp/data");
+ // retrieve running process
+ let processes = get_processes_by_name("dummy_node").await;
+
+ // ensure no running process exists
+ assert_eq!(processes.len(), 0);
+
+ // ensure namespace is destroyed
+ assert_eq!(provider.namespaces().await.len(), 0);
}
#[tokio::test]
- async fn test_fielsystem_usage() {
- let mut native_provider: NativeProvider<MockFilesystem> =
- NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
-
- native_provider.create_namespace().await.unwrap();
+ async fn node_logs_method_should_return_its_logs_as_a_string() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
- assert!(native_provider.filesystem.operations.len() == 1);
+ // wait some time for node to write logs
+ sleep(Duration::from_secs(5)).await;
assert_eq!(
- native_provider.filesystem.operations[0],
- Operation::CreateDir {
- path: "/tmp/cfg".into(),
- }
+ fs.files
+ .read()
+ .await
+ .get(node.log_path().as_os_str())
+ .unwrap()
+ .contents()
+ .unwrap(),
+ node.logs().await.unwrap()
);
}
#[tokio::test]
- #[should_panic(expected = "FSError(OpError(\"create\"))")]
- async fn test_fielsystem_usage_fails() {
- let mut native_provider: NativeProvider<MockFilesystem> = NativeProvider::new(
- "something",
- "./",
- "/tmp",
- MockFilesystem::with_create_dir_error(MockError::OpError("create".into())),
- );
+ async fn node_dump_logs_method_should_writes_its_logs_to_a_given_destination() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
- native_provider.create_namespace().await.unwrap();
- }
+ // wait some time for node to write logs
+ sleep(Duration::from_secs(5)).await;
- #[tokio::test]
- async fn test_get_node_ip() {
- let native_provider: NativeProvider<MockFilesystem> =
- NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
+ node.dump_logs(PathBuf::from("/tmp/my_log_file"))
+ .await
+ .unwrap();
- assert_eq!(native_provider.get_node_ip().await.unwrap(), LOCALHOST);
+ let files = fs.files.read().await;
+
+ assert_eq!(
+ files
+ .get(node.log_path().as_os_str())
+ .unwrap()
+ .contents()
+ .unwrap(),
+ files
+ .get(&OsString::from_str("/tmp/my_log_file").unwrap())
+ .unwrap()
+ .contents()
+ .unwrap(),
+ );
}
#[tokio::test]
- async fn test_run_command_when_bash_is_removed() {
- let native_provider: NativeProvider<MockFilesystem> =
- NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
+ async fn node_run_command_method_should_execute_the_command_successfully_and_returns_stdout() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
- let result: RunCommandResponse = native_provider
+ let result = node
.run_command(
- vec!["bash".into(), "ls".into()],
- NativeRunCommandOptions::default(),
+ RunCommandOptions::new("sh")
+ .args(vec!["-c", "echo $MY_ENV_VAR"])
+ .env(vec![("MY_ENV_VAR", "Here is my content")]),
)
+ .await;
+
+ assert!(matches!(result, Ok(Ok(stdout)) if stdout == "Here is my content\n"));
+ }
+
+ #[tokio::test]
+ async fn node_run_command_method_should_execute_the_command_successfully_and_returns_error_code_and_stderr_if_an_error_happened(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
.await
.unwrap();
- assert_eq!(
- result,
- RunCommandResponse {
- exit_code: ExitStatus::from_raw(0),
- std_out: "Cargo.toml\nsrc\n".into(),
- std_err: None,
- }
+ let result = node
+ .run_command(RunCommandOptions::new("sh").args(vec!["-fakeargs"]))
+ .await;
+
+ assert!(
+ matches!(result, Ok(Err((exit_code, stderr))) if !exit_code.success() && !stderr.is_empty())
);
}
#[tokio::test]
- async fn test_run_command_when_dash_c_is_provided() {
- let native_provider = NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
+ async fn node_run_command_method_should_fail_to_execute_the_command_if_command_doesnt_exists() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
- let result = native_provider.run_command(
- vec!["-c".into(), "ls".into()],
- NativeRunCommandOptions::default(),
+ let err = node
+ .run_command(RunCommandOptions::new("myrandomprogram"))
+ .await
+ .unwrap_err();
+
+ assert_eq!(
+ err.to_string(),
+ "Error running command 'myrandomprogram': No such file or directory (os error 2)"
);
+ }
+
+ #[tokio::test]
+ async fn node_run_script_method_should_execute_the_script_successfully_and_returns_stdout() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ (
+ OsString::from_str("/tmp/dummy_script").unwrap(),
+ InMemoryFile::mirror(
+ "/tmp/dummy_script",
+ fs::read_to_string("./testing/dummy_script").unwrap(),
+ ),
+ ),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
+
+ let result = node
+ .run_script(
+ RunScriptOptions::new("/tmp/dummy_script")
+ .args(vec!["-c"])
+ .env(vec![("MY_ENV_VAR", "With env")]),
+ )
+ .await;
- let a = result.await;
- assert!(a.is_ok());
+ assert!(matches!(result, Ok(Ok(stdout)) if stdout == "My script\nWith env\nWith args\n"));
}
#[tokio::test]
- async fn test_run_command_when_error_return_error() {
- let native_provider = NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
+ async fn node_copy_file_from_node_method_should_copy_node_remote_file_to_local_path() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
- let mut some = native_provider.run_command(
- vec!["ls".into(), "ls".into()],
- NativeRunCommandOptions::default(),
- );
+ // wait 3s for node to start writing logs
+ sleep(Duration::from_secs(3)).await;
- assert!(some.await.is_err());
+ node.copy_file_from_node(
+ PathBuf::from("/mynode.log"),
+ PathBuf::from("/nodelog.backup"),
+ )
+ .await
+ .unwrap();
- some = native_provider.run_command(
- vec!["ls".into(), "ls".into()],
- NativeRunCommandOptions {
- is_failure_allowed: true,
- },
+ assert_eq!(
+ fs.files.read().await.get(node.log_path().as_os_str()),
+ fs.files
+ .read()
+ .await
+ .get(&OsString::from_str("/nodelog.backup").unwrap())
);
-
- assert!(some.await.is_ok());
}
#[tokio::test]
- async fn test_create_resource() {
- let mut native_provider: NativeProvider<MockFilesystem> =
- NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
-
- let resource_def: PodDef = PodDef {
- metadata: PodMetadata {
- name: "string".to_owned(),
- namespace: "string".to_owned(),
- labels: PodLabels {
- app: "String".to_owned(),
- zombie_ns: "String".to_owned(),
- name: "String".to_owned(),
- instance: "String".to_owned(),
- zombie_role: ZombieRole::Node,
- },
- },
- spec: PodSpec {
- cfg_path: "string".to_owned(),
- data_path: "string".to_owned(),
- ports: vec![],
- command: vec!["ls".to_owned()],
- env: vec![],
- },
- };
+ async fn node_pause_method_should_pause_the_node_process() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
+ .await
+ .unwrap();
+
+ // wait 2s for node to spawn
+ sleep(Duration::from_secs(2)).await;
+
+ // retrieve running process
+ let processes = get_processes_by_name("dummy_node").await;
+ let node_process = processes.first().unwrap();
+
+ // ensure process has correct state pre-pause
+ assert!(matches!(
+ node_process.stat().unwrap().state().unwrap(),
+ // process can be running or sleeping because we sleep between echo calls
+ procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping
+ ));
+
+ node.pause().await.unwrap();
+
+ // wait 1s for the node to stop writing logs
+ sleep(Duration::from_secs(1)).await;
+ let logs = node.logs().await.unwrap();
+
+ // ensure the process stays paused for 10s and logs stop being written
+ let _ = timeout(Duration::from_secs(10), async {
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ assert!(matches!(
+ node_process.stat().unwrap().state().unwrap(),
+ procfs::process::ProcState::Stopped
+ ));
+ assert_eq!(logs, node.logs().await.unwrap());
+ }
+ })
+ .await;
+ }
- native_provider
- .create_resource(resource_def, false, false)
+ #[tokio::test]
+ async fn node_resume_method_should_resume_the_paused_node_process() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
.await
.unwrap();
- assert_eq!(native_provider.process_map.len(), 1);
+ // wait 2s for node to spawn
+ sleep(Duration::from_secs(2)).await;
+
+ // retrieve running process
+ let processes = get_processes_by_name("dummy_node").await;
+ assert_eq!(processes.len(), 1); // guards against false results when tests run in parallel
+ let node_process = processes.first().unwrap();
+
+ node.pause().await.unwrap();
+
+ // ensure the process stays paused for 5s
+ let _ = timeout(Duration::from_secs(5), async {
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ assert!(matches!(
+ node_process.stat().unwrap().state().unwrap(),
+ procfs::process::ProcState::Stopped
+ ));
+ }
+ })
+ .await;
+
+ node.resume().await.unwrap();
+
+ // ensure the process stays running for 10s after being resumed
+ let _ = timeout(Duration::from_secs(10), async {
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ assert!(matches!(
+ node_process.stat().unwrap().state().unwrap(),
+ // process can be running or sleeping because we sleep between echo calls
+ procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping
+ ));
+ }
+ })
+ .await;
+
+ // ensure logs continue being written for some time
+ timeout(Duration::from_secs(30), async {
+ let mut expected_logs_line_count = 2;
+
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ if let Some(file) = fs.files.read().await.get(node.log_path().as_os_str()) {
+ if let Some(contents) = file.contents() {
+ if contents.lines().count() >= expected_logs_line_count {
+ if expected_logs_line_count >= 6 {
+ return;
+ } else {
+ expected_logs_line_count += 2;
+ }
+ }
+ }
+ }
+ }
+ })
+ .await
+ .unwrap();
}
+
#[tokio::test]
- async fn test_create_resource_wait_ready() {
- let mut native_provider: NativeProvider<MockFilesystem> =
- NativeProvider::new("something", "./", "/tmp", MockFilesystem::new());
-
- let resource_def: PodDef = PodDef {
- metadata: PodMetadata {
- name: "string".to_owned(),
- namespace: "string".to_owned(),
- labels: PodLabels {
- app: "String".to_owned(),
- zombie_ns: "String".to_owned(),
- name: "String".to_owned(),
- instance: "String".to_owned(),
- zombie_role: ZombieRole::Node,
- },
- },
- spec: PodSpec {
- cfg_path: "string".to_owned(),
- data_path: "string".to_owned(),
- ports: vec![],
- command: vec!["for i in $(seq 1 10); do echo $i;sleep 1;done".into()],
- env: vec![],
- },
- };
+ async fn node_restart_should_kill_the_node_and_respawn_it_successfully() {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ (
+ OsString::from_str("/file1").unwrap(),
+ InMemoryFile::file("My file 1"),
+ ),
+ (
+ OsString::from_str("/file2").unwrap(),
+ InMemoryFile::file("My file 2"),
+ ),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ let node = namespace
+ .spawn_node(
+ SpawnNodeOptions::new("mynode", "./testing/dummy_node")
+ .args(vec![
+ "-flag1",
+ "--flag2",
+ "--option1=value1",
+ "-option2=value2",
+ "--option3 value3",
+ "-option4 value4",
+ ])
+ .env(vec![
+ ("MY_VAR_1", "MY_VALUE_1"),
+ ("MY_VAR_2", "MY_VALUE_2"),
+ ("MY_VAR_3", "MY_VALUE_3"),
+ ])
+ .injected_files(vec![
+ TransferedFile::new("/file1", "/cfg/file1"),
+ TransferedFile::new("/file2", "/data/file2"),
+ ]),
+ )
+ .await
+ .unwrap();
+
+ // wait 3s for node to spawn and start writing logs
+ sleep(Duration::from_secs(3)).await;
+
+ let processes = get_processes_by_name("dummy_node").await;
+ assert_eq!(processes.len(), 1); // guards against false results when tests run in parallel
+ let old_process_id = processes.first().unwrap().pid();
+ let old_logs_count = node.logs().await.unwrap().lines().count();
+
+ node.restart(None).await.unwrap();
+
+ // wait 3s for the node to restart and resume writing logs
+ sleep(Duration::from_secs(3)).await;
+
+ let processes = get_processes_by_name("dummy_node").await;
+ assert_eq!(processes.len(), 1); // guards against false results when tests run in parallel
+ let node_process = processes.first().unwrap();
+
+ // ensure process has correct state
+ assert!(matches!(
+ node_process.stat().unwrap().state().unwrap(),
+ // process can be running or sleeping because we sleep between echo calls
+ procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping
+ ));
+
+ // ensure PID changed
+ assert_ne!(old_process_id, node_process.pid());
+
+ // ensure process restarted with correct args
+ let node_args = node_process.cmdline().unwrap();
+ assert!(node_args.contains(&"-flag1".to_string()));
+ assert!(node_args.contains(&"--flag2".to_string()));
+ assert!(node_args.contains(&"--option1=value1".to_string()));
+ assert!(node_args.contains(&"-option2=value2".to_string()));
+ assert!(node_args.contains(&"--option3 value3".to_string()));
+ assert!(node_args.contains(&"-option4 value4".to_string()));
+
+ // ensure process restarted with correct environment
+ let node_env = node_process.environ().unwrap();
+ assert_eq!(
+ node_env
+ .get(&OsString::from_str("MY_VAR_1").unwrap())
+ .unwrap(),
+ "MY_VALUE_1"
+ );
+ assert_eq!(
+ node_env
+ .get(&OsString::from_str("MY_VAR_2").unwrap())
+ .unwrap(),
+ "MY_VALUE_2"
+ );
+ assert_eq!(
+ node_env
+ .get(&OsString::from_str("MY_VAR_3").unwrap())
+ .unwrap(),
+ "MY_VALUE_3"
+ );
+
+ // ensure log writing resumed and logs keep being written for some time
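+ // poll the in-memory log file every 200ms until it has grown by at least 6 lines over the pre-restart count, failing after 30s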
+ timeout(Duration::from_secs(30), async {
+ let mut expected_logs_line_count = old_logs_count;
+
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ if let Some(file) = fs.files.read().await.get(node.log_path().as_os_str()) {
+ if let Some(contents) = file.contents() {
+ if contents.lines().count() >= expected_logs_line_count {
+ if expected_logs_line_count >= old_logs_count + 6 {
+ return;
+ } else {
+ expected_logs_line_count += 2;
+ }
+ }
+ }
+ }
+ }
+ })
+ .await
+ .unwrap();
+
+ // ensure node is present in namespace
+ assert_eq!(namespace.nodes().await.len(), 1);
+ assert!(namespace.nodes().await.get(node.name()).is_some());
+ }
- native_provider
- .create_resource(resource_def, false, true)
+ #[tokio::test]
+ async fn node_destroy_method_should_destroy_the_node_itself_and_remove_process_and_stop_logs_writing(
+ ) {
+ let fs = InMemoryFileSystem::new(HashMap::from([
+ (OsString::from_str("/").unwrap(), InMemoryFile::dir()),
+ (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()),
+ ]));
+ let provider = NativeProvider::new(fs.clone());
+ let namespace = provider.create_namespace().await.unwrap();
+
+ // spawn dummy node
+ let node = namespace
+ .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node"))
.await
.unwrap();
- assert_eq!(native_provider.process_map.len(), 1);
+ // wait 3s for node to start and begin writing logs
+ sleep(Duration::from_secs(3)).await;
+
+ node.destroy().await.unwrap();
+
+ // wait 1s for the node to be killed and stop writing logs
+ sleep(Duration::from_secs(1)).await;
+ let logs = node.logs().await.unwrap();
+
+ // ensure process is not running anymore
+ let processes = get_processes_by_name("dummy_node").await;
+ assert_eq!(processes.len(), 0);
+
+ // ensure logs are not being written anymore
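+ // compare against the snapshot taken right after destroy: any new log line within the next 10s fails the assertion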
+ let _ = timeout(Duration::from_secs(10), async {
+ loop {
+ sleep(Duration::from_millis(200)).await;
+
+ assert_eq!(logs, node.logs().await.unwrap());
+ }
+ })
+ .await;
+
+ // ensure node no longer exists in the namespace
+ assert_eq!(namespace.nodes().await.len(), 0);
+ }
+
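+ // test helper: uses procfs to list every process whose cmdline contains `name`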
+ async fn get_processes_by_name(name: &str) -> Vec<Process> {
+ procfs::process::all_processes()
+ .unwrap()
+ .filter_map(|process| {
+ if let Ok(process) = process {
+ process
+ .cmdline()
+ .iter()
+ .any(|args| args.iter().any(|arg| arg.contains(name)))
+ .then_some(process)
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<Process>>()
}
}
diff --git a/crates/provider/src/shared/constants.rs b/crates/provider/src/shared/constants.rs
index 2fab0358f..e76f5353d 100644
--- a/crates/provider/src/shared/constants.rs
+++ b/crates/provider/src/shared/constants.rs
@@ -1,13 +1,15 @@
use std::net::{IpAddr, Ipv4Addr};
-/// Default dir for configuration inside pods
-pub const DEFAULT_REMOTE_DIR: &str = "/cfg";
-/// Default dir for node /data
-pub const DEFAULT_DATA_DIR: &str = "/data";
+/// Directory for node configuration
+pub const NODE_CONFIG_DIR: &str = "/cfg";
+/// Directory for node data
+pub const NODE_DATA_DIR: &str = "/data";
+/// Directory for node scripts
+pub const NODE_SCRIPTS_DIR: &str = "/scripts";
/// Localhost ip
-pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
+pub const _LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
/// The port substrate listens for p2p connections on
-pub const P2P_PORT: u16 = 30333;
+pub const _P2P_PORT: u16 = 30333;
/// The remote port prometheus can be accessed with
pub const _PROMETHEUS_PORT: u16 = 9615;
/// The remote port websocket to access the RPC
diff --git a/crates/provider/src/shared/types.rs b/crates/provider/src/shared/types.rs
index 7ab5e8a41..1219b20ec 100644
--- a/crates/provider/src/shared/types.rs
+++ b/crates/provider/src/shared/types.rs
@@ -1,184 +1,236 @@
-use std::{
- collections::HashMap, os::unix::process::ExitStatusExt, path::PathBuf, process::ExitStatus,
-};
-
-use serde::{Deserialize, Serialize};
+use std::path::{Path, PathBuf};
pub type Port = u16;
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub enum ZombieRole {
- Temp,
- Node,
- BootNode,
- Collator,
- CumulusCollator,
- Authority,
- FullNode,
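+/// Capabilities of a provider (currently only whether it requires an image to spawn nodes).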
+#[derive(Debug, Default, Clone, PartialEq)]
+pub struct ProviderCapabilities {
+ pub requires_image: bool,
}
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub enum PortName {
- Prometheus,
- Rpc,
- RpcWs,
- P2P,
+impl ProviderCapabilities {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn requires_image(mut self) -> Self {
+ self.requires_image = true;
+ self
+ }
}
-// TODO: remove when we implement k8s/podman
-#[allow(dead_code)]
-#[derive(Debug, Clone, PartialEq)]
-enum ImagePullPolicy {
- IfNotPresent,
- Never,
- Always,
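+/// Options used when spawning a node through a provider namespace: name, command, args, env and files to inject.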
+pub struct SpawnNodeOptions {
+ pub name: String,
+ pub command: String,
+ pub args: Vec<String>,
+ pub env: Vec<(String, String)>,
+ pub injected_files: Vec<TransferedFile>,
}
-#[derive(Debug, Clone, PartialEq)]
-pub struct FileMap {
- pub local_file_path: PathBuf,
- pub remote_file_path: PathBuf,
- pub is_unique: bool,
+impl SpawnNodeOptions {
+ pub fn new<S>(name: S, command: S) -> Self
+ where
+ S: AsRef<str>,
+ {
+ Self {
+ name: name.as_ref().to_string(),
+ command: command.as_ref().to_string(),
+ args: vec![],
+ env: vec![],
+ injected_files: vec![],
+ }
+ }
+
+ pub fn args<S, I>(mut self, args: I) -> Self
+ where
+ S: AsRef<str>,
+ I: IntoIterator<Item = S>,
+ {
+ self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
+ self
+ }
+
+ pub fn env<S, I>(mut self, env: I) -> Self
+ where
+ S: AsRef<str>,
+ I: IntoIterator<Item = (S, S)>,
+ {
+ self.env = env
+ .into_iter()
+ .map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
+ .collect();
+ self
+ }
+
+ pub fn injected_files<I>(mut self, injected_files: I) -> Self
+ where
+ I: IntoIterator<Item = TransferedFile>,
+ {
+ self.injected_files = injected_files.into_iter().collect();
+ self
+ }
}
-#[derive(Debug, Clone, PartialEq)]
-pub struct RunCommandResponse {
- pub exit_code: ExitStatus,
- pub std_out: String,
- pub std_err: Option<String>,
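+/// A command to run for generating a file, along with the local path its output should be written to.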
+pub struct GenerateFileCommand {
+ pub command: String,
+ pub args: Vec<String>,
+ pub env: Vec<(String, String)>,
+ pub local_output_path: PathBuf,
}
-impl RunCommandResponse {
- pub fn default() -> Self {
+impl GenerateFileCommand {
+ pub fn new<S, P>(command: S, local_output_path: P) -> Self
+ where
+ S: AsRef<str>,
+ P: AsRef<Path>,
+ {
Self {
- exit_code: ExitStatus::from_raw(0),
- std_out: String::default(),
- std_err: None,
+ command: command.as_ref().to_string(),
+ args: vec![],
+ env: vec![],
+ local_output_path: local_output_path.as_ref().into(),
}
}
-}
-#[derive(Debug, Default, Clone, PartialEq)]
-pub struct NativeRunCommandOptions {
- pub is_failure_allowed: bool,
-}
+ pub fn args<S, I>(mut self, args: I) -> Self
+ where
+ S: AsRef<str>,
+ I: IntoIterator<Item = S>,
+ {
+ self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
+ self
+ }
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct NamespaceLabels {
- job_id: String,
- project_name: String,
+ pub fn env<S, I>(mut self, env: I) -> Self
+ where
+ S: AsRef<str>,
+ I: IntoIterator<Item = (S, S)>,
+ {
+ self.env = env
+ .into_iter()
+ .map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
+ .collect();
+ self
+ }
}
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct NamespaceMetadata {
- pub name: String,
- pub labels: Option<NamespaceLabels>,
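+/// Options for file generation inside a namespace: the commands to run and the files to inject.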
+pub struct GenerateFilesOptions {
+ pub commands: Vec<GenerateFileCommand>,
+ pub injected_files: Vec<TransferedFile>,
}
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct NamespaceDef {
- pub api_version: String,
- pub kind: String,
- pub metadata: NamespaceMetadata,
-}
+impl GenerateFilesOptions {
+ pub fn new