Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permit conversion into and creation from parts. #2

Merged
merged 3 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Permit conversion into and creation from "parts"
[#2](https://github.com/mxinden/asynchronous-codec/pull/2).

## [0.5.0] - 2021-01-06

### Changed

- Update to `bytes` `v1` and `pin-project-lite` `v0.2`.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "asynchronous-codec"
edition = "2018"
version = "0.5.0"
version = "0.6.0"
authors = ["Max Inden <[email protected]>"]
description = "Utilities for encoding and decoding frames using `async/await`"
license = "MIT"
Expand Down
82 changes: 71 additions & 11 deletions src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pin_project! {
/// let bytes = Bytes::from("Hello world!");
/// framed.send(bytes).await?;
///
/// // Release the I/O and codec
/// let (cur, _) = framed.release();
/// // Drop down to the underlying I/O stream.
/// let cur = framed.into_inner();
/// assert_eq!(cur.get_ref(), b"Hello world!");
/// # Ok::<_, std::io::Error>(())
/// # }).unwrap();
Expand Down Expand Up @@ -67,23 +67,50 @@ where
/// A codec is a type which implements `Decoder` and `Encoder`.
pub fn new(inner: T, codec: U) -> Self {
Self {
inner: framed_read_2(framed_write_2(Fuse::new(inner, codec))),
inner: framed_read_2(framed_write_2(Fuse::new(inner, codec), None), None),
}
}

/// Release the I/O and Codec
pub fn release(self) -> (T, U) {
let fuse = self.inner.release().release();
(fuse.t, fuse.u)
/// Creates a new `Framed` from [`FramedParts`].
///
/// See also [`Framed::into_parts`].
pub fn from_parts(
FramedParts {
io,
codec,
write_buffer,
read_buffer,
..
}: FramedParts<T, U>,
) -> Self {
let framed_write = framed_write_2(Fuse::new(io, codec), Some(write_buffer));
let framed_read = framed_read_2(framed_write, Some(read_buffer));
Self { inner: framed_read }
}

/// Consumes the `Framed`, returning its parts, such that a new
/// `Framed` may be constructed, possibly with a different codec.
///
/// See also [`Framed::from_parts`].
pub fn into_parts(self) -> FramedParts<T, U> {
let (framed_write, read_buffer) = self.inner.into_parts();
let (fuse, write_buffer) = framed_write.into_parts();
FramedParts {
io: fuse.t,
codec: fuse.u,
read_buffer,
write_buffer,
_priv: (),
}
}

/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
/// Note that data that has already been read or written but not yet
/// consumed by the decoder or flushed, respectively, is dropped.
/// To retain any such potentially buffered data, use [`Framed::into_parts()`].
pub fn into_inner(self) -> T {
self.release().0
self.into_parts().io
}

/// Returns a reference to the underlying codec wrapped by
Expand Down Expand Up @@ -142,3 +169,36 @@ where
self.project().inner.poll_close(cx)
}
}

/// The parts obtained from [`Framed::into_parts`].
pub struct FramedParts<T, U> {
/// The underlying I/O stream.
pub io: T,
/// The codec used for encoding and decoding frames.
pub codec: U,
/// The remaining read buffer, containing data that has been
/// read from `io` but not yet consumed by the codec's decoder.
pub read_buffer: BytesMut,
/// The remaining write buffer, containing framed data that has been
/// buffered but not yet flushed to `io`.
pub write_buffer: BytesMut,
/// Keep the constructor private.
_priv: (),
}

impl<T, U> FramedParts<T, U> {
/// Changes the codec used in this `FramedParts`.
pub fn map_codec<V, F>(self, f: F) -> FramedParts<T, V>
where
V: Encoder + Decoder,
F: FnOnce(U) -> V,
{
FramedParts {
io: self.io,
codec: f(self.codec),
read_buffer: self.read_buffer,
write_buffer: self.write_buffer,
_priv: (),
}
}
}
79 changes: 66 additions & 13 deletions src/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,47 @@ where
/// Creates a new `FramedRead` transport with the given `Decoder`.
pub fn new(inner: T, decoder: D) -> Self {
Self {
inner: framed_read_2(Fuse::new(inner, decoder)),
inner: framed_read_2(Fuse::new(inner, decoder), None),
}
}

/// Release the I/O and Decoder
pub fn release(self) -> (T, D) {
let fuse = self.inner.release();
(fuse.t, fuse.u)
/// Creates a new `FramedRead` from [`FramedReadParts`].
///
/// See also [`FramedRead::into_parts`].
pub fn from_parts(
FramedReadParts {
io,
decoder,
buffer,
..
}: FramedReadParts<T, D>,
) -> Self {
Self {
inner: framed_read_2(Fuse::new(io, decoder), Some(buffer)),
}
}

/// Consumes the `FramedRead`, returning its parts such that a
/// new `FramedRead` may be constructed, possibly with a different decoder.
///
/// See also [`FramedRead::from_parts`].
pub fn into_parts(self) -> FramedReadParts<T, D> {
let (fuse, buffer) = self.inner.into_parts();
FramedReadParts {
io: fuse.t,
decoder: fuse.u,
buffer,
_priv: (),
}
}

/// Consumes the `FramedRead`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
/// Note that data that has already been read but not yet consumed
/// by the decoder is dropped. To retain any such potentially
/// buffered data, use [`FramedRead::into_parts()`].
pub fn into_inner(self) -> T {
self.release().0
self.into_parts().io
}

/// Returns a reference to the underlying decoder.
Expand Down Expand Up @@ -136,10 +160,10 @@ impl<T> DerefMut for FramedRead2<T> {

const INITIAL_CAPACITY: usize = 8 * 1024;

pub fn framed_read_2<T>(inner: T) -> FramedRead2<T> {
pub fn framed_read_2<T>(inner: T, buffer: Option<BytesMut>) -> FramedRead2<T> {
FramedRead2 {
inner,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(INITIAL_CAPACITY)),
}
}

Expand Down Expand Up @@ -210,11 +234,40 @@ where
}

impl<T> FramedRead2<T> {
pub fn release(self) -> T {
self.inner
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}

pub fn buffer(&self) -> &BytesMut {
&self.buffer
}
}

/// The parts obtained from (FramedRead::into_parts).
pub struct FramedReadParts<T, D> {
/// The underlying I/O stream.
pub io: T,
/// The frame decoder.
pub decoder: D,
/// The buffer of data that has been read from `io` but not
/// yet consumed by `decoder`.
pub buffer: BytesMut,
/// Keep the constructor private.
_priv: (),
}

impl<T, D> FramedReadParts<T, D> {
/// Changes the decoder in `FramedReadParts`.
pub fn map_decoder<E, F>(self, f: F) -> FramedReadParts<T, E>
where
E: Decoder,
F: FnOnce(D) -> E,
{
FramedReadParts {
io: self.io,
decoder: f(self.decoder),
buffer: self.buffer,
_priv: (),
}
}
}
78 changes: 65 additions & 13 deletions src/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,23 @@ where
/// Creates a new `FramedWrite` transport with the given `Encoder`.
pub fn new(inner: T, encoder: E) -> Self {
Self {
inner: framed_write_2(Fuse::new(inner, encoder)),
inner: framed_write_2(Fuse::new(inner, encoder), None),
}
}

/// Creates a new `FramedWrite` from [`FramedWriteParts`].
///
/// See also [`FramedWrite::into_parts`].
pub fn from_parts(
FramedWriteParts {
io,
encoder,
buffer,
..
}: FramedWriteParts<T, E>,
) -> Self {
Self {
inner: framed_write_2(Fuse::new(io, encoder), Some(buffer)),
}
}

Expand Down Expand Up @@ -84,19 +100,27 @@ where
self.inner.high_water_mark = hwm;
}

/// Release the I/O and Encoder
pub fn release(self) -> (T, E) {
let fuse = self.inner.release();
(fuse.t, fuse.u)
/// Consumes the `FramedWrite`, returning its parts such that
/// a new `FramedWrite` may be constructed, possibly with a different encoder.
///
/// See also [`FramedWrite::from_parts`].
pub fn into_parts(self) -> FramedWriteParts<T, E> {
let (fuse, buffer) = self.inner.into_parts();
FramedWriteParts {
io: fuse.t,
encoder: fuse.u,
buffer,
_priv: (),
}
}

/// Consumes the `FramedWrite`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
/// Note that data that has already been written but not yet flushed
/// is dropped. To retain any such potentially buffered data, use
/// [`FramedWrite::into_parts()`].
pub fn into_inner(self) -> T {
self.release().0
self.into_parts().io
}

/// Returns a reference to the underlying encoder.
Expand Down Expand Up @@ -179,11 +203,11 @@ impl<T> DerefMut for FramedWrite2<T> {
// TCP send buffer size (SO_SNDBUF)
const DEFAULT_SEND_HIGH_WATER_MARK: usize = 131072;

pub fn framed_write_2<T>(inner: T) -> FramedWrite2<T> {
pub fn framed_write_2<T>(inner: T, buffer: Option<BytesMut>) -> FramedWrite2<T> {
FramedWrite2 {
inner,
high_water_mark: DEFAULT_SEND_HIGH_WATER_MARK,
buffer: BytesMut::with_capacity(1028 * 8),
buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(1028 * 8)),
}
}

Expand Down Expand Up @@ -243,11 +267,39 @@ where
}

impl<T> FramedWrite2<T> {
pub fn release(self) -> T {
self.inner
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}
}

fn err_eof() -> Error {
Error::new(ErrorKind::UnexpectedEof, "End of file")
}

/// The parts obtained from [`FramedWrite::into_parts`].
pub struct FramedWriteParts<T, E> {
/// The underlying I/O stream.
pub io: T,
/// The frame encoder.
pub encoder: E,
/// The framed data that has been buffered but not yet flushed to `io`.
pub buffer: BytesMut,
/// Keep the constructor private.
_priv: (),
}

impl<T, E> FramedWriteParts<T, E> {
/// Changes the encoder used in `FramedWriteParts`.
pub fn map_encoder<G, F>(self, f: F) -> FramedWriteParts<T, G>
where
G: Encoder,
F: FnOnce(E) -> G,
{
FramedWriteParts {
io: self.io,
encoder: f(self.encoder),
buffer: self.buffer,
_priv: (),
}
}
}
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ mod encoder;
pub use encoder::Encoder;

mod framed;
pub use framed::Framed;
pub use framed::{Framed, FramedParts};

mod framed_read;
pub use framed_read::FramedRead;
pub use framed_read::{FramedRead, FramedReadParts};

mod framed_write;
pub use framed_write::FramedWrite;
pub use framed_write::{FramedWrite, FramedWriteParts};

mod fuse;
Loading