From d1354c799fed1edb7211a523f7c6f330088fdb2e Mon Sep 17 00:00:00 2001 From: Loris Moulin <45130584+l0r1s@users.noreply.github.com> Date: Wed, 11 Oct 2023 11:50:14 +0200 Subject: [PATCH] Feat/process manager (#122) --- .github/workflows/ci.yml | 11 +- Cargo.toml | 1 - .../examples/simple_network_example.rs | 5 +- .../examples/small_network_with_default.rs | 5 +- .../examples/small_network_with_para.rs | 5 +- crates/orchestrator/src/spawner.rs | 6 +- crates/provider/Cargo.toml | 5 +- crates/provider/src/native.rs | 1353 ++++++++++++----- crates/provider/src/shared/types.rs | 18 +- crates/provider/testing/dummy_node | 11 - crates/provider/testing/dummy_script | 9 - crates/support/Cargo.toml | 4 +- crates/support/src/fs.rs | 65 +- crates/support/src/fs/in_memory.rs | 145 +- crates/support/src/fs/local.rs | 62 +- crates/support/src/lib.rs | 1 + crates/support/src/process.rs | 119 ++ crates/support/src/process/fake.rs | 365 +++++ crates/support/src/process/os.rs | 103 ++ 19 files changed, 1730 insertions(+), 563 deletions(-) delete mode 100755 crates/provider/testing/dummy_node delete mode 100644 crates/provider/testing/dummy_script create mode 100644 crates/support/src/process.rs create mode 100644 crates/support/src/process/fake.rs create mode 100644 crates/support/src/process/os.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b13312c4b..41d1b21ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,12 +37,8 @@ jobs: - name: Build run: cargo build - - name: Tests (except provider crate) - run: cargo test --workspace --exclude provider - - - name: Tests (provider crate) - # there should be a unique test thread for native provider tests (asserting spawned processes count) - run: cargo test -p provider -- --test-threads 1 + - name: Tests + run: cargo test --workspace coverage: name: Zombienet SDK - coverage @@ -64,8 +60,7 @@ jobs: uses: taiki-e/install-action@cargo-llvm-cov - name: Collect coverage data - # there should be a unique test thread for native provider tests (asserting spawned processes count) - run: cargo llvm-cov nextest -j 1 --lcov --output-path lcov.info + run: cargo llvm-cov nextest --lcov --output-path lcov.info - name: Report code coverage uses: Nef10/lcov-reporter-action@v0.4.0 diff --git a/Cargo.toml b/Cargo.toml index fca4768d4..83512b9ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ multiaddr = "0.18" url = "2.3" uuid = "1.4" nix = "0.27" -procfs = "0.15" pest = "2.7" pest_derive = "2.7" rand = "0.8" diff --git a/crates/examples/examples/simple_network_example.rs b/crates/examples/examples/simple_network_example.rs index 676511ce5..fb42ef231 100644 --- a/crates/examples/examples/simple_network_example.rs +++ b/crates/examples/examples/simple_network_example.rs @@ -3,7 +3,7 @@ use configuration::NetworkConfig; use orchestrator::Orchestrator; use provider::NativeProvider; -use support::fs::local::LocalFileSystem; +use support::{fs::local::LocalFileSystem, process::os::OsProcessManager}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -11,7 +11,8 @@ async fn main() -> Result<(), Box> { .expect("errored?"); let fs = LocalFileSystem; - let provider = NativeProvider::new(fs.clone()); + let pm = OsProcessManager; + let provider = NativeProvider::new(fs.clone(), pm); let orchestrator = Orchestrator::new(fs, provider); orchestrator.spawn(config).await?; println!("🚀🚀🚀🚀 network deployed"); diff --git a/crates/examples/examples/small_network_with_default.rs b/crates/examples/examples/small_network_with_default.rs 
index 7b9afef50..663eb6c15 100644 --- a/crates/examples/examples/small_network_with_default.rs +++ b/crates/examples/examples/small_network_with_default.rs @@ -1,7 +1,7 @@ use configuration::NetworkConfigBuilder; use orchestrator::{AddNodeOpts, Orchestrator}; use provider::NativeProvider; -use support::fs::local::LocalFileSystem; +use support::{fs::local::LocalFileSystem, process::os::OsProcessManager}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -21,7 +21,8 @@ async fn main() -> Result<(), Box> { .unwrap(); let fs = LocalFileSystem; - let provider = NativeProvider::new(fs.clone()); + let pm = OsProcessManager; + let provider = NativeProvider::new(fs.clone(), pm); let orchestrator = Orchestrator::new(fs, provider); let mut network = orchestrator.spawn(config).await?; println!("🚀🚀🚀🚀 network deployed"); diff --git a/crates/examples/examples/small_network_with_para.rs b/crates/examples/examples/small_network_with_para.rs index 37c57d8a7..81ac29cac 100644 --- a/crates/examples/examples/small_network_with_para.rs +++ b/crates/examples/examples/small_network_with_para.rs @@ -3,7 +3,7 @@ use std::time::Duration; use configuration::{NetworkConfigBuilder, RegistrationStrategy}; use orchestrator::{AddNodeOpts, Orchestrator}; use provider::NativeProvider; -use support::fs::local::LocalFileSystem; +use support::{fs::local::LocalFileSystem, process::os::OsProcessManager}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -24,7 +24,8 @@ async fn main() -> Result<(), Box> { .unwrap(); let fs = LocalFileSystem; - let provider = NativeProvider::new(fs.clone()); + let pm = OsProcessManager; + let provider = NativeProvider::new(fs.clone(), pm); let orchestrator = Orchestrator::new(fs, provider); let mut network = orchestrator.spawn(config).await?; println!("🚀🚀🚀🚀 network deployed"); diff --git a/crates/orchestrator/src/spawner.rs b/crates/orchestrator/src/spawner.rs index a0271d340..0ff0e7148 100644 --- a/crates/orchestrator/src/spawner.rs +++ b/crates/orchestrator/src/spawner.rs @@ -105,7 +105,7 @@ where bootnode_addr: ctx.bootnodes_addr.clone(), }; - let (cmd, args) = match ctx.role { + let (program, args) = match ctx.role { // Collator should be `non-cumulus` one (e.g adder/undying) ZombieRole::Node | ZombieRole::Collator => { @@ -126,11 +126,11 @@ where println!("\n"); println!("🚀 {}, spawning.... 
with command:", node.name); - println!("{cmd} {}", args.join(" ")); + println!("{program} {}", args.join(" ")); let spawn_ops = SpawnNodeOptions { name: node.name.clone(), - command: cmd, + program, args, env: node .env diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 7001258ed..374a1eb76 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -24,7 +24,4 @@ tokio = { workspace = true, features = [ thiserror = { workspace = true } anyhow = { workspace = true } uuid = { workspace = true, features = ["v4"] } -nix = { workspace = true, features = ["signal"] } - -[dev-dependencies] -procfs = { workspace = true } +nix = { workspace = true, features = ["signal"] } \ No newline at end of file diff --git a/crates/provider/src/native.rs b/crates/provider/src/native.rs index 2bc95e90c..96e49e589 100644 --- a/crates/provider/src/native.rs +++ b/crates/provider/src/native.rs @@ -1,7 +1,6 @@ use std::{ self, collections::HashMap, - fmt::Debug, io::Error, net::IpAddr, path::PathBuf, @@ -13,14 +12,13 @@ use anyhow::anyhow; use async_trait::async_trait; use configuration::types::Port; use futures::{future::try_join_all, try_join}; -use nix::{ - sys::signal::{kill, Signal}, - unistd::Pid, +use nix::{sys::signal::Signal, unistd::Pid}; +use support::{ + fs::FileSystem, + process::{Command, DynProcess, ProcessManager}, }; -use support::fs::FileSystem; use tokio::{ io::{AsyncRead, AsyncReadExt, BufReader}, - process::{Child, Command}, sync::{ mpsc::{self, Receiver, Sender}, RwLock, @@ -38,26 +36,42 @@ use crate::{ RunScriptOptions, SpawnNodeOptions, }; -#[derive(Debug, Clone)] -pub struct NativeProvider { +#[derive(Clone)] +pub struct NativeProvider +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ capabilities: ProviderCapabilities, tmp_dir: PathBuf, filesystem: FS, - inner: Arc>>, + process_manager: PM, + inner: Arc>>, } -#[derive(Debug)] -struct NativeProviderInner { - namespaces: HashMap>, +struct NativeProviderInner +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ + namespaces: HashMap>, } -#[derive(Debug, Clone)] -struct WeakNativeProvider { - inner: Weak>>, +#[derive(Clone)] +struct WeakNativeProvider +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ + inner: Weak>>, } -impl NativeProvider { - pub fn new(filesystem: FS) -> Self { +impl NativeProvider +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ + pub fn new(filesystem: FS, process_manager: PM) -> Self { NativeProvider { capabilities: ProviderCapabilities::new(), // NOTE: temp_dir in linux return `/tmp` but on mac something like @@ -66,6 +80,7 @@ impl NativeProvider { // you try to build a fullpath by concatenate. Use Pathbuf to prevent the issue. 
tmp_dir: std::env::temp_dir(), filesystem, + process_manager, inner: Arc::new(RwLock::new(NativeProviderInner { namespaces: Default::default(), })), @@ -79,7 +94,11 @@ impl NativeProvider { } #[async_trait] -impl Provider for NativeProvider { +impl Provider for NativeProvider +where + FS: FileSystem + Send + Sync + Clone + 'static, + PM: ProcessManager + Send + Sync + Clone + 'static, +{ fn capabilities(&self) -> &ProviderCapabilities { &self.capabilities } @@ -106,6 +125,7 @@ impl Provider for NativeProvider id: id.clone(), base_dir, filesystem: self.filesystem.clone(), + process_manager: self.process_manager.clone(), provider: WeakNativeProvider { inner: Arc::downgrade(&self.inner), }, @@ -120,27 +140,43 @@ impl Provider for NativeProvider } } -#[derive(Debug, Clone)] -pub struct NativeNamespace { +#[derive(Clone)] +pub struct NativeNamespace +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ id: String, base_dir: PathBuf, - inner: Arc>>, + inner: Arc>>, filesystem: FS, - provider: WeakNativeProvider, + process_manager: PM, + provider: WeakNativeProvider, } -#[derive(Debug)] -struct NativeNamespaceInner { - nodes: HashMap>, +struct NativeNamespaceInner +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ + nodes: HashMap>, } -#[derive(Debug, Clone)] -struct WeakNativeNamespace { - inner: Weak>>, +#[derive(Clone)] +struct WeakNativeNamespace +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ + inner: Weak>>, } #[async_trait] -impl ProviderNamespace for NativeNamespace { +impl ProviderNamespace for NativeNamespace +where + FS: FileSystem + Send + Sync + Clone + 'static, + PM: ProcessManager + Send + Sync + Clone + 'static, +{ fn id(&self) -> &str { &self.id } @@ -212,17 +248,19 @@ impl ProviderNamespace for Nativ let (process, stdout_reading_handle, stderr_reading_handle, log_writing_handle) = create_process_with_log_tasks( &options.name, - &options.command, + &options.program, &options.args, &options.env, &log_path, self.filesystem.clone(), - )?; + self.process_manager.clone(), + ) + .await?; // create node structure holding state let node = NativeNode { name: options.name.clone(), - command: options.command, + command: options.program, args: options.args, env: options.env, base_dir, @@ -231,6 +269,7 @@ impl ProviderNamespace for Nativ scripts_dir, log_path, filesystem: self.filesystem.clone(), + process_manager: self.process_manager.clone(), namespace: WeakNativeNamespace { inner: Arc::downgrade(&self.inner), }, @@ -253,7 +292,7 @@ impl ProviderNamespace for Nativ let temp_node = self .spawn_node(SpawnNodeOptions { name: format!("temp_{}", Uuid::new_v4()), - command: "bash".to_string(), + program: "bash".to_string(), args: vec!["-c".to_string(), "while :; do sleep 1; done".to_string()], env: vec![], injected_files: options.injected_files, @@ -262,7 +301,7 @@ impl ProviderNamespace for Nativ .await?; for GenerateFileCommand { - command, + program, args, env, local_output_path, @@ -284,7 +323,7 @@ impl ProviderNamespace for Nativ ); match temp_node - .run_command(RunCommandOptions { command, args, env }) + .run_command(RunCommandOptions { program, args, env }) .await .map_err(|err| ProviderError::FileGenerationFailed(err.into()))? 
{ @@ -308,7 +347,8 @@ impl ProviderNamespace for Nativ async fn destroy(&self) -> Result<(), ProviderError> { // we need to clone nodes (behind an Arc, so cheaply) to avoid deadlock between the inner.write lock and the node.destroy // method acquiring a lock the namespace to remove the node from the nodes hashmap. - let nodes: Vec> = self.inner.write().await.nodes.values().cloned().collect(); + let nodes: Vec> = + self.inner.write().await.nodes.values().cloned().collect(); for node in nodes.iter() { node.destroy().await?; } @@ -322,8 +362,12 @@ impl ProviderNamespace for Nativ } } -#[derive(Debug, Clone)] -struct NativeNode { +#[derive(Clone)] +struct NativeNode +where + FS: FileSystem + Send + Sync + Clone, + PM: ProcessManager + Send + Sync + Clone, +{ name: String, command: String, args: Vec, @@ -335,19 +379,23 @@ struct NativeNode { log_path: PathBuf, inner: Arc>, filesystem: FS, - namespace: WeakNativeNamespace, + process_manager: PM, + namespace: WeakNativeNamespace, } -#[derive(Debug)] struct NativeNodeInner { - process: Child, + process: DynProcess, stdout_reading_handle: JoinHandle<()>, stderr_reading_handle: JoinHandle<()>, log_writing_handle: JoinHandle<()>, } #[async_trait] -impl ProviderNode for NativeNode { +impl ProviderNode for NativeNode +where + FS: FileSystem + Send + Sync + Clone + 'static, + PM: ProcessManager + Send + Sync + Clone + 'static, +{ fn name(&self) -> &str { &self.name } @@ -404,12 +452,15 @@ impl ProviderNode for NativeNode &self, options: RunCommandOptions, ) -> Result { - let result = Command::new(options.command.clone()) - .args(options.args) - .envs(options.env) - .output() + let result = self + .process_manager + .output( + Command::new(options.program.clone()) + .args(options.args) + .envs(options.env), + ) .await - .map_err(|err| ProviderError::RunCommandError(options.command, err.into()))?; + .map_err(|err| ProviderError::RunCommandError(options.program, err.into()))?; if result.status.success() { Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string())) @@ -427,10 +478,7 @@ impl ProviderNode for NativeNode ) -> Result { let local_script_path = PathBuf::from(&options.local_script_path); - if !local_script_path - .try_exists() - .map_err(|err| ProviderError::InvalidScriptPath(err.into()))? 
- { + if !self.filesystem.exists(&local_script_path).await { return Err(ProviderError::ScriptNotFound(local_script_path)); } @@ -456,7 +504,7 @@ impl ProviderNode for NativeNode // execute script self.run_command(RunCommandOptions { - command: remote_script_path, + program: remote_script_path, args: options.args, env: options.env, }) @@ -480,9 +528,11 @@ impl ProviderNode for NativeNode async fn pause(&self) -> Result<(), ProviderError> { let inner = self.inner.write().await; - let pid = retrieve_pid_from_process(&inner.process, &self.name)?; + let pid = retrieve_pid_from_process(&inner.process, &self.name).await?; - kill(pid, Signal::SIGSTOP) + self.process_manager + .kill(pid, Signal::SIGSTOP) + .await .map_err(|_| ProviderError::PauseNodeFailed(self.name.clone()))?; Ok(()) @@ -490,9 +540,11 @@ impl ProviderNode for NativeNode async fn resume(&self) -> Result<(), ProviderError> { let inner = self.inner.write().await; - let pid = retrieve_pid_from_process(&inner.process, &self.name)?; + let pid = retrieve_pid_from_process(&inner.process, &self.name).await?; - kill(pid, Signal::SIGCONT) + self.process_manager + .kill(pid, Signal::SIGCONT) + .await .map_err(|_| ProviderError::ResumeNodeFaied(self.name.clone()))?; Ok(()) @@ -524,7 +576,9 @@ impl ProviderNode for NativeNode &self.env, &self.log_path, self.filesystem.clone(), - )?; + self.process_manager.clone(), + ) + .await?; // update node process and handlers inner.process = process; @@ -536,7 +590,7 @@ impl ProviderNode for NativeNode } async fn destroy(&self) -> Result<(), ProviderError> { - let mut inner = self.inner.write().await; + let inner = self.inner.write().await; inner.log_writing_handle.abort(); inner.stdout_reading_handle.abort(); @@ -555,10 +609,14 @@ impl ProviderNode for NativeNode } } -fn retrieve_pid_from_process(process: &Child, node_name: &str) -> Result { +async fn retrieve_pid_from_process( + process: &DynProcess, + node_name: &str, +) -> Result { Ok(Pid::from_raw( process .id() + .await .ok_or(ProviderError::ProcessIdRetrievalFailed( node_name.to_string(), ))? @@ -567,10 +625,10 @@ fn retrieve_pid_from_process(process: &Child, node_name: &str) -> Result, Error>>, -) -> JoinHandle<()> { +fn create_stream_polling_task(stream: S, tx: Sender, Error>>) -> JoinHandle<()> +where + S: AsyncRead + Unpin + Send + 'static, +{ tokio::spawn(async move { let mut reader = BufReader::new(stream); let mut buffer = vec![0u8; 1024]; @@ -593,44 +651,61 @@ fn create_stream_polling_task( }) } -fn create_log_writing_task( +fn create_log_writing_task( mut rx: Receiver, Error>>, - filesystem: impl FileSystem + Send + Sync + 'static, + filesystem: FS, log_path: PathBuf, -) -> JoinHandle<()> { +) -> JoinHandle<()> +where + FS: FileSystem + Send + Sync + 'static, +{ tokio::spawn(async move { loop { - sleep(Duration::from_millis(250)).await; while let Some(Ok(data)) = rx.recv().await { // TODO: find a better way instead of ignoring error ? 
let _ = filesystem.append(&log_path, data).await; } + sleep(Duration::from_millis(250)).await; } }) } -type CreateProcessOutput = (Child, JoinHandle<()>, JoinHandle<()>, JoinHandle<()>); +type CreateProcessOutput = (DynProcess, JoinHandle<()>, JoinHandle<()>, JoinHandle<()>); -fn create_process_with_log_tasks( +async fn create_process_with_log_tasks( name: &str, - command: &str, + program: &str, args: &Vec, env: &Vec<(String, String)>, log_path: &PathBuf, - filesystem: impl FileSystem + Send + Sync + 'static, -) -> Result { + filesystem: FS, + process_manager: PM, +) -> Result +where + FS: FileSystem + Send + Sync + 'static, + PM: ProcessManager + Send + Sync + 'static, +{ // create process - let mut process = Command::new(command) - .args(args) - .envs(env.to_owned()) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn() + let process = process_manager + .spawn( + Command::new(program) + .args(args) + .envs(env.to_owned()) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true), + ) + .await .map_err(|err| ProviderError::NodeSpawningFailed(name.to_string(), err.into()))?; - let stdout = process.stdout.take().expect("infaillible, stdout is piped"); - let stderr = process.stderr.take().expect("Infaillible, stderr is piped"); + let stdout = process + .take_stdout() + .await + .expect("infaillible, stdout is piped"); + let stderr = process + .take_stderr() + .await + .expect("Infaillible, stderr is piped"); // create additionnal long-running tasks for logs let (stdout_tx, rx) = mpsc::channel(10); @@ -649,10 +724,12 @@ fn create_process_with_log_tasks( #[cfg(test)] mod tests { - use std::{ffi::OsString, fs, str::FromStr}; + use std::{ffi::OsString, os::unix::process::ExitStatusExt, process::ExitStatus, str::FromStr}; - use procfs::process::Process; - use support::fs::in_memory::{InMemoryFile, InMemoryFileSystem}; + use support::{ + fs::in_memory::{InMemoryFile, InMemoryFileSystem}, + process::fake::{DynamicStreamValue, FakeProcessManager, FakeProcessState, StreamValue}, + }; use tokio::time::timeout; use super::*; @@ -661,7 +738,8 @@ mod tests { #[test] fn provider_capabilities_method_should_return_provider_capabilities() { let fs = InMemoryFileSystem::default(); - let provider = NativeProvider::new(fs); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs, pm); let capabilities = provider.capabilities(); @@ -682,7 +760,8 @@ mod tests { InMemoryFile::dir(), ), ])); - let provider = NativeProvider::new(fs.clone()).tmp_dir("/someotherdir"); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/someotherdir"); // we create a namespace to ensure tmp dir will be used to store namespace let namespace = provider.create_namespace().await.unwrap(); @@ -696,7 +775,8 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); @@ -721,7 +801,8 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::new()); + let provider 
= NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); assert_eq!(provider.namespaces().await.len(), 0); } @@ -733,7 +814,8 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); @@ -748,7 +830,8 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs.clone(), pm).tmp_dir("/tmp"); let namespace1 = provider.create_namespace().await.unwrap(); let namespace2 = provider.create_namespace().await.unwrap(); @@ -772,12 +855,20 @@ mod tests { InMemoryFile::file("My file 2"), ), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); let node = namespace .spawn_node( - SpawnNodeOptions::new("mynode", "./testing/dummy_node") + SpawnNodeOptions::new("mynode", "/path/to/my/node_binary") .args(vec![ "-flag1", "--flag2", @@ -849,68 +940,77 @@ mod tests { "My file 2" ); - // retrieve running process - let processes = get_processes_by_name("dummy_node").await; + // ensure only one process exists + assert_eq!(pm.count().await, 1); - // ensure only one dummy process exists - assert_eq!(processes.len(), 1); - let node_process = processes.first().unwrap(); + // retrieve the process + let processes = pm.processes().await; + let process = processes.first().unwrap(); // ensure process has correct state - assert!(matches!( - node_process.stat().unwrap().state().unwrap(), - // process can be running or sleeping because we sleep between echo calls - procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping - )); + assert!(matches!(process.state().await, FakeProcessState::Running)); // ensure process is passed correct args - let node_args = node_process.cmdline().unwrap(); - assert!(node_args.contains(&"-flag1".to_string())); - assert!(node_args.contains(&"--flag2".to_string())); - assert!(node_args.contains(&"--option1=value1".to_string())); - assert!(node_args.contains(&"-option2=value2".to_string())); - assert!(node_args.contains(&"--option3 value3".to_string())); - assert!(node_args.contains(&"-option4 value4".to_string())); + assert!(process + .args + .contains(&OsString::from_str("-flag1").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("--flag2").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("--option1=value1").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("-option2=value2").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("--option3 value3").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("-option4 value4").unwrap())); // ensure process has correct environment - let node_env = node_process.environ().unwrap(); - assert_eq!( - node_env - 
.get(&OsString::from_str("MY_VAR_1").unwrap()) - .unwrap(), - "MY_VALUE_1" - ); - assert_eq!( - node_env - .get(&OsString::from_str("MY_VAR_2").unwrap()) - .unwrap(), - "MY_VALUE_2" - ); - assert_eq!( - node_env - .get(&OsString::from_str("MY_VAR_3").unwrap()) - .unwrap(), - "MY_VALUE_3" - ); + assert!(process.envs.contains(&( + OsString::from_str("MY_VAR_1").unwrap(), + OsString::from_str("MY_VALUE_1").unwrap() + ))); + assert!(process.envs.contains(&( + OsString::from_str("MY_VAR_2").unwrap(), + OsString::from_str("MY_VALUE_2").unwrap() + ))); + assert!(process.envs.contains(&( + OsString::from_str("MY_VAR_3").unwrap(), + OsString::from_str("MY_VALUE_3").unwrap() + ))); // ensure log file is created and logs are written and keep being written for some time - timeout(Duration::from_secs(30), async { - let mut expected_logs_line_count = 2; - + pm.advance_by(1).await; + let expected = ["Line 1\n", "Line 1\nLine 2\n", "Line 1\nLine 2\nLine 3\n"]; + let mut index = 0; + timeout(Duration::from_secs(3), async { loop { - sleep(Duration::from_millis(200)).await; - - if let Some(file) = fs.files.read().await.get(node.log_path().as_os_str()) { - if let Some(contents) = file.contents() { - if contents.lines().count() >= expected_logs_line_count { - if expected_logs_line_count >= 6 { - return; - } else { - expected_logs_line_count += 2; - } - } + // if we reach the expected len, all logs have been emited correctly in order + if index == expected.len() { + break; + } + + // check if there is some existing file with contents + if let Some(contents) = fs + .files + .read() + .await + .get(node.log_path().as_os_str()) + .map(|file| file.contents().unwrap()) + { + // if the contents correspond to what we expect, we continue to check the next expected thing and simulate cpu cycle + if contents == expected[index] { + index += 1; + pm.advance_by(1).await; } } + + sleep(Duration::from_millis(10)).await; } }) .await @@ -928,7 +1028,8 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs.clone(), pm).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); namespace @@ -942,11 +1043,35 @@ mod tests { // we must match here because Arc doesn't implements Debug, so unwrap_err is not an option match result { - Ok(_) => panic!("expected result to be an error"), + Ok(_) => unreachable!(), Err(err) => assert_eq!(err.to_string(), "Duplicated node name: mynode"), }; } + #[tokio::test] + async fn namespace_spawn_node_method_should_returns_an_error_spawning_process_failed() { + let fs = InMemoryFileSystem::new(HashMap::from([ + (OsString::from_str("/").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + ])); + let pm = FakeProcessManager::new(HashMap::new()); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); + let namespace = provider.create_namespace().await.unwrap(); + + // force error + pm.spawn_should_error(std::io::ErrorKind::TimedOut).await; + + let result = namespace + .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .await; + + // we must match here because Arc doesn't implements Debug, so unwrap_err is not an option + match result { + Ok(_) => unreachable!(), + Err(err) => assert_eq!(err.to_string(), "Failed to spawn node 'mynode': timed out"), + }; + } + #[tokio::test] async fn 
namespace_generate_files_method_should_create_files_at_the_correct_locations_using_given_commands( ) { @@ -954,7 +1079,21 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([ + ( + OsString::from_str("echo").unwrap(), + vec![StreamValue::DynamicStdout(DynamicStreamValue::new( + |_, args, _| format!("{}\n", args.first().unwrap().to_string_lossy()), + ))], + ), + ( + OsString::from_str("sh").unwrap(), + vec![StreamValue::DynamicStdout(DynamicStreamValue::new( + |_, _, envs| envs.first().unwrap().1.to_string_lossy().to_string(), + ))], + ), + ])); + let provider = NativeProvider::new(fs.clone(), pm).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); namespace @@ -1011,16 +1150,24 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn 2 dummy nodes to populate namespace namespace - .spawn_node(SpawnNodeOptions::new("mynode1", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode1", "/path/to/my/node_binary")) .await .unwrap(); namespace - .spawn_node(SpawnNodeOptions::new("mynode2", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode2", "/path/to/my/node_binary")) .await .unwrap(); @@ -1032,11 +1179,8 @@ mod tests { // ensure nodes are destroyed assert_eq!(namespace.nodes().await.len(), 0); - // retrieve running process - let processes = get_processes_by_name("dummy_node").await; - // ensure no running process exists - assert_eq!(processes.len(), 0); + assert_eq!(pm.processes().await.len(), 0); // ensure namespace is destroyed assert_eq!(provider.namespaces().await.len(), 0); @@ -1048,28 +1192,43 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); - // wait some time for node to write logs - sleep(Duration::from_secs(5)).await; + // simulate logs process manager output + pm.advance_by(3).await; - assert_eq!( - fs.files - .read() - .await - .get(node.log_path().as_os_str()) - .unwrap() - .contents() - .unwrap(), - node.logs().await.unwrap() - ); + // ensure logs are correct after some time or timeout + timeout(Duration::from_secs(3), async { + loop { + if node + .logs() + .await + 
.is_ok_and(|logs| logs.lines().count() == 3) + { + return; + } + + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\nLine 3\n"); } #[tokio::test] @@ -1078,35 +1237,57 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); - // wait some time for node to write logs - sleep(Duration::from_secs(5)).await; + // simulate logs process manager output + pm.advance_by(3).await; + + // ensure logs are correct after some time or timeout + timeout(Duration::from_secs(3), async { + loop { + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 3) + { + return; + } + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + + // dump logs node.dump_logs(PathBuf::from("/tmp/my_log_file")) .await .unwrap(); - let files = fs.files.read().await; - assert_eq!( - files - .get(node.log_path().as_os_str()) - .unwrap() - .contents() - .unwrap(), - files + fs.files + .read() + .await .get(&OsString::from_str("/tmp/my_log_file").unwrap()) .unwrap() .contents() .unwrap(), + "Line 1\nLine 2\nLine 3\n" ); } @@ -1116,15 +1297,33 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([ + ( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + ), + ( + OsString::from_str("sh").unwrap(), + vec![StreamValue::DynamicStdout(DynamicStreamValue::new( + |_, _, envs| format!("{}\n", envs.first().unwrap().1.to_string_lossy()), + ))], + ), + ])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); + pm.advance_by(3).await; + let result = node .run_command( RunCommandOptions::new("sh") @@ -1143,21 +1342,38 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([ + ( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + ), + ( + OsString::from_str("sh").unwrap(), + vec![StreamValue::Stderr("Some error happened".to_string())], + ), + ])); 
+ let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); + // force error + pm.output_should_fail(ExitStatus::from_raw(1)).await; + let result = node .run_command(RunCommandOptions::new("sh").args(vec!["-fakeargs"])) .await; assert!( - matches!(result, Ok(Err((exit_code, stderr))) if !exit_code.success() && !stderr.is_empty()) + matches!(result, Ok(Err((exit_code, stderr))) if !exit_code.success() && stderr == "Some error happened") ); } @@ -1167,15 +1383,26 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); + // force error + pm.output_should_error(std::io::ErrorKind::NotFound).await; + let err = node .run_command(RunCommandOptions::new("myrandomprogram")) .await @@ -1183,7 +1410,7 @@ mod tests { assert_eq!( err.to_string(), - "Error running command 'myrandomprogram': No such file or directory (os error 2)" + "Error running command 'myrandomprogram': entity not found" ); } @@ -1192,26 +1419,58 @@ mod tests { let fs = InMemoryFileSystem::new(HashMap::from([ (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/path").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/path/to").unwrap(), InMemoryFile::dir()), + ( + OsString::from_str("/path/to/my").unwrap(), + InMemoryFile::dir(), + ), ( - OsString::from_str("/tmp/dummy_script").unwrap(), - InMemoryFile::mirror( - "/tmp/dummy_script", - fs::read_to_string("./testing/dummy_script").unwrap(), - ), + OsString::from_str("/path/to/my/script").unwrap(), + InMemoryFile::file("some script"), ), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); + // we need to push stream after node spawn because the final script path is determined by the node local path + pm.push_stream( + format!("{}/script", node.scripts_dir().to_string_lossy()).into(), + vec![ + StreamValue::Stdout("My script\n".to_string()), + 
StreamValue::DynamicStdout(DynamicStreamValue::new(|_, _, envs| { + format!("{}\n", envs.first().unwrap().1.to_string_lossy()) + })), + StreamValue::DynamicStdout(DynamicStreamValue::new(|_, args, _| { + if args.first().is_some_and(|arg| arg == "-c") { + "With args\n".to_string() + } else { + String::new() + } + })), + ], + ) + .await; + + pm.advance_by(3).await; + let result = node .run_script( - RunScriptOptions::new("/tmp/dummy_script") + RunScriptOptions::new("/path/to/my/script") .args(vec!["-c"]) .env(vec![("MY_ENV_VAR", "With env")]), ) @@ -1220,23 +1479,88 @@ mod tests { assert!(matches!(result, Ok(Ok(stdout)) if stdout == "My script\nWith env\nWith args\n")); } + #[tokio::test] + async fn run_script_method_should_fails_if_script_doesnt_exists_locally() { + let fs = InMemoryFileSystem::new(HashMap::from([ + (OsString::from_str("/").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + ])); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); + let namespace = provider.create_namespace().await.unwrap(); + + // spawn dummy node + let node = namespace + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) + .await + .unwrap(); + + // simulate process advancing + pm.advance_by(3).await; + + let err = node + .run_script( + RunScriptOptions::new("/path/to/my/script") + .args(vec!["-c"]) + .env(vec![("MY_ENV_VAR", "With env")]), + ) + .await + .unwrap_err(); + + assert_eq!( + err.to_string(), + "Script with path /path/to/my/script not found" + ); + } + #[tokio::test] async fn node_copy_file_from_node_method_should_copy_node_remote_file_to_local_path() { let fs = InMemoryFileSystem::new(HashMap::from([ (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); - // wait 3s for node to start writing logs - sleep(Duration::from_secs(3)).await; + pm.advance_by(3).await; + + // wait for logs to be written + timeout(Duration::from_secs(3), async { + loop { + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 3) + { + return; + } + + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); node.copy_file_from_node( PathBuf::from("/mynode.log"), @@ -1260,126 +1584,291 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 
1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + StreamValue::Stdout("Line 4\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); - // wait 2s for node to spawn - sleep(Duration::from_secs(2)).await; + { + // retrieve running process + let processes = pm.processes().await; + assert_eq!(processes.len(), 1); + let node_process = processes.first().unwrap(); + + // ensure process has correct state pre-pause + assert!(matches!( + node_process.state().await, + FakeProcessState::Running + )); + + // simulate logs process manager output + pm.advance_by(2).await; + } - // retrieve running process - let processes = get_processes_by_name("dummy_node").await; - let node_process = processes.first().unwrap(); + // ensure logs are correct after some time or timeout + timeout(Duration::from_secs(3), async { + loop { + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 2) + { + return; + } - // ensure process has correct state pre-pause - assert!(matches!( - node_process.stat().unwrap().state().unwrap(), - // process can be running or sleeping because we sleep between echo calls - procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping - )); + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); + + // pause the node node.pause().await.unwrap(); - // wait node 1s to stop writing logs - sleep(Duration::from_secs(1)).await; - let logs = node.logs().await.unwrap(); + // simulate process manager advancing process when process paused + { + // retrieve running process + let processes = pm.processes().await; + assert_eq!(processes.len(), 1); + let node_process = processes.first().unwrap(); + + // ensure process has correct state post-pause + assert!(matches!( + node_process.state().await, + FakeProcessState::Stopped + )); + + pm.advance_by(2).await; + } - // ensure process has been paused for 10sec and logs stopped writing - let _ = timeout(Duration::from_secs(10), async { + // ensure logs didn't change after some time or timeout + timeout(Duration::from_secs(3), async { loop { - sleep(Duration::from_millis(200)).await; + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 2) + { + return; + } - assert!(matches!( - node_process.stat().unwrap().state().unwrap(), - procfs::process::ProcState::Stopped - )); - assert_eq!(logs, node.logs().await.unwrap()); + sleep(Duration::from_millis(10)).await; } }) - .await; + .await + .unwrap(); + + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); } #[tokio::test] - async fn node_resume_method_should_resume_the_paused_node_process() { + async fn node_pause_method_should_fails_if_some_error_happened() { let fs = InMemoryFileSystem::new(HashMap::from([ (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 
3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); - // wait 2s for node to spawn - sleep(Duration::from_secs(2)).await; + // simulate processes advancing + pm.advance_by(3).await; - // retrieve running process - let processes = get_processes_by_name("dummy_node").await; - assert_eq!(processes.len(), 1); // needed to avoid test run in parallel and false results - let node_process = processes.first().unwrap(); + // force error + pm.kill_should_error(nix::errno::Errno::EPERM).await; - node.pause().await.unwrap(); + // pause the node where some error would happen + let err = node.pause().await.unwrap_err(); - // ensure process has been paused for 5sec - let _ = timeout(Duration::from_secs(5), async { - loop { - sleep(Duration::from_millis(200)).await; + assert_eq!(err.to_string(), "Failed to pause node 'mynode'"); + } - assert!(matches!( - node_process.stat().unwrap().state().unwrap(), - procfs::process::ProcState::Stopped - )); - } - }) - .await; + #[tokio::test] + async fn node_resume_method_should_resume_the_paused_node_process() { + let fs = InMemoryFileSystem::new(HashMap::from([ + (OsString::from_str("/").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + ])); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + StreamValue::Stdout("Line 4\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); + let namespace = provider.create_namespace().await.unwrap(); - node.resume().await.unwrap(); + // spawn dummy node + let node = namespace + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) + .await + .unwrap(); + + { + // retrieve running process + let processes = pm.processes().await; + assert_eq!(processes.len(), 1); + let node_process = processes.first().unwrap(); + + // ensure process has correct state pre-pause + assert!(matches!( + node_process.state().await, + FakeProcessState::Running + )); + + // simulate logs process manager output + pm.advance_by(2).await; + } - // ensure process has been resumed for 10sec - let _ = timeout(Duration::from_secs(10), async { + // ensure logs are correct after some time or timeout + timeout(Duration::from_secs(3), async { loop { - sleep(Duration::from_millis(200)).await; + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 2) + { + return; + } - assert!(matches!( - node_process.stat().unwrap().state().unwrap(), - // process can be running or sleeping because we sleep between echo calls - procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping - )); + sleep(Duration::from_millis(10)).await; } }) - .await; + .await + .unwrap(); - // ensure logs continue being written for some time - timeout(Duration::from_secs(30), async { - let mut expected_logs_line_count = 2; + // ensure logs are correct after some time + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); + node.pause().await.unwrap(); + + { + // retrieve running process + let processes = pm.processes().await; + 
assert_eq!(processes.len(), 1); + let node_process = processes.first().unwrap(); + + // ensure process has correct state post-pause / pre-resume + assert!(matches!( + node_process.state().await, + FakeProcessState::Stopped + )); + + // simulate logs process manager output + pm.advance_by(2).await; + } + + // ensure logs are not written when process is paused + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); + + node.resume().await.unwrap(); + + { + // retrieve running process + let processes = pm.processes().await; + assert_eq!(processes.len(), 1); + let node_process = processes.first().unwrap(); + + // ensure process has correct state post-resume + assert!(matches!( + node_process.state().await, + FakeProcessState::Running + )); + + // simulate logs process manager output + pm.advance_by(2).await; + } + + // ensure logs are correct after some time or timeout + timeout(Duration::from_secs(3), async { loop { - sleep(Duration::from_millis(200)).await; - - if let Some(file) = fs.files.read().await.get(node.log_path().as_os_str()) { - if let Some(contents) = file.contents() { - if contents.lines().count() >= expected_logs_line_count { - if expected_logs_line_count >= 6 { - return; - } else { - expected_logs_line_count += 2; - } - } - } + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 4) + { + return; } + + sleep(Duration::from_millis(10)).await; } }) .await .unwrap(); + + // ensure logs are written and correct after process is resumed + assert_eq!( + node.logs().await.unwrap(), + "Line 1\nLine 2\nLine 3\nLine 4\n" + ); + } + + #[tokio::test] + async fn node_resume_method_should_fails_if_some_error_happened() { + let fs = InMemoryFileSystem::new(HashMap::from([ + (OsString::from_str("/").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + ])); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); + let namespace = provider.create_namespace().await.unwrap(); + + // spawn dummy node + let node = namespace + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) + .await + .unwrap(); + + // simulate processes advancing + pm.advance_by(3).await; + + // pause the node + node.pause().await.unwrap(); + + // force error + pm.kill_should_error(nix::errno::Errno::EPERM).await; + + let err = node.resume().await.unwrap_err(); + + assert_eq!(err.to_string(), "Failed to resume node 'mynode'"); } #[tokio::test] @@ -1396,12 +1885,21 @@ mod tests { InMemoryFile::file("My file 2"), ), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + StreamValue::Stdout("Line 4\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let namespace = provider.create_namespace().await.unwrap(); let node = namespace .spawn_node( - SpawnNodeOptions::new("mynode", "./testing/dummy_node") + SpawnNodeOptions::new("mynode", "/path/to/my/node_binary") .args(vec![ "-flag1", "--flag2", @@ -1423,91 +1921,155 @@ mod tests { .await .unwrap(); - 
// wait 3s for node to spawn and start writing logs - sleep(Duration::from_secs(3)).await; + let old_process_id = { + // retrieve running process + let processes = pm.processes().await; + assert_eq!(processes.len(), 1); + let node_process = processes.first().unwrap(); - let processes = get_processes_by_name("dummy_node").await; - assert_eq!(processes.len(), 1); // needed to avoid test run in parallel and false results - let old_process_id = processes.first().unwrap().pid(); - let old_logs_count = node.logs().await.unwrap().lines().count(); + // ensure process has correct state post-pause / pre-resume + assert!(matches!( + node_process.state().await, + FakeProcessState::Running + )); - node.restart(None).await.unwrap(); + // simulate process advance and logs writting + pm.advance_by(2).await; - // wait 3s for node to restart and restart writing logs - sleep(Duration::from_secs(3)).await; + node_process.id + }; + + // ensure logs are correct after some time or timeout + timeout(Duration::from_secs(3), async { + loop { + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 2) + { + return; + } - let processes = get_processes_by_name("dummy_node").await; - assert_eq!(processes.len(), 1); // needed to avoid test run in parallel and false results - let node_process = processes.first().unwrap(); + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); - // ensure process has correct state - assert!(matches!( - node_process.stat().unwrap().state().unwrap(), - // process can be running or sleeping because we sleep between echo calls - procfs::process::ProcState::Running | procfs::process::ProcState::Sleeping - )); - - // ensure PID changed - assert_ne!(old_process_id, node_process.pid()); - - // ensure process restarted with correct args - let node_args = node_process.cmdline().unwrap(); - assert!(node_args.contains(&"-flag1".to_string())); - assert!(node_args.contains(&"--flag2".to_string())); - assert!(node_args.contains(&"--option1=value1".to_string())); - assert!(node_args.contains(&"-option2=value2".to_string())); - assert!(node_args.contains(&"--option3 value3".to_string())); - assert!(node_args.contains(&"-option4 value4".to_string())); - - // ensure process restarted with correct environment - let node_env = node_process.environ().unwrap(); - assert_eq!( - node_env - .get(&OsString::from_str("MY_VAR_1").unwrap()) - .unwrap(), - "MY_VALUE_1" - ); - assert_eq!( - node_env - .get(&OsString::from_str("MY_VAR_2").unwrap()) - .unwrap(), - "MY_VALUE_2" - ); - assert_eq!( - node_env - .get(&OsString::from_str("MY_VAR_3").unwrap()) - .unwrap(), - "MY_VALUE_3" - ); + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); - // ensure log writing restarted and they keep being written for some time - timeout(Duration::from_secs(30), async { - let mut expected_logs_line_count = old_logs_count; + // restart node + node.restart(None).await.unwrap(); + // retrieve running process + let processes = pm.processes().await; + assert_eq!(processes.len(), 1); + let process = processes.first().unwrap(); + + // ensure process has correct state post-restart + assert!(matches!(process.state().await, FakeProcessState::Running)); + + // simulate process advance and logs writting + pm.advance_by(2).await; + + // ensure pid changed + assert_ne!(old_process_id, process.id); + + // ensure process is passed correct args after restart + assert!(process + .args + .contains(&OsString::from_str("-flag1").unwrap())); + assert!(process + .args + 
.contains(&OsString::from_str("--flag2").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("--option1=value1").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("-option2=value2").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("--option3 value3").unwrap())); + assert!(process + .args + .contains(&OsString::from_str("-option4 value4").unwrap())); + + // ensure process has correct environment after restart + assert!(process.envs.contains(&( + OsString::from_str("MY_VAR_1").unwrap(), + OsString::from_str("MY_VALUE_1").unwrap() + ))); + assert!(process.envs.contains(&( + OsString::from_str("MY_VAR_2").unwrap(), + OsString::from_str("MY_VALUE_2").unwrap() + ))); + assert!(process.envs.contains(&( + OsString::from_str("MY_VAR_3").unwrap(), + OsString::from_str("MY_VALUE_3").unwrap() + ))); + + // ensure logs are correct after restart, appending to old logs or timeout + timeout(Duration::from_secs(3), async { loop { - sleep(Duration::from_millis(200)).await; - - if let Some(file) = fs.files.read().await.get(node.log_path().as_os_str()) { - if let Some(contents) = file.contents() { - if contents.lines().count() >= expected_logs_line_count { - if expected_logs_line_count >= old_logs_count + 6 { - return; - } else { - expected_logs_line_count += 2; - } - } - } + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 4) + { + return; } + + sleep(Duration::from_millis(10)).await; } }) .await .unwrap(); + assert_eq!( + node.logs().await.unwrap(), + "Line 1\nLine 2\nLine 1\nLine 2\n" + ); + // ensure node is present in namespace assert_eq!(namespace.nodes().await.len(), 1); assert!(namespace.nodes().await.get(node.name()).is_some()); } + #[tokio::test] + async fn node_restart_method_should_fails_if_some_error_happened() { + let fs = InMemoryFileSystem::new(HashMap::from([ + (OsString::from_str("/").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + ])); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); + let namespace = provider.create_namespace().await.unwrap(); + + // spawn dummy node + let node = namespace + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) + .await + .unwrap(); + + // simulate processes advancing + pm.advance_by(3).await; + + // force error + pm.node_kill_should_error(nix::errno::Errno::EPERM).await; + + let err = node.restart(None).await.unwrap_err(); + + assert_eq!(err.to_string(), "Failed to kill node 'mynode'"); + } + #[tokio::test] async fn node_destroy_method_should_destroy_the_node_itfself_and_remove_process_and_stop_logs_writing( ) { @@ -1515,56 +2077,109 @@ mod tests { (OsString::from_str("/").unwrap(), InMemoryFile::dir()), (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), ])); - let provider = NativeProvider::new(fs.clone()); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + StreamValue::Stdout("Line 4\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); let 
namespace = provider.create_namespace().await.unwrap(); // spawn dummy node let node = namespace - .spawn_node(SpawnNodeOptions::new("mynode", "./testing/dummy_node")) + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) .await .unwrap(); - // wait 3s for node to start and begin writing logs - sleep(Duration::from_secs(3)).await; + // simulate process advancing + pm.advance_by(2).await; - node.destroy().await.unwrap(); + // ensure logs are correct, waiting some time or timeout + timeout(Duration::from_secs(3), async { + loop { + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 2) + { + return; + } + + sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); - // wait node 1s to be killed and stop writing logs - sleep(Duration::from_secs(1)).await; - let logs = node.logs().await.unwrap(); + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); - // ensure process is not running anymore - let processes = get_processes_by_name("dummy_node").await; - assert_eq!(processes.len(), 0); + // destroy the node + node.destroy().await.unwrap(); - // ensure logs are not being written anymore - let _ = timeout(Duration::from_secs(10), async { + // simulate processes advancing + pm.advance_by(2).await; + + // ensure logs are not being written anymore, waiting some time or timeout + timeout(Duration::from_secs(3), async { loop { - sleep(Duration::from_millis(200)).await; + if node + .logs() + .await + .is_ok_and(|logs| logs.lines().count() == 2) + { + return; + } - assert_eq!(logs, node.logs().await.unwrap()); + sleep(Duration::from_millis(10)).await; } }) - .await; + .await + .unwrap(); + + assert_eq!(node.logs().await.unwrap(), "Line 1\nLine 2\n"); + + // ensure process is not running anymore + assert_eq!(pm.processes().await.len(), 0); // ensure node doesn't exists anymore in namespace assert_eq!(namespace.nodes().await.len(), 0); } - async fn get_processes_by_name(name: &str) -> Vec { - procfs::process::all_processes() - .unwrap() - .filter_map(|process| { - if let Ok(process) = process { - process - .cmdline() - .iter() - .any(|args| args.iter().any(|arg| arg.contains(name))) - .then_some(process) - } else { - None - } - }) - .collect::>() + #[tokio::test] + async fn node_destroy_method_should_fails_if_some_error_happened() { + let fs = InMemoryFileSystem::new(HashMap::from([ + (OsString::from_str("/").unwrap(), InMemoryFile::dir()), + (OsString::from_str("/tmp").unwrap(), InMemoryFile::dir()), + ])); + let pm = FakeProcessManager::new(HashMap::from([( + OsString::from_str("/path/to/my/node_binary").unwrap(), + vec![ + StreamValue::Stdout("Line 1\n".to_string()), + StreamValue::Stdout("Line 2\n".to_string()), + StreamValue::Stdout("Line 3\n".to_string()), + ], + )])); + let provider = NativeProvider::new(fs.clone(), pm.clone()).tmp_dir("/tmp"); + let namespace = provider.create_namespace().await.unwrap(); + + // spawn dummy node + let node = namespace + .spawn_node(SpawnNodeOptions::new("mynode", "/path/to/my/node_binary")) + .await + .unwrap(); + + // simulate processes advancing + pm.advance_by(3).await; + + // force error + pm.node_kill_should_error(nix::errno::Errno::EPERM).await; + + let err = node.destroy().await.unwrap_err(); + + assert_eq!(err.to_string(), "Failed to kill node 'mynode'"); } } diff --git a/crates/provider/src/shared/types.rs b/crates/provider/src/shared/types.rs index 1c8466af8..e15efec8c 100644 --- a/crates/provider/src/shared/types.rs +++ b/crates/provider/src/shared/types.rs @@ -20,7 +20,7 @@ impl 
ProviderCapabilities { pub struct SpawnNodeOptions { pub name: String, - pub command: String, + pub program: String, pub args: Vec, pub env: Vec<(String, String)>, // TODO: naming @@ -32,13 +32,13 @@ pub struct SpawnNodeOptions { } impl SpawnNodeOptions { - pub fn new(name: S, command: S) -> Self + pub fn new(name: S, program: S) -> Self where S: AsRef, { Self { name: name.as_ref().to_string(), - command: command.as_ref().to_string(), + program: program.as_ref().to_string(), args: vec![], env: vec![], injected_files: vec![], @@ -78,20 +78,20 @@ impl SpawnNodeOptions { #[derive(Debug)] pub struct GenerateFileCommand { - pub command: String, + pub program: String, pub args: Vec, pub env: Vec<(String, String)>, pub local_output_path: PathBuf, } impl GenerateFileCommand { - pub fn new(command: S, local_output_path: P) -> Self + pub fn new(program: S, local_output_path: P) -> Self where S: AsRef, P: AsRef, { Self { - command: command.as_ref().to_string(), + program: program.as_ref().to_string(), args: vec![], env: vec![], local_output_path: local_output_path.as_ref().into(), @@ -147,18 +147,18 @@ impl GenerateFilesOptions { } pub struct RunCommandOptions { - pub command: String, + pub program: String, pub args: Vec, pub env: Vec<(String, String)>, } impl RunCommandOptions { - pub fn new(command: S) -> Self + pub fn new(program: S) -> Self where S: AsRef, { Self { - command: command.as_ref().to_string(), + program: program.as_ref().to_string(), args: vec![], env: vec![], } diff --git a/crates/provider/testing/dummy_node b/crates/provider/testing/dummy_node deleted file mode 100755 index 4a6e0cdf1..000000000 --- a/crates/provider/testing/dummy_node +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash - -i=0 - -# infinite loop to simulate long-running process with fake output -while :; do - echo "Line $i" - i=$((i+1)) - # sleep randomly between 1 and 3 (included) seconds - sleep $((RANDOM % 3 + 1)) -done \ No newline at end of file diff --git a/crates/provider/testing/dummy_script b/crates/provider/testing/dummy_script deleted file mode 100644 index 89c039a48..000000000 --- a/crates/provider/testing/dummy_script +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -echo "My script" - -echo "$MY_ENV_VAR" - -if [ "$1" == "-c" ]; then - echo "With args" -fi \ No newline at end of file diff --git a/crates/support/Cargo.toml b/crates/support/Cargo.toml index 3d671bd6a..7393a03ff 100644 --- a/crates/support/Cargo.toml +++ b/crates/support/Cargo.toml @@ -12,4 +12,6 @@ async-trait = { workspace = true } futures = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true, features = ["full"] } -uuid = { workspace = true, features = ["v4"] } \ No newline at end of file +uuid = { workspace = true, features = ["v4"] } +nix = { workspace = true, features = ["signal"] } +rand = { workspace = true } diff --git a/crates/support/src/fs.rs b/crates/support/src/fs.rs index 6afb60c22..952486e0c 100644 --- a/crates/support/src/fs.rs +++ b/crates/support/src/fs.rs @@ -19,31 +19,42 @@ pub type FileSystemResult = Result; #[async_trait] pub trait FileSystem { - async fn create_dir(&self, path: impl AsRef + Send) -> FileSystemResult<()>; - - async fn create_dir_all(&self, path: impl AsRef + Send) -> FileSystemResult<()>; - - async fn read(&self, path: impl AsRef + Send) -> FileSystemResult>; - - async fn read_to_string(&self, path: impl AsRef + Send) -> FileSystemResult; - - async fn write( - &self, - path: impl AsRef + Send, - contents: impl AsRef<[u8]> + Send, - ) -> FileSystemResult<()>; - - async fn append( - 
&self,
-        path: impl AsRef<Path> + Send,
-        contents: impl AsRef<[u8]> + Send,
-    ) -> FileSystemResult<()>;
-
-    async fn copy(
-        &self,
-        from: impl AsRef<Path> + Send,
-        to: impl AsRef<Path> + Send,
-    ) -> FileSystemResult<()>;
-
-    async fn set_mode(&self, path: impl AsRef<Path> + Send, perm: u32) -> FileSystemResult<()>;
+    async fn create_dir<P>(&self, path: P) -> FileSystemResult<()>
+    where
+        P: AsRef<Path> + Send;
+
+    async fn create_dir_all<P>(&self, path: P) -> FileSystemResult<()>
+    where
+        P: AsRef<Path> + Send;
+
+    async fn read<P>(&self, path: P) -> FileSystemResult<Vec<u8>>
+    where
+        P: AsRef<Path> + Send;
+
+    async fn read_to_string<P>(&self, path: P) -> FileSystemResult<String>
+    where
+        P: AsRef<Path> + Send;
+
+    async fn write<P, C>(&self, path: P, contents: C) -> FileSystemResult<()>
+    where
+        P: AsRef<Path> + Send,
+        C: AsRef<[u8]> + Send;
+
+    async fn append<P, C>(&self, path: P, contents: C) -> FileSystemResult<()>
+    where
+        P: AsRef<Path> + Send,
+        C: AsRef<[u8]> + Send;
+
+    async fn copy<P1, P2>(&self, from: P1, to: P2) -> FileSystemResult<()>
+    where
+        P1: AsRef<Path> + Send,
+        P2: AsRef<Path> + Send;
+
+    async fn set_mode<P>(&self, path: P, perm: u32) -> FileSystemResult<()>
+    where
+        P: AsRef<Path> + Send;
+
+    async fn exists<P>
(&self, path: P) -> bool + where + P: AsRef + Send; } diff --git a/crates/support/src/fs/in_memory.rs b/crates/support/src/fs/in_memory.rs index 05c9b5389..b90743886 100644 --- a/crates/support/src/fs/in_memory.rs +++ b/crates/support/src/fs/in_memory.rs @@ -1,11 +1,4 @@ -use std::{ - collections::HashMap, - ffi::OsString, - fs::{self, Permissions}, - os::unix::prelude::PermissionsExt, - path::Path, - sync::Arc, -}; +use std::{collections::HashMap, ffi::OsString, path::Path, sync::Arc}; use anyhow::anyhow; use async_trait::async_trait; @@ -15,14 +8,8 @@ use super::{FileSystem, FileSystemResult}; #[derive(Debug, Clone, PartialEq)] pub enum InMemoryFile { - File { - mode: u32, - contents: Vec, - mirror: bool, - }, - Directory { - mode: u32, - }, + File { mode: u32, contents: Vec }, + Directory { mode: u32 }, } impl InMemoryFile { @@ -40,31 +27,6 @@ impl InMemoryFile { Self::File { mode: 0o664, contents: contents.as_ref().to_vec(), - mirror: false, - } - } - - pub fn mirror(path: P, contents: C) -> Self - where - P: AsRef, - C: AsRef, - { - Self::mirror_raw(path, contents.as_ref()) - } - - pub fn mirror_raw(path: P, contents: C) -> Self - where - P: AsRef, - C: AsRef<[u8]>, - { - // mirror file to local filesystem - fs::create_dir_all(path.as_ref().parent().unwrap()).unwrap(); - fs::write(path, contents.as_ref()).unwrap(); - - Self::File { - mode: 0o664, - contents: contents.as_ref().to_vec(), - mirror: true, } } @@ -96,12 +58,6 @@ impl InMemoryFile { Self::Directory { .. } => None, } } - - pub fn set_mirror(&mut self) { - if let Self::File { mirror, .. } = self { - *mirror = true; - }; - } } #[derive(Default, Debug, Clone)] @@ -119,7 +75,10 @@ impl InMemoryFileSystem { #[async_trait] impl FileSystem for InMemoryFileSystem { - async fn create_dir(&self, path: impl AsRef + Send) -> FileSystemResult<()> { + async fn create_dir
<P>
(&self, path: P) -> FileSystemResult<()> + where + P: AsRef + Send, + { let path = path.as_ref(); let os_path = path.as_os_str(); @@ -152,7 +111,10 @@ impl FileSystem for InMemoryFileSystem { Ok(()) } - async fn create_dir_all(&self, path: impl AsRef + Send) -> FileSystemResult<()> { + async fn create_dir_all
<P>
(&self, path: P) -> FileSystemResult<()> + where + P: AsRef + Send, + { let path = path.as_ref(); let mut files = self.files.write().await; let ancestors = path @@ -176,7 +138,10 @@ impl FileSystem for InMemoryFileSystem { Ok(()) } - async fn read(&self, path: impl AsRef + Send) -> FileSystemResult> { + async fn read
<P>
(&self, path: P) -> FileSystemResult> + where + P: AsRef + Send, + { let os_path = path.as_ref().as_os_str(); match self.files.read().await.get(os_path) { @@ -188,7 +153,10 @@ impl FileSystem for InMemoryFileSystem { } } - async fn read_to_string(&self, path: impl AsRef + Send) -> FileSystemResult { + async fn read_to_string
<P>
(&self, path: P) -> FileSystemResult + where + P: AsRef + Send, + { let os_path = path.as_ref().as_os_str().to_owned(); let content = self.read(path).await?; @@ -196,11 +164,11 @@ impl FileSystem for InMemoryFileSystem { .map_err(|_| anyhow!("invalid utf-8 encoding for file {:?}", os_path).into()) } - async fn write( - &self, - path: impl AsRef + Send, - contents: impl AsRef<[u8]> + Send, - ) -> FileSystemResult<()> { + async fn write(&self, path: P, contents: C) -> FileSystemResult<()> + where + P: AsRef + Send, + C: AsRef<[u8]> + Send, + { let path = path.as_ref(); let os_path = path.as_os_str(); let mut files = self.files.write().await; @@ -225,11 +193,11 @@ impl FileSystem for InMemoryFileSystem { Ok(()) } - async fn append( - &self, - path: impl AsRef + Send, - contents: impl AsRef<[u8]> + Send, - ) -> FileSystemResult<()> { + async fn append(&self, path: P, contents: C) -> FileSystemResult<()> + where + P: AsRef + Send, + C: AsRef<[u8]> + Send, + { let path = path.as_ref(); let mut existing_contents = match self.read(path).await { Ok(existing_contents) => existing_contents, @@ -243,50 +211,27 @@ impl FileSystem for InMemoryFileSystem { self.write(path, existing_contents).await } - async fn copy( - &self, - from: impl AsRef + Send, - to: impl AsRef + Send, - ) -> FileSystemResult<()> { + async fn copy(&self, from: P1, to: P2) -> FileSystemResult<()> + where + P1: AsRef + Send, + P2: AsRef + Send, + { let from_ref = from.as_ref(); let to_ref = to.as_ref(); let content = self.read(from_ref).await?; - self.write(to_ref, content).await?; - - // handle mirror file - let mut files = self.files.write().await; - let file = files.get(from_ref.as_os_str()).unwrap(); - if let InMemoryFile::File { - mode, - contents, - mirror, - } = file - { - if *mirror { - fs::create_dir_all(to_ref.parent().unwrap()).unwrap(); - fs::write(to_ref, contents).unwrap(); - fs::set_permissions(to_ref, Permissions::from_mode(*mode)).unwrap(); - files.get_mut(to_ref.as_os_str()).unwrap().set_mirror(); - } - } - Ok(()) + self.write(to_ref, content).await } - async fn set_mode(&self, path: impl AsRef + Send, mode: u32) -> FileSystemResult<()> { + async fn set_mode
<P>
(&self, path: P, mode: u32) -> FileSystemResult<()> + where + P: AsRef + Send, + { let os_path = path.as_ref().as_os_str(); if let Some(file) = self.files.write().await.get_mut(os_path) { match file { - InMemoryFile::File { - mode: old_mode, - mirror, - .. - } => { + InMemoryFile::File { mode: old_mode, .. } => { *old_mode = mode; - - if *mirror { - fs::set_permissions(os_path, Permissions::from_mode(mode)).unwrap(); - } }, InMemoryFile::Directory { mode: old_mode, .. } => { *old_mode = mode; @@ -297,6 +242,16 @@ impl FileSystem for InMemoryFileSystem { Err(anyhow!("file {:?} not found", os_path).into()) } } + + async fn exists
<P>
(&self, path: P) -> bool + where + P: AsRef + Send, + { + self.files + .read() + .await + .contains_key(path.as_ref().as_os_str()) + } } #[cfg(test)] diff --git a/crates/support/src/fs/local.rs b/crates/support/src/fs/local.rs index ecf34334c..6f7d0e56b 100644 --- a/crates/support/src/fs/local.rs +++ b/crates/support/src/fs/local.rs @@ -10,35 +10,47 @@ pub struct LocalFileSystem; #[async_trait] impl FileSystem for LocalFileSystem { - async fn create_dir(&self, path: impl AsRef + Send) -> FileSystemResult<()> { + async fn create_dir
<P>
(&self, path: P) -> FileSystemResult<()> + where + P: AsRef + Send, + { tokio::fs::create_dir(path).await.map_err(Into::into) } - async fn create_dir_all(&self, path: impl AsRef + Send) -> FileSystemResult<()> { + async fn create_dir_all
<P>
(&self, path: P) -> FileSystemResult<()> + where + P: AsRef + Send, + { tokio::fs::create_dir_all(path).await.map_err(Into::into) } - async fn read(&self, path: impl AsRef + Send) -> FileSystemResult> { + async fn read
<P>
(&self, path: P) -> FileSystemResult> + where + P: AsRef + Send, + { tokio::fs::read(path).await.map_err(Into::into) } - async fn read_to_string(&self, path: impl AsRef + Send) -> FileSystemResult { + async fn read_to_string
<P>
(&self, path: P) -> FileSystemResult + where + P: AsRef + Send, + { tokio::fs::read_to_string(path).await.map_err(Into::into) } - async fn write( - &self, - path: impl AsRef + Send, - contents: impl AsRef<[u8]> + Send, - ) -> FileSystemResult<()> { + async fn write(&self, path: P, contents: C) -> FileSystemResult<()> + where + P: AsRef + Send, + C: AsRef<[u8]> + Send, + { tokio::fs::write(path, contents).await.map_err(Into::into) } - async fn append( - &self, - path: impl AsRef + Send, - contents: impl AsRef<[u8]> + Send, - ) -> FileSystemResult<()> { + async fn append(&self, path: P, contents: C) -> FileSystemResult<()> + where + P: AsRef + Send, + C: AsRef<[u8]> + Send, + { let contents = contents.as_ref(); let mut file = tokio::fs::OpenOptions::new() .create(true) @@ -54,22 +66,32 @@ impl FileSystem for LocalFileSystem { file.flush().await.and(Ok(())).map_err(Into::into) } - async fn copy( - &self, - from: impl AsRef + Send, - to: impl AsRef + Send, - ) -> FileSystemResult<()> { + async fn copy(&self, from: P1, to: P2) -> FileSystemResult<()> + where + P1: AsRef + Send, + P2: AsRef + Send, + { tokio::fs::copy(from, to) .await .and(Ok(())) .map_err(Into::into) } - async fn set_mode(&self, path: impl AsRef + Send, mode: u32) -> FileSystemResult<()> { + async fn set_mode
<P>
(&self, path: P, mode: u32) -> FileSystemResult<()> + where + P: AsRef + Send, + { tokio::fs::set_permissions(path, Permissions::from_mode(mode)) .await .map_err(Into::into) } + + async fn exists
<P>
(&self, path: P) -> bool + where + P: AsRef + Send, + { + path.as_ref().exists() + } } #[cfg(test)] diff --git a/crates/support/src/lib.rs b/crates/support/src/lib.rs index 57ee38c4b..653afb60e 100644 --- a/crates/support/src/lib.rs +++ b/crates/support/src/lib.rs @@ -1,2 +1,3 @@ pub mod fs; pub mod net; +pub mod process; diff --git a/crates/support/src/process.rs b/crates/support/src/process.rs new file mode 100644 index 000000000..581edc881 --- /dev/null +++ b/crates/support/src/process.rs @@ -0,0 +1,119 @@ +use std::{ + ffi::{OsStr, OsString}, + fmt::Debug, + io, + process::Stdio, + sync::Arc, +}; + +use async_trait::async_trait; +use tokio::io::AsyncRead; + +pub mod fake; +pub mod os; + +pub struct Command { + program: OsString, + args: Vec, + envs: Vec<(OsString, OsString)>, + stdin: Option, + stdout: Option, + stderr: Option, + kill_on_drop: bool, +} + +impl Command { + pub fn new(program: S) -> Self + where + S: AsRef, + { + Self { + program: program.as_ref().to_os_string(), + args: vec![], + envs: vec![], + stdin: None, + stdout: None, + stderr: None, + kill_on_drop: false, + } + } + + pub fn args(mut self, args: I) -> Self + where + I: IntoIterator, + S: AsRef, + { + self.args = args + .into_iter() + .map(|arg| arg.as_ref().to_os_string()) + .collect(); + self + } + + pub fn envs(mut self, vars: I) -> Self + where + I: IntoIterator, + K: AsRef, + V: AsRef, + { + self.envs = vars + .into_iter() + .map(|(key, val)| (key.as_ref().to_os_string(), val.as_ref().to_os_string())) + .collect(); + self + } + + pub fn stdin(mut self, cfg: T) -> Self + where + T: Into, + { + self.stdin = Some(cfg.into()); + self + } + + pub fn stdout(mut self, cfg: T) -> Self + where + T: Into, + { + self.stdout = Some(cfg.into()); + self + } + + pub fn stderr(mut self, cfg: T) -> Self + where + T: Into, + { + self.stderr = Some(cfg.into()); + self + } + + pub fn kill_on_drop(mut self, kill_on_drop: bool) -> Self { + self.kill_on_drop = kill_on_drop; + self + } +} + +pub type DynAsyncRead = Box; + +#[async_trait] +pub trait Process: Debug { + async fn id(&self) -> Option; + async fn take_stdout(&self) -> Option; + async fn take_stderr(&self) -> Option; + async fn kill(&self) -> io::Result<()>; +} + +pub type DynProcess = Arc; + +#[async_trait] +pub trait ProcessManager { + async fn spawn(&self, command: Command) -> io::Result; + + async fn output(&self, command: Command) -> io::Result; + + async fn kill(&self, pid: nix::unistd::Pid, signal: T) -> nix::Result<()> + where + T: Into> + Send; +} + +pub type DynProcessManager = Arc; diff --git a/crates/support/src/process/fake.rs b/crates/support/src/process/fake.rs new file mode 100644 index 000000000..d11e4a209 --- /dev/null +++ b/crates/support/src/process/fake.rs @@ -0,0 +1,365 @@ +use std::{ + collections::HashMap, ffi::OsString, os::unix::process::ExitStatusExt, process::ExitStatus, + sync::Arc, +}; + +use async_trait::async_trait; +use nix::sys::signal::Signal; +use rand; +use tokio::{ + io::AsyncRead, + sync::{mpsc, RwLock}, +}; + +use super::{Command, DynAsyncRead, DynProcess, Process, ProcessManager}; + +#[derive(Debug, Clone)] +pub enum FakeProcessState { + Running, + Stopped, +} + +#[derive(Debug)] +pub struct FakeStdStream(mpsc::Receiver); + +impl AsyncRead for FakeStdStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let data = self.0.poll_recv(cx); + + match data { + std::task::Poll::Ready(Some(chunk)) => { + 
buf.put_slice(chunk.as_bytes()); + std::task::Poll::Ready(Ok(())) + }, + std::task::Poll::Ready(None) => { + buf.put_slice(&[]); + std::task::Poll::Ready(Ok(())) + }, + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + +#[derive(Debug)] +pub struct FakeProcess { + pub id: u32, + pub program: OsString, + pub args: Vec, + pub envs: Vec<(OsString, OsString)>, + inner: RwLock, + process_manager: FakeProcessManager, +} + +impl FakeProcess { + pub async fn state(&self) -> FakeProcessState { + self.inner.read().await.state.clone() + } +} + +#[derive(Debug)] +struct FakeProcessInner { + state: FakeProcessState, + stream_values: Vec, + stdout_tx: mpsc::Sender, + stderr_tx: mpsc::Sender, + stdout: Option, + stderr: Option, +} + +#[async_trait] +impl Process for FakeProcess { + async fn id(&self) -> Option { + Some(self.id) + } + + async fn take_stdout(&self) -> Option { + self.inner + .write() + .await + .stdout + .take() + .map(|stdout| Box::new(stdout) as DynAsyncRead) + } + + async fn take_stderr(&self) -> Option { + self.inner + .write() + .await + .stderr + .take() + .map(|stderr| Box::new(stderr) as DynAsyncRead) + } + + async fn kill(&self) -> std::io::Result<()> { + let mut pm_inner = self.process_manager.inner.write().await; + + if let Some(errno) = pm_inner.node_kill_should_error { + return Err(errno.into()); + } + + pm_inner.processes.remove(&self.id); + + Ok(()) + } +} + +pub type StreamValueGeneratorFn = + Arc, Vec<(OsString, OsString)>) -> String + Send + Sync>; + +#[derive(Clone)] +pub struct DynamicStreamValue(StreamValueGeneratorFn); + +impl DynamicStreamValue { + pub fn new(f: F) -> Self + where + F: Fn(OsString, Vec, Vec<(OsString, OsString)>) -> String + Send + Sync + 'static, + { + Self(Arc::new(f)) + } +} + +impl std::fmt::Debug for DynamicStreamValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Fn(OsString, Vec, Vec<(OsString, OsString)>) -> String" + ) + } +} + +#[derive(Debug, Clone)] +pub enum StreamValue { + Stdout(String), + Stderr(String), + DynamicStdout(DynamicStreamValue), + DynamicStderr(DynamicStreamValue), +} + +#[derive(Clone, Debug)] +pub struct FakeProcessManager { + inner: Arc>, +} + +#[derive(Debug)] +pub struct FakeProcessManagerInner { + processes: HashMap>, + streams: HashMap>, + spawn_should_error: Option, + output_should_fail: Option, + output_should_error: Option, + kill_should_error: Option, + node_kill_should_error: Option, +} + +impl FakeProcessManager { + pub fn new(streams: HashMap>) -> Self { + FakeProcessManager { + inner: Arc::new(RwLock::new(FakeProcessManagerInner { + processes: HashMap::new(), + streams, + spawn_should_error: None, + output_should_error: None, + output_should_fail: None, + kill_should_error: None, + node_kill_should_error: None, + })), + } + } + + pub async fn spawn_should_error(&self, err_kind: std::io::ErrorKind) { + let mut inner = self.inner.write().await; + inner.spawn_should_error = Some(err_kind); + } + + pub async fn output_should_error(&self, err_kind: std::io::ErrorKind) { + let mut inner = self.inner.write().await; + inner.output_should_error = Some(err_kind); + } + + pub async fn output_should_fail(&self, exit_code: ExitStatus) { + let mut inner = self.inner.write().await; + inner.output_should_fail = Some(exit_code); + } + + pub async fn kill_should_error(&self, errno: nix::errno::Errno) { + let mut inner = self.inner.write().await; + inner.kill_should_error = Some(errno); + } + + pub async fn node_kill_should_error(&self, errno: nix::errno::Errno) { + 
let mut inner = self.inner.write().await; + inner.node_kill_should_error = Some(errno); + } + + pub async fn push_stream(&self, program: OsString, values: Vec) { + self.inner.write().await.streams.insert(program, values); + } + + pub async fn processes(&self) -> Vec> { + self.inner + .write() + .await + .processes + .values() + .map(Arc::clone) + .collect() + } + + pub async fn count(&self) -> usize { + self.inner.read().await.processes.len() + } + + pub async fn advance_by(&self, cycles: usize) { + for (_, process) in self.inner.write().await.processes.iter() { + let mut inner = process.inner.write().await; + + for _ in 0..cycles { + if !inner.stream_values.is_empty() + && matches!(inner.state, FakeProcessState::Running) + { + let data = inner.stream_values.remove(0); + match data { + StreamValue::Stdout(stdout_chunk) => { + inner.stdout_tx.send(stdout_chunk).await.unwrap() + }, + StreamValue::Stderr(stderr_chunk) => { + inner.stderr_tx.send(stderr_chunk).await.unwrap() + }, + StreamValue::DynamicStderr(DynamicStreamValue(f)) => inner + .stderr_tx + .send(f( + process.program.clone(), + process.args.clone(), + process.envs.clone(), + )) + .await + .unwrap(), + StreamValue::DynamicStdout(DynamicStreamValue(f)) => inner + .stdout_tx + .send(f( + process.program.clone(), + process.args.clone(), + process.envs.clone(), + )) + .await + .unwrap(), + }; + } + } + } + } +} + +#[async_trait] +impl ProcessManager for FakeProcessManager { + async fn spawn(&self, command: Command) -> std::io::Result { + if let Some(err_kind) = self.inner.read().await.spawn_should_error { + return Err(err_kind.into()); + } + + let mut inner = self.inner.write().await; + let stream_values = inner + .streams + .get(&command.program) + .cloned() + .unwrap_or_default(); + let (stdout_tx, stdout_rx) = mpsc::channel(10); + let (stderr_tx, stderr_rx) = mpsc::channel(10); + + let process = Arc::new(FakeProcess { + id: rand::random::() as u32, + program: command.program, + args: command.args, + envs: command.envs, + inner: RwLock::new(FakeProcessInner { + state: FakeProcessState::Running, + stream_values, + stdout_tx, + stderr_tx, + stdout: Some(FakeStdStream(stdout_rx)), + stderr: Some(FakeStdStream(stderr_rx)), + }), + process_manager: self.clone(), + }); + + inner.processes.insert(process.id, Arc::clone(&process)); + + Ok(process) + } + + async fn output(&self, command: Command) -> std::io::Result { + if let Some(err_kind) = self.inner.read().await.output_should_error { + return Err(err_kind.into()); + } + + let stream_values = self + .inner + .read() + .await + .streams + .get(&command.program) + .cloned() + .unwrap_or_default(); + + let (stdout, stderr) = stream_values.into_iter().fold( + (String::new(), String::new()), + |(mut stdout, mut stderr), value| { + match value { + StreamValue::Stdout(stdout_chunk) => stdout.push_str(&stdout_chunk), + StreamValue::Stderr(stderr_chunk) => stderr.push_str(&stderr_chunk), + StreamValue::DynamicStdout(DynamicStreamValue(f)) => stdout.push_str(&f( + command.program.clone(), + command.args.clone(), + command.envs.clone(), + )), + StreamValue::DynamicStderr(DynamicStreamValue(f)) => stderr.push_str(&f( + command.program.clone(), + command.args.clone(), + command.envs.clone(), + )), + } + (stdout, stderr) + }, + ); + + Ok(std::process::Output { + status: self + .inner + .read() + .await + .output_should_fail + .unwrap_or(ExitStatus::from_raw(0)), + stdout: stdout.as_bytes().to_vec(), + stderr: stderr.as_bytes().to_vec(), + }) + } + + async fn kill(&self, pid: nix::unistd::Pid, signal: 
T) -> nix::Result<()> + where + T: Into> + Send, + { + if let Some(errno) = self.inner.read().await.kill_should_error { + return Err(errno); + } + + let pid = pid.as_raw().try_into().unwrap(); + let processes = &self.inner.write().await.processes; + let process_state = &mut processes.get(&pid).unwrap().inner.write().await.state; + + match (process_state.clone(), signal.into()) { + (FakeProcessState::Running, Some(Signal::SIGSTOP)) => { + *process_state = FakeProcessState::Stopped; + }, + (FakeProcessState::Stopped, Some(Signal::SIGCONT)) => { + *process_state = FakeProcessState::Running + }, + _ => {}, + } + + Ok(()) + } +} diff --git a/crates/support/src/process/os.rs b/crates/support/src/process/os.rs new file mode 100644 index 000000000..1180cc4a2 --- /dev/null +++ b/crates/support/src/process/os.rs @@ -0,0 +1,103 @@ +use std::{io, sync::Arc}; + +use async_trait::async_trait; +use tokio::sync::RwLock; + +use super::DynAsyncRead; +use crate::process::{Command, DynProcess, Process, ProcessManager}; + +#[derive(Debug)] +struct OsProcess { + child: RwLock, +} + +#[async_trait] +impl Process for OsProcess { + async fn id(&self) -> Option { + self.child.read().await.id() + } + + async fn take_stdout(&self) -> Option { + self.child + .write() + .await + .stdout + .take() + .map(|stdout| Box::new(stdout) as DynAsyncRead) + } + + async fn take_stderr(&self) -> Option { + self.child + .write() + .await + .stderr + .take() + .map(|stderr| Box::new(stderr) as DynAsyncRead) + } + + async fn kill(&self) -> io::Result<()> { + self.child.write().await.kill().await + } +} + +#[derive(Clone)] +pub struct OsProcessManager; + +impl OsProcessManager { + fn create_base_command(command: Command) -> tokio::process::Command { + let mut cmd = tokio::process::Command::new(command.program.clone()); + + if !command.args.is_empty() { + cmd.args(command.args.clone()); + } + + if !command.envs.is_empty() { + cmd.envs(command.envs.clone()); + } + + if let Some(stdin) = command.stdin { + cmd.stdin(stdin); + } + + if let Some(stdout) = command.stdout { + cmd.stdout(stdout); + } + + if let Some(stderr) = command.stderr { + cmd.stderr(stderr); + } + + cmd + } +} + +#[async_trait] +impl ProcessManager for OsProcessManager { + async fn spawn(&self, command: Command) -> io::Result { + let kill_on_drop = command.kill_on_drop; + let mut base_command = OsProcessManager::create_base_command(command); + + if kill_on_drop { + base_command.kill_on_drop(true); + } + + Ok(base_command.spawn().map(|child| { + Arc::new(OsProcess { + child: RwLock::new(child), + }) + })?) + } + + async fn output(&self, command: Command) -> io::Result { + let mut base_command = OsProcessManager::create_base_command(command); + + Ok(base_command.output().await?) + } + + async fn kill(&self, pid: nix::unistd::Pid, signal: T) -> nix::Result<()> + where + T: Into> + Send, + { + nix::sys::signal::kill(pid, signal) + } +}
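
Illustrative usage sketch (not part of the patch): the snippet below shows how the ProcessManager abstraction introduced above is intended to be driven from a test, using only APIs defined in this diff (FakeProcessManager::new, StreamValue::Stdout, Command, advance_by, processes, kill). The binary path and flag are placeholders, and the return type of spawn (io::Result<DynProcess>) is inferred from the trait as written here; treat this as a hedged example rather than code from the PR.

use std::{collections::HashMap, ffi::OsString, str::FromStr};

use support::process::{
    fake::{FakeProcessManager, StreamValue},
    Command, Process, ProcessManager,
};

// Assumes the `support` crate and a tokio runtime, as used throughout this patch.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Seed the fake manager: this (placeholder) program "emits" two stdout lines when advanced.
    let pm = FakeProcessManager::new(HashMap::from([(
        OsString::from_str("/path/to/my/node_binary").unwrap(),
        vec![
            StreamValue::Stdout("Line 1\n".to_string()),
            StreamValue::Stdout("Line 2\n".to_string()),
        ],
    )]));

    // Spawn through the ProcessManager trait, as NativeProvider does internally.
    let process = pm
        .spawn(Command::new("/path/to/my/node_binary").args(vec!["--flag1"]))
        .await?;
    assert_eq!(pm.processes().await.len(), 1);

    // Deterministically advance the fake process so it flushes its queued output,
    // replacing the wall-clock sleeps the old tests relied on.
    pm.advance_by(2).await;

    // Killing the process removes it from the manager's bookkeeping.
    process.kill().await?;
    assert_eq!(pm.processes().await.len(), 0);

    Ok(())
}

Returning a trait object (DynProcess) from spawn is what lets NativeProvider stay agnostic about whether it is driving real OS children (OsProcessManager) or the deterministic fake used in the tests above.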