diff --git a/README.md b/README.md index 3dda552..f8b1e4a 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,24 @@ # Multiplexer / Demultiplexer for standard error and standard output The `muxeo` command combines a program's _stderr_ and _stdout_ streams into a -single stream of frames written to _stdout_. The frame has the following -structure: +single frame stream, followed by a frame containing the exit status code, all +written to _stdout_. Frames are coded as follows: ``` -+----------+--Frame---+---------+ -| kind: u8 | len: u32 | payload | -+----------+----------+---------+ ++--------------+----Err---+---------------+ +| kind: u8 = 0 | len: u32 | payload: [u8] | ++--------------+----------+---------------+ ++-----Exit Status Code-----+ +| kind: u8 = 1 | code: i32 | ++--------------+-----------+ ++--------------+----Out---+---------------+ +| kind: u8 = 2 | len: u32 | payload: [u8] | ++--------------+----------+---------------+ ``` The `demuxeo` command knows how to decode the stream of frames received as _stdin_ and then writes to both _stderr_ and _stdout_ depending on the frame -kind (err/out). +kind (err/out) and exits with the decoded status code. For example, the following command: diff --git a/src/bin/demuxeo.rs b/src/bin/demuxeo.rs index 16e3701..161e321 100644 --- a/src/bin/demuxeo.rs +++ b/src/bin/demuxeo.rs @@ -1,3 +1,5 @@ +use std::process; + use clap::Parser; use futures::TryStreamExt; use muxeo::{Frame, FrameKind, MAX}; @@ -21,10 +23,37 @@ impl Decoder for EoDecoder { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 1 + 4 { - // Not enough data to read kind and length marker. + // Not enough data to read kind marker and length marker / exit + // status code. return Ok(None); } + // Read kind marker. + let kind = match src[0] { + 0 => FrameKind::Err, + 1 => FrameKind::ExitStatusCode, + 2 => FrameKind::Out, + k => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid frame kind {}.", k), + )); + } + }; + + if let FrameKind::ExitStatusCode = kind { + // Read exit status code. + let mut code_bytes = [0; 4]; + code_bytes.copy_from_slice(&src[1..1 + 4]); + let code = i32::from_be_bytes(code_bytes); + + // Use advance to modify src such that it no longer contains this + // frame. + src.advance(1 + 4); + + return Ok(Some(Frame::ExitStatusCode(code))); + } + // Read length marker. let mut len_bytes = [0; 4]; len_bytes.copy_from_slice(&src[1..1 + 4]); @@ -51,18 +80,6 @@ impl Decoder for EoDecoder { return Ok(None); } - // Read kind marker. - let kind = match src[0] { - 0 => FrameKind::Err, - 1 => FrameKind::Out, - k => { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("Invalid frame kind {}.", k), - )); - } - }; - // Use advance and split_to to modify src such that it no longer // contains this frame. src.advance(1 + 4); @@ -71,6 +88,7 @@ impl Decoder for EoDecoder { Ok(Some(match kind { FrameKind::Err => Frame::Err(bytes), FrameKind::Out => Frame::Out(bytes), + _ => unreachable!(), })) } } @@ -88,6 +106,7 @@ async fn main() -> io::Result<()> { Frame::Err(mut bytes) => { stderr.write_all_buf(&mut bytes).await?; } + Frame::ExitStatusCode(code) => process::exit(code), Frame::Out(mut bytes) => { stdout.write_all_buf(&mut bytes).await?; } diff --git a/src/lib.rs b/src/lib.rs index 9ba5818..013d04a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ use tokio_util::bytes::Bytes; pub enum Frame { Err(Bytes), + ExitStatusCode(i32), Out(Bytes), } @@ -9,6 +10,7 @@ impl Frame { pub fn kind(&self) -> FrameKind { match self { Self::Err(_) => FrameKind::Err, + Self::ExitStatusCode(_) => FrameKind::ExitStatusCode, Self::Out(_) => FrameKind::Out, } } @@ -16,7 +18,8 @@ impl Frame { pub enum FrameKind { Err = 0, - Out = 1, + ExitStatusCode = 1, + Out = 2, } pub const MAX: usize = 8 * 1024 * 1024; diff --git a/src/main.rs b/src/main.rs index 6f64786..aab76e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,4 @@ -use std::{ - ffi::OsString, - process::{self, Stdio}, -}; +use std::{ffi::OsString, process::Stdio}; use clap::Parser; use futures::{SinkExt, TryStreamExt}; @@ -28,9 +25,21 @@ impl Encoder for EoEncoder { type Error = io::Error; fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { + if let Frame::ExitStatusCode(code) = item { + // Reserve space in the buffer. + dst.reserve(1 + 4); + + // Write the kind and exit status code to the buffer. + dst.put_u8(FrameKind::ExitStatusCode as u8); + dst.put_i32(code); + + return Ok(()); + } + let (kind, bytes) = match item { Frame::Err(bytes) => (FrameKind::Err, bytes), Frame::Out(bytes) => (FrameKind::Out, bytes), + _ => unreachable!(), }; let bytes_len = bytes.len(); @@ -69,15 +78,14 @@ async fn main() -> io::Result<()> { let stderr = ReaderStream::with_capacity(child.stderr.take().unwrap(), MAX).map_ok(Frame::Err); let stdout = ReaderStream::with_capacity(child.stdout.take().unwrap(), MAX).map_ok(Frame::Out); + let mut writer = FramedWrite::new(io::stdout(), EoEncoder); - FramedWrite::new(io::stdout(), EoEncoder) - .send_all(&mut stderr.merge(stdout)) - .await?; + writer.send_all(&mut stderr.merge(stdout)).await?; let status = child.wait().await?; if let Some(code) = status.code() { - process::exit(code); + writer.send(Frame::ExitStatusCode(code)).await?; } Ok(())