diff --git a/docs/reference/components/sources/exec.cue b/docs/reference/components/sources/exec.cue
index 0bd488ebf73a8..2db85c7b368ea 100644
--- a/docs/reference/components/sources/exec.cue
+++ b/docs/reference/components/sources/exec.cue
@@ -54,7 +54,7 @@ components: sources: exec: {
 				}
 			}
 		}
-		current_dir: {
+		working_directory: {
 			common:      false
 			required:    false
 			description: "The directory in which to run the command."
@@ -76,7 +76,7 @@ components: sources: exec: {
 			required: false
 			type: bool: default: true
 		}
-		maximum_buffer_size: {
+		maximum_buffer_size_bytes: {
 			common:      false
 			description: "The maximum buffer size allowed before a log event will be generated."
 			required:    false
@@ -125,7 +125,7 @@ components: sources: exec: {
 			required: false
 			warnings: []
 			type: uint: {
-				default: 60
+				default: 5
 				unit:    "seconds"
 			}
 		}
diff --git a/src/sources/exec.rs b/src/sources/exec.rs
index 02b8a761b6ee1..6f25bc4b1461b 100644
--- a/src/sources/exec.rs
+++ b/src/sources/exec.rs
@@ -29,13 +29,13 @@ pub struct ExecConfig {
     #[serde(flatten)]
     pub mode: Mode,
     pub command: Vec<String>,
-    pub current_dir: Option<PathBuf>,
+    pub working_directory: Option<PathBuf>,
     #[serde(default = "default_include_stderr")]
     pub include_stderr: bool,
     #[serde(default = "default_events_per_line")]
     pub event_per_line: bool,
     #[serde(default = "default_maximum_buffer_size")]
-    pub maximum_buffer_size: u64,
+    pub maximum_buffer_size_bytes: u64,
 }
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
@@ -68,10 +68,10 @@ impl Default for ExecConfig {
                 exec_interval_secs: default_exec_interval_secs(),
             },
             command: vec!["echo".to_owned(), "Hello World!".to_owned()],
-            current_dir: None,
+            working_directory: None,
             include_stderr: default_include_stderr(),
             event_per_line: default_events_per_line(),
-            maximum_buffer_size: default_maximum_buffer_size(),
+            maximum_buffer_size_bytes: default_maximum_buffer_size(),
         }
     }
 }
@@ -86,7 +86,7 @@ fn default_exec_interval_secs() -> u64 {
 }
 
 fn default_respawn_interval_secs() -> u64 {
-    60
+    5
 }
 
 fn default_respawn_on_exit() -> bool {
@@ -124,7 +124,7 @@ impl ExecConfig {
     pub(self) fn validate(&self) -> Result<(), ExecConfigError> {
         if self.command.is_empty() {
             Err(ExecConfigError::CommandEmpty)
-        } else if self.maximum_buffer_size == 0 {
+        } else if self.maximum_buffer_size_bytes == 0 {
             Err(ExecConfigError::ZeroBuffer)
         } else {
             Ok(())
         }
     }
@@ -183,7 +183,7 @@ pub fn run_scheduled(
     out: Pipeline,
 ) -> crate::Result<crate::sources::Source> {
     Ok(Box::pin(async move {
-        info!("Starting scheduled exec runs.");
+        debug!("Starting scheduled exec runs.");
         let schedule = Duration::from_secs(exec_interval_secs);
 
         let mut interval =
@@ -225,7 +225,7 @@ pub fn run_scheduled(
             }
         }
 
-        info!("Finished scheduled exec runs.");
+        debug!("Finished scheduled exec runs.");
         Ok(())
     }))
 }
@@ -285,7 +285,7 @@ async fn run_command(
     shutdown: ShutdownSignal,
     mut out: Pipeline,
 ) -> Result<Option<ExitStatus>, Error> {
-    info!("Starting command run.");
+    debug!("Starting command run.");
     let mut command = build_command(&config);
 
     // Mark the start time just before spawning the process as
@@ -320,7 +320,7 @@ async fn run_command(
         stderr_reader,
         shutdown.clone(),
         config.event_per_line,
-        config.maximum_buffer_size,
+        config.maximum_buffer_size_bytes,
         STDERR,
         sender.clone(),
     );
@@ -330,7 +330,7 @@ async fn run_command(
         stdout_reader,
         shutdown.clone(),
        config.event_per_line,
-        config.maximum_buffer_size,
+        config.maximum_buffer_size_bytes,
         STDOUT,
         sender,
     );
@@ -356,7 +356,7 @@ async fn run_command(
 
     let elapsed = start.elapsed();
 
-    info!("Finished command run.");
+    debug!("Finished command run.");
     let _ = out.flush().await;
 
     match child.try_wait() {
@@ -426,7 +426,7 @@ fn build_command(config: &ExecConfig) -> Command {
     command.kill_on_drop(true);
 
     // Explicitly set the current dir if needed
-    if let Some(current_dir) = &config.current_dir {
+    if let Some(current_dir) = &config.working_directory {
         command.current_dir(current_dir);
     }
 
@@ -502,7 +502,7 @@ fn spawn_reader_thread(
 ) {
     // Start the green background thread for collecting
     Box::pin(tokio::spawn(async move {
-        info!("Start capturing {} command output.", stream);
+        debug!("Start capturing {} command output.", stream);
 
         let mut read_buffer: Vec<u8> = Vec::new();
 
@@ -515,48 +515,48 @@ fn spawn_reader_thread(
             while let Ok(bytes_read) = limited_reader.read_until(b'\n', &mut read_buffer).await {
                 if bytes_read == 0 {
                     // If we get a continuous stream of \n the bytes_read will be at least 1
-                    info!("End of input reached, stop reading.");
+                    debug!("End of input reached, stop reading.");
                     break;
-                } else {
-                    // Strip of the end of line bytes
-                    if read_buffer.ends_with(&[b'\n']) {
-                        let _ = read_buffer.pop();
+                }
 
-                        if read_buffer.ends_with(&[b'\r']) {
-                            let _ = read_buffer.pop();
-                        }
-                    }
+                // Strip of the end of line bytes
+                if read_buffer.ends_with(&[b'\n']) {
+                    let _ = read_buffer.pop();
 
-                    let read_bytes = Bytes::from(read_buffer.clone());
-                    if sender.send((read_bytes, stream)).await.is_err() {
-                        break;
+                    if read_buffer.ends_with(&[b'\r']) {
+                        let _ = read_buffer.pop();
                     }
+                }
 
-                    // Clear the read buffer ready for the next read
-                    read_buffer.clear();
-
-                    // Reset the limit for the next read (minus any left over if we do utf8 checks)
-                    limited_reader.set_limit(buf_size);
+                let read_bytes = Bytes::from(read_buffer.clone());
+                if sender.send((read_bytes, stream)).await.is_err() {
+                    break;
                 }
+
+                // Clear the read buffer ready for the next read
+                read_buffer.clear();
+
+                // Reset the limit for the next read (minus any left over if we do utf8 checks)
+                limited_reader.set_limit(buf_size);
             }
         } else {
             // Keep reading max buffer chunks
             while let Ok(bytes_read) = limited_reader.read_to_end(&mut read_buffer).await {
                 if bytes_read == 0 {
-                    info!("End of input reached, stop reading.");
+                    debug!("End of input reached, stop reading.");
                     break;
-                } else {
-                    let read_bytes = Bytes::from(read_buffer.clone());
-                    if sender.send((read_bytes, stream)).await.is_err() {
-                        break;
-                    }
-
-                    // Clear the read buffer ready for the next read
-                    read_buffer.clear();
+                }
 
-                    // Reset the limit for the next read (minus any left over if we do utf8 checks)
-                    limited_reader.set_limit(buf_size);
+                let read_bytes = Bytes::from(read_buffer.clone());
+                if sender.send((read_bytes, stream)).await.is_err() {
+                    break;
                 }
+
+                // Clear the read buffer ready for the next read
+                read_buffer.clear();
+
+                // Reset the limit for the next read (minus any left over if we do utf8 checks)
+                limited_reader.set_limit(buf_size);
             }
         }
 
@@ -566,7 +566,7 @@ fn spawn_reader_thread(
             let _ = sender.send((read_bytes, stream)).await.is_err();
         }
 
-        info!("Finished capturing {} command output.", stream);
+        debug!("Finished capturing {} command output.", stream);
     }));
 }
 
@@ -649,10 +649,10 @@ mod tests {
                 respawn_interval_secs: default_respawn_interval_secs(),
             },
             command: vec!["./runner".to_owned(), "arg1".to_owned(), "arg2".to_owned()],
-            current_dir: Some(PathBuf::from("/tmp")),
+            working_directory: Some(PathBuf::from("/tmp")),
             include_stderr: default_include_stderr(),
             event_per_line: default_events_per_line(),
-            maximum_buffer_size: default_maximum_buffer_size(),
+            maximum_buffer_size_bytes: default_maximum_buffer_size(),
         };
 
         let command = build_command(&config);
@@ -821,10 +821,10 @@ mod tests {
                 respawn_interval_secs: default_respawn_interval_secs(),
             },
             command: vec!["yes".to_owned()],
-            current_dir: None,
+            working_directory: None,
             include_stderr: default_include_stderr(),
             event_per_line: default_events_per_line(),
-            maximum_buffer_size: default_maximum_buffer_size(),
+            maximum_buffer_size_bytes: default_maximum_buffer_size(),
         }
     }
 }
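
Note (not part of the patch): a minimal sketch of how the renamed field surfaces through validate(). The test name is hypothetical and assumes it sits in the same `mod tests` block as the cases above, with the usual `use super::*;` in scope.

    #[test]
    fn validate_rejects_zero_maximum_buffer_size_bytes() {
        // `maximum_buffer_size_bytes` replaces the old `maximum_buffer_size`;
        // a zero-sized buffer should still be rejected by validate().
        let config = ExecConfig {
            maximum_buffer_size_bytes: 0,
            ..Default::default()
        };
        assert!(matches!(config.validate(), Err(ExecConfigError::ZeroBuffer)));
    }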