refactor(exec source) Address review comments (#19)
Signed-off-by: Stuart Broad <[email protected]>
moogstuart committed Apr 6, 2021
1 parent ecd4faf commit 5f65b4c
Showing 2 changed files with 50 additions and 50 deletions.
6 changes: 3 additions & 3 deletions docs/reference/components/sources/exec.cue
@@ -54,7 +54,7 @@ components: sources: exec: {
}
}
}
current_dir: {
working_directory: {
common: false
required: false
description: "The directory in which to run the command."
@@ -76,7 +76,7 @@ components: sources: exec: {
required: false
type: bool: default: true
}
maximum_buffer_size: {
maximum_buffer_size_bytes: {
common: false
description: "The maximum buffer size allowed before a log event will be generated."
required: false
@@ -125,7 +125,7 @@ components: sources: exec: {
required: false
warnings: []
type: uint: {
default: 60
default: 5
unit: "seconds"
}
}
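For illustration only (not part of the commit): the renamed options documented above map onto fields of `ExecConfig` in `src/sources/exec.rs` below. The following is a minimal sketch of how a config carrying the new names could deserialize, using a stand-in struct, a hypothetical default value, and the `toml` crate purely for the demonstration; the real struct and defaults live in exec.rs.

```rust
use std::path::PathBuf;

use serde::Deserialize;

// Stand-in for the real ExecConfig; only the renamed fields are mirrored here.
#[derive(Deserialize, Debug)]
struct ExecConfigSketch {
    command: Vec<String>,
    working_directory: Option<PathBuf>,
    #[serde(default = "default_maximum_buffer_size")]
    maximum_buffer_size_bytes: u64,
}

// Hypothetical default for the sketch; the actual value is defined in exec.rs.
fn default_maximum_buffer_size() -> u64 {
    100_000
}

fn main() {
    let raw = r#"
        command = ["echo", "Hello World!"]
        working_directory = "/tmp"
    "#;

    // maximum_buffer_size_bytes falls back to the default because it is omitted above.
    let config: ExecConfigSketch = toml::from_str(raw).expect("valid exec config");
    println!("{:?}", config);
}
```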
94 changes: 47 additions & 47 deletions src/sources/exec.rs
@@ -29,13 +29,13 @@ pub struct ExecConfig {
#[serde(flatten)]
pub mode: Mode,
pub command: Vec<String>,
pub current_dir: Option<PathBuf>,
pub working_directory: Option<PathBuf>,
#[serde(default = "default_include_stderr")]
pub include_stderr: bool,
#[serde(default = "default_events_per_line")]
pub event_per_line: bool,
#[serde(default = "default_maximum_buffer_size")]
pub maximum_buffer_size: u64,
pub maximum_buffer_size_bytes: u64,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
@@ -68,10 +68,10 @@ impl Default for ExecConfig {
exec_interval_secs: default_exec_interval_secs(),
},
command: vec!["echo".to_owned(), "Hello World!".to_owned()],
current_dir: None,
working_directory: None,
include_stderr: default_include_stderr(),
event_per_line: default_events_per_line(),
maximum_buffer_size: default_maximum_buffer_size(),
maximum_buffer_size_bytes: default_maximum_buffer_size(),
}
}
}
@@ -86,7 +86,7 @@ fn default_exec_interval_secs() -> u64 {
}

fn default_respawn_interval_secs() -> u64 {
60
5
}

fn default_respawn_on_exit() -> bool {
@@ -124,7 +124,7 @@ impl ExecConfig {
pub(self) fn validate(&self) -> Result<(), ExecConfigError> {
if self.command.is_empty() {
Err(ExecConfigError::CommandEmpty)
} else if self.maximum_buffer_size == 0 {
} else if self.maximum_buffer_size_bytes == 0 {
Err(ExecConfigError::ZeroBuffer)
} else {
Ok(())
@@ -183,7 +183,7 @@ pub fn run_scheduled(
out: Pipeline,
) -> crate::Result<super::Source> {
Ok(Box::pin(async move {
info!("Starting scheduled exec runs.");
debug!("Starting scheduled exec runs.");
let schedule = Duration::from_secs(exec_interval_secs);

let mut interval =
@@ -225,7 +225,7 @@ pub fn run_scheduled(
}
}

info!("Finished scheduled exec runs.");
debug!("Finished scheduled exec runs.");
Ok(())
}))
}
@@ -285,7 +285,7 @@ async fn run_command(
shutdown: ShutdownSignal,
mut out: Pipeline,
) -> Result<Option<ExitStatus>, Error> {
info!("Starting command run.");
debug!("Starting command run.");
let mut command = build_command(&config);

// Mark the start time just before spawning the process as
@@ -320,7 +320,7 @@ async fn run_command(
stderr_reader,
shutdown.clone(),
config.event_per_line,
config.maximum_buffer_size,
config.maximum_buffer_size_bytes,
STDERR,
sender.clone(),
);
@@ -330,7 +330,7 @@ async fn run_command(
stdout_reader,
shutdown.clone(),
config.event_per_line,
config.maximum_buffer_size,
config.maximum_buffer_size_bytes,
STDOUT,
sender,
);
@@ -356,7 +356,7 @@ async fn run_command(

let elapsed = start.elapsed();

info!("Finished command run.");
debug!("Finished command run.");
let _ = out.flush().await;

match child.try_wait() {
@@ -426,7 +426,7 @@ fn build_command(config: &ExecConfig) -> Command {
command.kill_on_drop(true);

// Explicitly set the current dir if needed
if let Some(current_dir) = &config.current_dir {
if let Some(current_dir) = &config.working_directory {
command.current_dir(current_dir);
}

@@ -502,7 +502,7 @@ fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
) {
// Start the green background thread for collecting
Box::pin(tokio::spawn(async move {
info!("Start capturing {} command output.", stream);
debug!("Start capturing {} command output.", stream);

let mut read_buffer: Vec<u8> = Vec::new();

@@ -515,48 +515,48 @@
while let Ok(bytes_read) = limited_reader.read_until(b'\n', &mut read_buffer).await {
if bytes_read == 0 {
// If we get a continuous stream of \n the bytes_read will be at least 1
info!("End of input reached, stop reading.");
debug!("End of input reached, stop reading.");
break;
} else {
// Strip of the end of line bytes
if read_buffer.ends_with(&[b'\n']) {
let _ = read_buffer.pop();
}

if read_buffer.ends_with(&[b'\r']) {
let _ = read_buffer.pop();
}
}
// Strip of the end of line bytes
if read_buffer.ends_with(&[b'\n']) {
let _ = read_buffer.pop();

let read_bytes = Bytes::from(read_buffer.clone());
if sender.send((read_bytes, stream)).await.is_err() {
break;
if read_buffer.ends_with(&[b'\r']) {
let _ = read_buffer.pop();
}
}

// Clear the read buffer ready for the next read
read_buffer.clear();

// Reset the limit for the next read (minus any left over if we do utf8 checks)
limited_reader.set_limit(buf_size);
let read_bytes = Bytes::from(read_buffer.clone());
if sender.send((read_bytes, stream)).await.is_err() {
break;
}

// Clear the read buffer ready for the next read
read_buffer.clear();

// Reset the limit for the next read (minus any left over if we do utf8 checks)
limited_reader.set_limit(buf_size);
}
} else {
// Keep reading max buffer chunks
while let Ok(bytes_read) = limited_reader.read_to_end(&mut read_buffer).await {
if bytes_read == 0 {
info!("End of input reached, stop reading.");
debug!("End of input reached, stop reading.");
break;
} else {
let read_bytes = Bytes::from(read_buffer.clone());
if sender.send((read_bytes, stream)).await.is_err() {
break;
}

// Clear the read buffer ready for the next read
read_buffer.clear();
}

// Reset the limit for the next read (minus any left over if we do utf8 checks)
limited_reader.set_limit(buf_size);
let read_bytes = Bytes::from(read_buffer.clone());
if sender.send((read_bytes, stream)).await.is_err() {
break;
}

// Clear the read buffer ready for the next read
read_buffer.clear();

// Reset the limit for the next read (minus any left over if we do utf8 checks)
limited_reader.set_limit(buf_size);
}
}

@@ -566,7 +566,7 @@ fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
let _ = sender.send((read_bytes, stream)).await.is_err();
}

info!("Finished capturing {} command output.", stream);
debug!("Finished capturing {} command output.", stream);
}));
}

@@ -649,10 +649,10 @@ mod tests {
respawn_interval_secs: default_respawn_interval_secs(),
},
command: vec!["./runner".to_owned(), "arg1".to_owned(), "arg2".to_owned()],
current_dir: Some(PathBuf::from("/tmp")),
working_directory: Some(PathBuf::from("/tmp")),
include_stderr: default_include_stderr(),
event_per_line: default_events_per_line(),
maximum_buffer_size: default_maximum_buffer_size(),
maximum_buffer_size_bytes: default_maximum_buffer_size(),
};

let command = build_command(&config);
@@ -821,10 +821,10 @@ mod tests {
respawn_interval_secs: default_respawn_interval_secs(),
},
command: vec!["yes".to_owned()],
current_dir: None,
working_directory: None,
include_stderr: default_include_stderr(),
event_per_line: default_events_per_line(),
maximum_buffer_size: default_maximum_buffer_size(),
maximum_buffer_size_bytes: default_maximum_buffer_size(),
}
}
}
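As a standalone aside (not from the commit): the per-line branch in the reader above wraps the child's output in a byte-limited `take`, reads with `read_until`, strips trailing newline bytes, and resets the limit after every line. Below is a minimal sketch of that pattern using tokio, with an in-memory input and an illustrative buffer size standing in for `maximum_buffer_size_bytes`.

```rust
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};

#[tokio::main]
async fn main() {
    // Illustrative input and limit; in exec.rs the reader is the child's
    // stdout/stderr and the limit comes from maximum_buffer_size_bytes.
    let input: &[u8] = b"first line\nsecond line\r\n";
    let buf_size: u64 = 1024;

    // Cap each read at buf_size bytes, mirroring the limited reader in the diff.
    let mut limited_reader = BufReader::new(input).take(buf_size);
    let mut read_buffer: Vec<u8> = Vec::new();

    while let Ok(bytes_read) = limited_reader.read_until(b'\n', &mut read_buffer).await {
        if bytes_read == 0 {
            // End of input reached, stop reading.
            break;
        }

        // Strip the end-of-line bytes before handing the line on.
        if read_buffer.ends_with(&[b'\n']) {
            read_buffer.pop();
        }
        if read_buffer.ends_with(&[b'\r']) {
            read_buffer.pop();
        }

        println!("{}", String::from_utf8_lossy(&read_buffer));

        // Clear the buffer and reset the byte limit for the next line.
        read_buffer.clear();
        limited_reader.set_limit(buf_size);
    }
}
```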
