Skip to content

Commit

Permalink
add exit status code frame
Browse files Browse the repository at this point in the history
  • Loading branch information
serbanrobu committed May 7, 2024
1 parent 0984787 commit 37e25c7
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 28 deletions.
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
# Multiplexer / Demultiplexer for standard error and standard output

The `muxeo` command combines a program's _stderr_ and _stdout_ streams into a
single stream of frames written to _stdout_. The frame has the following
structure:
single frame stream, followed by a frame containing the exit status code, all
written to _stdout_. Frames are coded as follows:

```
+----------+--Frame---+---------+
| kind: u8 | len: u32 | payload |
+----------+----------+---------+
+--------------+----Err---+---------------+
| kind: u8 = 0 | len: u32 | payload: [u8] |
+--------------+----------+---------------+
+-----Exit Status Code-----+
| kind: u8 = 1 | code: i32 |
+--------------+-----------+
+--------------+----Out---+---------------+
| kind: u8 = 2 | len: u32 | payload: [u8] |
+--------------+----------+---------------+
```

The `demuxeo` command knows how to decode the stream of frames received as
_stdin_ and then writes to both _stderr_ and _stdout_ depending on the frame
kind (err/out).
kind (err/out) and exits with the decoded status code.

For example, the following command:

Expand Down
45 changes: 32 additions & 13 deletions src/bin/demuxeo.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::process;

use clap::Parser;
use futures::TryStreamExt;
use muxeo::{Frame, FrameKind, MAX};
Expand All @@ -21,10 +23,37 @@ impl Decoder for EoDecoder {

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 1 + 4 {
// Not enough data to read kind and length marker.
// Not enough data to read kind marker and length marker / exit
// status code.
return Ok(None);
}

// Read kind marker.
let kind = match src[0] {
0 => FrameKind::Err,
1 => FrameKind::ExitStatusCode,
2 => FrameKind::Out,
k => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid frame kind {}.", k),
));
}
};

if let FrameKind::ExitStatusCode = kind {
// Read exit status code.
let mut code_bytes = [0; 4];
code_bytes.copy_from_slice(&src[1..1 + 4]);
let code = i32::from_be_bytes(code_bytes);

// Use advance to modify src such that it no longer contains this
// frame.
src.advance(1 + 4);

return Ok(Some(Frame::ExitStatusCode(code)));
}

// Read length marker.
let mut len_bytes = [0; 4];
len_bytes.copy_from_slice(&src[1..1 + 4]);
Expand All @@ -51,18 +80,6 @@ impl Decoder for EoDecoder {
return Ok(None);
}

// Read kind marker.
let kind = match src[0] {
0 => FrameKind::Err,
1 => FrameKind::Out,
k => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid frame kind {}.", k),
));
}
};

// Use advance and split_to to modify src such that it no longer
// contains this frame.
src.advance(1 + 4);
Expand All @@ -71,6 +88,7 @@ impl Decoder for EoDecoder {
Ok(Some(match kind {
FrameKind::Err => Frame::Err(bytes),
FrameKind::Out => Frame::Out(bytes),
_ => unreachable!(),
}))
}
}
Expand All @@ -88,6 +106,7 @@ async fn main() -> io::Result<()> {
Frame::Err(mut bytes) => {
stderr.write_all_buf(&mut bytes).await?;
}
Frame::ExitStatusCode(code) => process::exit(code),
Frame::Out(mut bytes) => {
stdout.write_all_buf(&mut bytes).await?;
}
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ use tokio_util::bytes::Bytes;

pub enum Frame {
Err(Bytes),
ExitStatusCode(i32),
Out(Bytes),
}

impl Frame {
pub fn kind(&self) -> FrameKind {
match self {
Self::Err(_) => FrameKind::Err,
Self::ExitStatusCode(_) => FrameKind::ExitStatusCode,
Self::Out(_) => FrameKind::Out,
}
}
}

pub enum FrameKind {
Err = 0,
Out = 1,
ExitStatusCode = 1,
Out = 2,
}

pub const MAX: usize = 8 * 1024 * 1024;
24 changes: 16 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
ffi::OsString,
process::{self, Stdio},
};
use std::{ffi::OsString, process::Stdio};

use clap::Parser;
use futures::{SinkExt, TryStreamExt};
Expand All @@ -28,9 +25,21 @@ impl Encoder<Frame> for EoEncoder {
type Error = io::Error;

fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
if let Frame::ExitStatusCode(code) = item {
// Reserve space in the buffer.
dst.reserve(1 + 4);

// Write the kind and exit status code to the buffer.
dst.put_u8(FrameKind::ExitStatusCode as u8);
dst.put_i32(code);

return Ok(());
}

let (kind, bytes) = match item {
Frame::Err(bytes) => (FrameKind::Err, bytes),
Frame::Out(bytes) => (FrameKind::Out, bytes),
_ => unreachable!(),
};

let bytes_len = bytes.len();
Expand Down Expand Up @@ -69,15 +78,14 @@ async fn main() -> io::Result<()> {

let stderr = ReaderStream::with_capacity(child.stderr.take().unwrap(), MAX).map_ok(Frame::Err);
let stdout = ReaderStream::with_capacity(child.stdout.take().unwrap(), MAX).map_ok(Frame::Out);
let mut writer = FramedWrite::new(io::stdout(), EoEncoder);

FramedWrite::new(io::stdout(), EoEncoder)
.send_all(&mut stderr.merge(stdout))
.await?;
writer.send_all(&mut stderr.merge(stdout)).await?;

let status = child.wait().await?;

if let Some(code) = status.code() {
process::exit(code);
writer.send(Frame::ExitStatusCode(code)).await?;
}

Ok(())
Expand Down

0 comments on commit 37e25c7

Please sign in to comment.