Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async resource process invocation #493

Merged
merged 11 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dsc/tests/dsc.exit_code.tests.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

Describe 'exit code tests' {
It 'non-zero exit code in manifest has corresponding message' {
$result = dsc resource get -r Test/ExitCode --input "{ exitCode: 8 }" 2>&1
$result | Should -Match 'ERROR.*?[Exit code 8].*?manifest description: Placeholder from manifest for exit code 8'
dsc resource get -r Test/ExitCode --input "{ exitCode: 8 }" 2> $TestDrive/tracing.txt
"$TestDrive/tracing.txt" | Should -FileContentMatchExactly 'Placeholder from manifest for exit code 8'
}
It 'non-zero exit code not in manifest has generic message' {
$result = dsc resource get -r Test/ExitCode --input "{ exitCode: 1 }" 2>&1
$result | Should -Match 'ERROR.*?Error.*?[Exit code 1]'
dsc resource get -r Test/ExitCode --input "{ exitCode: 1 }" 2> $TestDrive/tracing.txt
"$TestDrive/tracing.txt" | Should -FileContentMatchExactly 'Exit code 1'
}
It 'success exit code executes without error' {
$result = dsc resource get -r Test/ExitCode --input "{ exitCode: 0 }" | ConvertFrom-Json
Expand Down
1 change: 1 addition & 0 deletions dsc_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde_yaml = { version = "0.9.3" }
thiserror = "1.0"
security_context_lib = { path = "../security_context_lib" }
semver = "1.0"
tokio = { version = "1.38.1", features = ["full"] }
tracing = "0.1.37"
tracing-indicatif = { version = "0.3.6" }
tree-sitter = "0.22"
Expand Down
190 changes: 123 additions & 67 deletions dsc_lib/src/dscresources/command_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

use jsonschema::JSONSchema;
use serde_json::Value;
use std::{collections::HashMap, env, io::{Read, Write}, process::{Command, Stdio}};
use std::{collections::HashMap, env, process::Stdio};
use crate::{configure::{config_doc::ExecutionKind, {config_result::ResourceGetResult, parameters, Configurator}}, util::parse_input_to_json};
use crate::dscerror::DscError;
use super::{dscresource::get_diff, invoke_result::{ExportResult, GetResult, ResolveResult, SetResult, TestResult, ValidateResult, ResourceGetResponse, ResourceSetResponse, ResourceTestResponse, get_in_desired_state}, resource_manifest::{ArgKind, InputKind, Kind, ResourceManifest, ReturnKind, SchemaKind}};
use tracing::{error, warn, info, debug, trace};
use tokio::{io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command};

pub const EXIT_PROCESS_TERMINATED: i32 = 0x102;

Expand Down Expand Up @@ -553,21 +554,27 @@ pub fn invoke_resolve(resource: &ResourceManifest, cwd: &str, input: &str) -> Re
Ok(result)
}

/// Invoke a command and return the exit code, stdout, and stderr.
/// Asynchronously invoke a command and return the exit code, stdout, and stderr.
///
/// # Arguments
///
/// * `executable` - The command to execute
/// * `args` - Optional arguments to pass to the command
/// * `input` - Optional input to pass to the command
/// * `cwd` - Optional working directory to execute the command in
/// * `env` - Optional environment variable mappings to add or update
/// * `exit_codes` - Optional descriptions of exit codes
///
/// # Errors
///
/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened.
#[allow(clippy::implicit_hasher)]
pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
debug!("Invoking command '{}' with args {:?}", executable, args);
///
async fn run_process_async(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
anmenaga marked this conversation as resolved.
Show resolved Hide resolved

// use somewhat large initial buffer to avoid early string reallocations;
// the value is based on list result of largest of built-in adapters - WMI adapter ~500KB
const INITIAL_BUFFER_CAPACITY: usize = 1024*1024;

let mut command = Command::new(executable);
if input.is_some() {
command.stdin(Stdio::piped());
Expand All @@ -583,62 +590,110 @@ pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option
if let Some(env) = env {
command.envs(env);
}

if executable == "dsc" && env::var("DEBUG_DSC").is_ok() {
// remove this env var from child process as it will fail reading from keyboard to allow attaching
command.env_remove("DEBUG_DSC");
}

let mut child = command.spawn()?;
let mut child = match command.spawn() {
Ok(c) => c,
Err(e) => {
return Err(DscError::CommandOperation(e.to_string(), executable.to_string()))
}
};

let stdout = child.stdout.take().expect("child did not have a handle to stdout");
let stderr = child.stderr.take().expect("child did not have a handle to stderr");
let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();

if let Some(input) = input {
trace!("Writing to command STDIN: {input}");
// pipe to child stdin in a scope so that it is dropped before we wait
// otherwise the pipe isn't closed and the child process waits forever
let Some(mut child_stdin) = child.stdin.take() else {
return Err(DscError::CommandOperation("Failed to open stdin".to_string(), executable.to_string()));
};
child_stdin.write_all(input.as_bytes())?;
child_stdin.flush()?;
let mut stdin = child.stdin.take().expect("child did not have a handle to stdin");
stdin.write_all(input.as_bytes()).await.expect("could not write to stdin");
drop(stdin);
}

let Some(mut child_stdout) = child.stdout.take() else {
return Err(DscError::CommandOperation("Failed to open stdout".to_string(), executable.to_string()));
let Some(child_id) = child.id() else {
return Err(DscError::CommandOperation("Can't get child process id".to_string(), executable.to_string()));
};
let mut stdout_buf = Vec::new();
child_stdout.read_to_end(&mut stdout_buf)?;

let Some(mut child_stderr) = child.stderr.take() else {
return Err(DscError::CommandOperation("Failed to open stderr".to_string(), executable.to_string()));
};
let mut stderr_buf = Vec::new();
child_stderr.read_to_end(&mut stderr_buf)?;

let exit_status = child.wait()?;
let exit_code = exit_status.code().unwrap_or(EXIT_PROCESS_TERMINATED);
let stdout = String::from_utf8_lossy(&stdout_buf).to_string();
let stderr = String::from_utf8_lossy(&stderr_buf).to_string();
if !stdout.is_empty() {
trace!("STDOUT returned: {}", &stdout);
}
let cleaned_stderr = if stderr.is_empty() {
stderr
} else {
trace!("STDERR returned data to be traced");
log_resource_traces(executable, &child.id(), &stderr);
// TODO: remove logged traces from STDERR
String::new()
};
let child_task = tokio::spawn(async move {
child.wait().await
});

if exit_code != 0 {
if let Some(exit_codes) = exit_codes {
if let Some(error_message) = exit_codes.get(&exit_code) {
return Err(DscError::CommandExitFromManifest(executable.to_string(), exit_code, error_message.to_string()));
let stdout_task = tokio::spawn(async move {
let mut stdout_result = String::with_capacity(INITIAL_BUFFER_CAPACITY);
while let Ok(Some(line)) = stdout_reader.next_line().await {
stdout_result.push_str(&line);
stdout_result.push('\n');
}
stdout_result
});

let stderr_task = tokio::spawn(async move {
let mut filtered_stderr = String::with_capacity(INITIAL_BUFFER_CAPACITY);
while let Ok(Some(stderr_line)) = stderr_reader.next_line().await {
let filtered_stderr_line = log_stderr_line(&child_id, &stderr_line);
if !filtered_stderr_line.is_empty() {
filtered_stderr.push_str(filtered_stderr_line);
filtered_stderr.push('\n');
}
}
filtered_stderr
});

let exit_code = child_task.await.unwrap()?.code();
let stdout_result = stdout_task.await.unwrap();
let stderr_result = stderr_task.await.unwrap();

if let Some(code) = exit_code {
debug!("Process '{executable}' id {child_id} exited with code {code}");

if code != 0 {
if let Some(exit_codes) = exit_codes {
if let Some(error_message) = exit_codes.get(&code) {
return Err(DscError::CommandExitFromManifest(executable.to_string(), code, error_message.to_string()));
}
}
return Err(DscError::Command(executable.to_string(), code, stderr_result));
}
return Err(DscError::Command(executable.to_string(), exit_code, cleaned_stderr));

Ok((code, stdout_result, stderr_result))
} else {
debug!("Process '{executable}' id {child_id} terminated by signal");
Err(DscError::CommandOperation("Process terminated by signal".to_string(), executable.to_string()))
}
}

/// Invoke a command and return the exit code, stdout, and stderr.
///
/// # Arguments
///
/// * `executable` - The command to execute
/// * `args` - Optional arguments to pass to the command
/// * `input` - Optional input to pass to the command
/// * `cwd` - Optional working directory to execute the command in
/// * `env` - Optional environment variable mappings to add or update
/// * `exit_codes` - Optional descriptions of exit codes
///
/// # Errors
///
/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened.
///
/// # Panics
///
/// Will panic if tokio runtime can't be created.
///
#[allow(clippy::implicit_hasher)]
pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
debug!("Invoking command '{}' with args {:?}", executable, args);

Ok((exit_code, stdout, cleaned_stderr))
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(run_process_async(executable, args, input, cwd, env, exit_codes))
}

fn process_args(args: &Option<Vec<ArgKind>>, value: &str) -> Option<Vec<String>> {
Expand Down Expand Up @@ -784,30 +839,31 @@ fn json_to_hashmap(json: &str) -> Result<HashMap<String, String>, DscError> {
///
/// * `process_name` - The name of the process
/// * `process_id` - The ID of the process
/// * `stderr` - The stderr output from the process
pub fn log_resource_traces(process_name: &str, process_id: &u32, stderr: &str)
/// * `trace_line` - The stderr line from the process
pub fn log_stderr_line<'a>(process_id: &u32, trace_line: &'a str) -> &'a str
{
if !stderr.is_empty()
if !trace_line.is_empty()
{
for trace_line in stderr.lines() {
if let Result::Ok(json_obj) = serde_json::from_str::<Value>(trace_line) {
if let Some(msg) = json_obj.get("Error") {
error!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Warning") {
warn!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Info") {
info!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Debug") {
debug!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Trace") {
trace!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
} else {
// TODO: deserialize tracing JSON to have better presentation
trace!("Process '{process_name}' id {process_id} : {trace_line}");
};
if let Result::Ok(json_obj) = serde_json::from_str::<Value>(trace_line) {
if let Some(msg) = json_obj.get("Error") {
error!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Warning") {
warn!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Info") {
info!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Debug") {
debug!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
} else if let Some(msg) = json_obj.get("Trace") {
trace!("Process id {process_id} : {}", msg.as_str().unwrap_or_default());
} else {
trace!("Process '{process_name}' id {process_id} : {trace_line}");
}
// the line is a valid json, but not one of standard trace lines - return it as filtered stderr_line
return trace_line;
};
} else {
// the line is not a valid json - return it as filtered stderr_line
return trace_line;
}
}
};

""
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ class TestClassResource : BaseTestClass
static [TestClassResource[]] Export()
{
$resultList = [List[TestClassResource]]::new()
1..5 | %{
$resultCount = 5
if ($env:TestClassResourceResultCount) {
$resultCount = $env:TestClassResourceResultCount
}
1..$resultCount | %{
$obj = New-Object TestClassResource
$obj.Name = "Object$_"
$obj.Prop1 = "Property of object$_"
Expand Down
11 changes: 11 additions & 0 deletions powershell-adapter/Tests/powershellgroup.resource.tests.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,15 @@ Describe 'PowerShell adapter resource tests' {
$env:PATH = $oldPath
}
}

It 'Dsc can process large resource output' -Tag z1{
$env:TestClassResourceResultCount = 5000 # with sync resource invocations this was not possible

$r = dsc resource export -r TestClassResource/TestClassResource
$LASTEXITCODE | Should -Be 0
$res = $r | ConvertFrom-Json
$res.resources[0].properties.result.count | Should -Be 5000

$env:TestClassResourceResultCount = $null
}
}
Loading