diff --git a/dsc/tests/dsc.exit_code.tests.ps1 b/dsc/tests/dsc.exit_code.tests.ps1 index 05bec814..1e7cd37f 100644 --- a/dsc/tests/dsc.exit_code.tests.ps1 +++ b/dsc/tests/dsc.exit_code.tests.ps1 @@ -3,12 +3,12 @@ Describe 'exit code tests' { It 'non-zero exit code in manifest has corresponding message' { - $result = dsc resource get -r Test/ExitCode --input "{ exitCode: 8 }" 2>&1 - $result | Should -Match 'ERROR.*?[Exit code 8].*?manifest description: Placeholder from manifest for exit code 8' + dsc resource get -r Test/ExitCode --input "{ exitCode: 8 }" 2> $TestDrive/tracing.txt + "$TestDrive/tracing.txt" | Should -FileContentMatchExactly 'Placeholder from manifest for exit code 8' } It 'non-zero exit code not in manifest has generic message' { - $result = dsc resource get -r Test/ExitCode --input "{ exitCode: 1 }" 2>&1 - $result | Should -Match 'ERROR.*?Error.*?[Exit code 1]' + dsc resource get -r Test/ExitCode --input "{ exitCode: 1 }" 2> $TestDrive/tracing.txt + "$TestDrive/tracing.txt" | Should -FileContentMatchExactly 'Exit code 1' } It 'success exit code executes without error' { $result = dsc resource get -r Test/ExitCode --input "{ exitCode: 0 }" | ConvertFrom-Json diff --git a/dsc_lib/Cargo.toml b/dsc_lib/Cargo.toml index 160a6719..6148640c 100644 --- a/dsc_lib/Cargo.toml +++ b/dsc_lib/Cargo.toml @@ -18,6 +18,7 @@ serde_yaml = { version = "0.9.3" } thiserror = "1.0" security_context_lib = { path = "../security_context_lib" } semver = "1.0" +tokio = { version = "1.38.1", features = ["full"] } tracing = "0.1.37" tracing-indicatif = { version = "0.3.6" } tree-sitter = "0.22" diff --git a/dsc_lib/src/dscresources/command_resource.rs b/dsc_lib/src/dscresources/command_resource.rs index 0c11f91b..30f87b66 100644 --- a/dsc_lib/src/dscresources/command_resource.rs +++ b/dsc_lib/src/dscresources/command_resource.rs @@ -3,11 +3,12 @@ use jsonschema::JSONSchema; use serde_json::Value; -use std::{collections::HashMap, env, io::{Read, Write}, process::{Command, Stdio}}; +use std::{collections::HashMap, env, process::Stdio}; use crate::{configure::{config_doc::ExecutionKind, {config_result::ResourceGetResult, parameters, Configurator}}, util::parse_input_to_json}; use crate::dscerror::DscError; use super::{dscresource::get_diff, invoke_result::{ExportResult, GetResult, ResolveResult, SetResult, TestResult, ValidateResult, ResourceGetResponse, ResourceSetResponse, ResourceTestResponse, get_in_desired_state}, resource_manifest::{ArgKind, InputKind, Kind, ResourceManifest, ReturnKind, SchemaKind}}; use tracing::{error, warn, info, debug, trace}; +use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command}; pub const EXIT_PROCESS_TERMINATED: i32 = 0x102; @@ -553,7 +554,7 @@ pub fn invoke_resolve(resource: &ResourceManifest, cwd: &str, input: &str) -> Re Ok(result) } -/// Invoke a command and return the exit code, stdout, and stderr. +/// Asynchronously invoke a command and return the exit code, stdout, and stderr. /// /// # Arguments /// @@ -561,13 +562,19 @@ pub fn invoke_resolve(resource: &ResourceManifest, cwd: &str, input: &str) -> Re /// * `args` - Optional arguments to pass to the command /// * `input` - Optional input to pass to the command /// * `cwd` - Optional working directory to execute the command in +/// * `env` - Optional environment variable mappings to add or update +/// * `exit_codes` - Optional descriptions of exit codes /// /// # Errors /// /// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened. -#[allow(clippy::implicit_hasher)] -pub fn invoke_command(executable: &str, args: Option>, input: Option<&str>, cwd: Option<&str>, env: Option>, exit_codes: &Option>) -> Result<(i32, String, String), DscError> { - debug!("Invoking command '{}' with args {:?}", executable, args); +/// +async fn run_process_async(executable: &str, args: Option>, input: Option<&str>, cwd: Option<&str>, env: Option>, exit_codes: &Option>) -> Result<(i32, String, String), DscError> { + + // use somewhat large initial buffer to avoid early string reallocations; + // the value is based on list result of largest of built-in adapters - WMI adapter ~500KB + const INITIAL_BUFFER_CAPACITY: usize = 1024*1024; + let mut command = Command::new(executable); if input.is_some() { command.stdin(Stdio::piped()); @@ -583,62 +590,110 @@ pub fn invoke_command(executable: &str, args: Option>, input: Option if let Some(env) = env { command.envs(env); } - if executable == "dsc" && env::var("DEBUG_DSC").is_ok() { // remove this env var from child process as it will fail reading from keyboard to allow attaching command.env_remove("DEBUG_DSC"); } - let mut child = command.spawn()?; + let mut child = match command.spawn() { + Ok(c) => c, + Err(e) => { + return Err(DscError::CommandOperation(e.to_string(), executable.to_string())) + } + }; + + let stdout = child.stdout.take().expect("child did not have a handle to stdout"); + let stderr = child.stderr.take().expect("child did not have a handle to stderr"); + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); + if let Some(input) = input { trace!("Writing to command STDIN: {input}"); - // pipe to child stdin in a scope so that it is dropped before we wait - // otherwise the pipe isn't closed and the child process waits forever - let Some(mut child_stdin) = child.stdin.take() else { - return Err(DscError::CommandOperation("Failed to open stdin".to_string(), executable.to_string())); - }; - child_stdin.write_all(input.as_bytes())?; - child_stdin.flush()?; + let mut stdin = child.stdin.take().expect("child did not have a handle to stdin"); + stdin.write_all(input.as_bytes()).await.expect("could not write to stdin"); + drop(stdin); } - let Some(mut child_stdout) = child.stdout.take() else { - return Err(DscError::CommandOperation("Failed to open stdout".to_string(), executable.to_string())); + let Some(child_id) = child.id() else { + return Err(DscError::CommandOperation("Can't get child process id".to_string(), executable.to_string())); }; - let mut stdout_buf = Vec::new(); - child_stdout.read_to_end(&mut stdout_buf)?; - let Some(mut child_stderr) = child.stderr.take() else { - return Err(DscError::CommandOperation("Failed to open stderr".to_string(), executable.to_string())); - }; - let mut stderr_buf = Vec::new(); - child_stderr.read_to_end(&mut stderr_buf)?; - - let exit_status = child.wait()?; - let exit_code = exit_status.code().unwrap_or(EXIT_PROCESS_TERMINATED); - let stdout = String::from_utf8_lossy(&stdout_buf).to_string(); - let stderr = String::from_utf8_lossy(&stderr_buf).to_string(); - if !stdout.is_empty() { - trace!("STDOUT returned: {}", &stdout); - } - let cleaned_stderr = if stderr.is_empty() { - stderr - } else { - trace!("STDERR returned data to be traced"); - log_resource_traces(executable, &child.id(), &stderr); - // TODO: remove logged traces from STDERR - String::new() - }; + let child_task = tokio::spawn(async move { + child.wait().await + }); - if exit_code != 0 { - if let Some(exit_codes) = exit_codes { - if let Some(error_message) = exit_codes.get(&exit_code) { - return Err(DscError::CommandExitFromManifest(executable.to_string(), exit_code, error_message.to_string())); + let stdout_task = tokio::spawn(async move { + let mut stdout_result = String::with_capacity(INITIAL_BUFFER_CAPACITY); + while let Ok(Some(line)) = stdout_reader.next_line().await { + stdout_result.push_str(&line); + stdout_result.push('\n'); + } + stdout_result + }); + + let stderr_task = tokio::spawn(async move { + let mut filtered_stderr = String::with_capacity(INITIAL_BUFFER_CAPACITY); + while let Ok(Some(stderr_line)) = stderr_reader.next_line().await { + let filtered_stderr_line = log_stderr_line(&child_id, &stderr_line); + if !filtered_stderr_line.is_empty() { + filtered_stderr.push_str(filtered_stderr_line); + filtered_stderr.push('\n'); + } + } + filtered_stderr + }); + + let exit_code = child_task.await.unwrap()?.code(); + let stdout_result = stdout_task.await.unwrap(); + let stderr_result = stderr_task.await.unwrap(); + + if let Some(code) = exit_code { + debug!("Process '{executable}' id {child_id} exited with code {code}"); + + if code != 0 { + if let Some(exit_codes) = exit_codes { + if let Some(error_message) = exit_codes.get(&code) { + return Err(DscError::CommandExitFromManifest(executable.to_string(), code, error_message.to_string())); + } } + return Err(DscError::Command(executable.to_string(), code, stderr_result)); } - return Err(DscError::Command(executable.to_string(), exit_code, cleaned_stderr)); + + Ok((code, stdout_result, stderr_result)) + } else { + debug!("Process '{executable}' id {child_id} terminated by signal"); + Err(DscError::CommandOperation("Process terminated by signal".to_string(), executable.to_string())) } +} + +/// Invoke a command and return the exit code, stdout, and stderr. +/// +/// # Arguments +/// +/// * `executable` - The command to execute +/// * `args` - Optional arguments to pass to the command +/// * `input` - Optional input to pass to the command +/// * `cwd` - Optional working directory to execute the command in +/// * `env` - Optional environment variable mappings to add or update +/// * `exit_codes` - Optional descriptions of exit codes +/// +/// # Errors +/// +/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened. +/// +/// # Panics +/// +/// Will panic if tokio runtime can't be created. +/// +#[allow(clippy::implicit_hasher)] +pub fn invoke_command(executable: &str, args: Option>, input: Option<&str>, cwd: Option<&str>, env: Option>, exit_codes: &Option>) -> Result<(i32, String, String), DscError> { + debug!("Invoking command '{}' with args {:?}", executable, args); - Ok((exit_code, stdout, cleaned_stderr)) + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(run_process_async(executable, args, input, cwd, env, exit_codes)) } fn process_args(args: &Option>, value: &str) -> Option> { @@ -784,30 +839,31 @@ fn json_to_hashmap(json: &str) -> Result, DscError> { /// /// * `process_name` - The name of the process /// * `process_id` - The ID of the process -/// * `stderr` - The stderr output from the process -pub fn log_resource_traces(process_name: &str, process_id: &u32, stderr: &str) +/// * `trace_line` - The stderr line from the process +pub fn log_stderr_line<'a>(process_id: &u32, trace_line: &'a str) -> &'a str { - if !stderr.is_empty() + if !trace_line.is_empty() { - for trace_line in stderr.lines() { - if let Result::Ok(json_obj) = serde_json::from_str::(trace_line) { - if let Some(msg) = json_obj.get("Error") { - error!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default()); - } else if let Some(msg) = json_obj.get("Warning") { - warn!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default()); - } else if let Some(msg) = json_obj.get("Info") { - info!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default()); - } else if let Some(msg) = json_obj.get("Debug") { - debug!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default()); - } else if let Some(msg) = json_obj.get("Trace") { - trace!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default()); - } else { - // TODO: deserialize tracing JSON to have better presentation - trace!("Process '{process_name}' id {process_id} : {trace_line}"); - }; + if let Result::Ok(json_obj) = serde_json::from_str::(trace_line) { + if let Some(msg) = json_obj.get("Error") { + error!("Process id {process_id} : {}", msg.as_str().unwrap_or_default()); + } else if let Some(msg) = json_obj.get("Warning") { + warn!("Process id {process_id} : {}", msg.as_str().unwrap_or_default()); + } else if let Some(msg) = json_obj.get("Info") { + info!("Process id {process_id} : {}", msg.as_str().unwrap_or_default()); + } else if let Some(msg) = json_obj.get("Debug") { + debug!("Process id {process_id} : {}", msg.as_str().unwrap_or_default()); + } else if let Some(msg) = json_obj.get("Trace") { + trace!("Process id {process_id} : {}", msg.as_str().unwrap_or_default()); } else { - trace!("Process '{process_name}' id {process_id} : {trace_line}"); - } + // the line is a valid json, but not one of standard trace lines - return it as filtered stderr_line + return trace_line; + }; + } else { + // the line is not a valid json - return it as filtered stderr_line + return trace_line; } - } + }; + + "" } diff --git a/powershell-adapter/Tests/TestClassResource/0.0.1/TestClassResource.psm1 b/powershell-adapter/Tests/TestClassResource/0.0.1/TestClassResource.psm1 index f561c68a..71eb6afc 100644 --- a/powershell-adapter/Tests/TestClassResource/0.0.1/TestClassResource.psm1 +++ b/powershell-adapter/Tests/TestClassResource/0.0.1/TestClassResource.psm1 @@ -59,7 +59,11 @@ class TestClassResource : BaseTestClass static [TestClassResource[]] Export() { $resultList = [List[TestClassResource]]::new() - 1..5 | %{ + $resultCount = 5 + if ($env:TestClassResourceResultCount) { + $resultCount = $env:TestClassResourceResultCount + } + 1..$resultCount | %{ $obj = New-Object TestClassResource $obj.Name = "Object$_" $obj.Prop1 = "Property of object$_" diff --git a/powershell-adapter/Tests/powershellgroup.resource.tests.ps1 b/powershell-adapter/Tests/powershellgroup.resource.tests.ps1 index 3864799f..abe380e4 100644 --- a/powershell-adapter/Tests/powershellgroup.resource.tests.ps1 +++ b/powershell-adapter/Tests/powershellgroup.resource.tests.ps1 @@ -241,4 +241,15 @@ Describe 'PowerShell adapter resource tests' { $env:PATH = $oldPath } } + + It 'Dsc can process large resource output' -Tag z1{ + $env:TestClassResourceResultCount = 5000 # with sync resource invocations this was not possible + + $r = dsc resource export -r TestClassResource/TestClassResource + $LASTEXITCODE | Should -Be 0 + $res = $r | ConvertFrom-Json + $res.resources[0].properties.result.count | Should -Be 5000 + + $env:TestClassResourceResultCount = $null + } }