From 43dde9499faf2b9bd3b6c9efa8e945e7df4da413 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 10 Nov 2020 17:08:12 +0100 Subject: [PATCH 1/2] Permit conversion into and creation from parts. --- src/framed.rs | 76 +++++++++++++++++++++++++++++++++------ src/framed_read.rs | 74 +++++++++++++++++++++++++++++++------- src/framed_write.rs | 73 ++++++++++++++++++++++++++++++------- src/lib.rs | 6 ++-- tests/framed_write.rs | 6 ++-- tests/length_delimited.rs | 2 +- 6 files changed, 193 insertions(+), 44 deletions(-) diff --git a/src/framed.rs b/src/framed.rs index d142551..521ab58 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -29,8 +29,8 @@ use std::task::{Context, Poll}; /// let bytes = Bytes::from("Hello world!"); /// framed.send(bytes).await?; /// -/// // Release the I/O and codec -/// let (cur, _) = framed.release(); +/// // Drop down to the underlying I/O stream. +/// let cur = framed.into_inner(); /// assert_eq!(cur.get_ref(), b"Hello world!"); /// # Ok::<_, std::io::Error>(()) /// # }).unwrap(); @@ -65,23 +65,44 @@ where /// A codec is a type which implements `Decoder` and `Encoder`. pub fn new(inner: T, codec: U) -> Self { Self { - inner: framed_read_2(framed_write_2(Fuse::new(inner, codec))), + inner: framed_read_2(framed_write_2(Fuse::new(inner, codec), None), None), } } - /// Release the I/O and Codec - pub fn release(self: Self) -> (T, U) { - let fuse = self.inner.release().release(); - (fuse.t, fuse.u) + /// Creates a new `Framed` from [`FramedParts`]. + /// + /// See also [`Framed::into_parts`]. + pub fn from_parts(FramedParts { + io, codec, write_buffer, read_buffer, .. + }: FramedParts) -> Self { + let framed_write = framed_write_2(Fuse::new(io, codec), Some(write_buffer)); + let framed_read = framed_read_2(framed_write, Some(read_buffer)); + Self { inner: framed_read } + } + + /// Consumes the `Framed`, returning its parts, such that a new + /// `Framed` may be constructed, possibly with a different codec. + /// + /// See also [`Framed::from_parts`]. + pub fn into_parts(self) -> FramedParts { + let (framed_write, read_buffer) = self.inner.into_parts(); + let (fuse, write_buffer) = framed_write.into_parts(); + FramedParts { + io: fuse.t, + codec: fuse.u, + read_buffer, + write_buffer, + _priv: (), + } } /// Consumes the `Framed`, returning its underlying I/O stream. /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. + /// Note that data that has already been read or written but not yet + /// consumed by the decoder or flushed, respectively, is dropped. + /// To retain any such potentially buffered data, use [`Framed::into_parts()`]. pub fn into_inner(self) -> T { - self.release().0 + self.into_parts().io } /// Returns a reference to the underlying codec wrapped by @@ -140,3 +161,36 @@ where self.project().inner.poll_close(cx) } } + +/// The parts obtained from [`Framed::into_parts`]. +pub struct FramedParts { + /// The underlying I/O stream. + pub io: T, + /// The codec used for encoding and decoding frames. + pub codec: U, + /// The remaining read buffer, containing data that has been + /// read from `io` but not yet consumed by the codec's decoder. + pub read_buffer: BytesMut, + /// The remaining write buffer, containing framed data that has been + /// buffered but not yet flushed to `io`. + pub write_buffer: BytesMut, + /// Keep the constructor private. + _priv: (), +} + +impl FramedParts { + /// Changes the codec used in this `FramedParts`. + pub fn map_codec(self, f: F) -> FramedParts + where + V: Encoder + Decoder, + F: FnOnce(U) -> V, + { + FramedParts { + io: self.io, + codec: f(self.codec), + read_buffer: self.read_buffer, + write_buffer: self.write_buffer, + _priv: (), + } + } +} diff --git a/src/framed_read.rs b/src/framed_read.rs index 326d0ed..b5e3fd9 100644 --- a/src/framed_read.rs +++ b/src/framed_read.rs @@ -56,23 +56,42 @@ where /// Creates a new `FramedRead` transport with the given `Decoder`. pub fn new(inner: T, decoder: D) -> Self { Self { - inner: framed_read_2(Fuse::new(inner, decoder)), + inner: framed_read_2(Fuse::new(inner, decoder), None), } } - /// Release the I/O and Decoder - pub fn release(self: Self) -> (T, D) { - let fuse = self.inner.release(); - (fuse.t, fuse.u) + /// Creates a new `FramedRead` from [`FramedReadParts`]. + /// + /// See also [`FramedRead::into_parts`]. + pub fn from_parts(FramedReadParts { + io, decoder, buffer, .. + }: FramedReadParts) -> Self { + Self { + inner: framed_read_2(Fuse::new(io, decoder), Some(buffer)) + } + } + + /// Consumes the `FramedRead`, returning its parts such that a + /// new `FramedRead` may be constructed, possibly with a different decoder. + /// + /// See also [`FramedRead::from_parts`]. + pub fn into_parts(self) -> FramedReadParts { + let (fuse, buffer) = self.inner.into_parts(); + FramedReadParts { + io: fuse.t, + decoder: fuse.u, + buffer, + _priv: (), + } } /// Consumes the `FramedRead`, returning its underlying I/O stream. /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. + /// Note that data that has already been read but not yet consumed + /// by the decoder is dropped. To retain any such potentially + /// buffered data, use [`FramedRead::into_parts()`]. pub fn into_inner(self) -> T { - self.release().0 + self.into_parts().io } /// Returns a reference to the underlying decoder. @@ -133,10 +152,10 @@ impl DerefMut for FramedRead2 { const INITIAL_CAPACITY: usize = 8 * 1024; -pub fn framed_read_2(inner: T) -> FramedRead2 { +pub fn framed_read_2(inner: T, buffer: Option) -> FramedRead2 { FramedRead2 { inner, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(INITIAL_CAPACITY)), } } @@ -207,11 +226,40 @@ where } impl FramedRead2 { - pub fn release(self: Self) -> T { - self.inner + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) } pub fn buffer(&self) -> &BytesMut { &self.buffer } } + +/// The parts obtained from (FramedRead::into_parts). +pub struct FramedReadParts { + /// The underlying I/O stream. + pub io: T, + /// The frame decoder. + pub decoder: D, + /// The buffer of data that has been read from `io` but not + /// yet consumed by `decoder`. + pub buffer: BytesMut, + /// Keep the constructor private. + _priv: (), +} + +impl FramedReadParts { + /// Changes the decoder in `FramedReadParts`. + pub fn map_decoder(self, f: F) -> FramedReadParts + where + E: Decoder, + F: FnOnce(D) -> E, + { + FramedReadParts { + io: self.io, + decoder: f(self.decoder), + buffer: self.buffer, + _priv: (), + } + } +} diff --git a/src/framed_write.rs b/src/framed_write.rs index d997cb5..f11815f 100644 --- a/src/framed_write.rs +++ b/src/framed_write.rs @@ -44,7 +44,18 @@ where /// Creates a new `FramedWrite` transport with the given `Encoder`. pub fn new(inner: T, encoder: E) -> Self { Self { - inner: framed_write_2(Fuse::new(inner, encoder)), + inner: framed_write_2(Fuse::new(inner, encoder), None), + } + } + + /// Creates a new `FramedWrite` from [`FramedWriteParts`]. + /// + /// See also [`FramedWrite::into_parts`]. + pub fn from_parts(FramedWriteParts { + io, encoder, buffer, .. + }: FramedWriteParts) -> Self { + Self { + inner: framed_write_2(Fuse::new(io, encoder), Some(buffer)) } } @@ -82,19 +93,27 @@ where self.inner.high_water_mark = hwm; } - /// Release the I/O and Encoder - pub fn release(self) -> (T, E) { - let fuse = self.inner.release(); - (fuse.t, fuse.u) + /// Consumes the `FramedWrite`, returning its parts such that + /// a new `FramedWrite` may be constructed, possibly with a different encoder. + /// + /// See also [`FramedWrite::from_parts`]. + pub fn into_parts(self) -> FramedWriteParts { + let (fuse, buffer) = self.inner.into_parts(); + FramedWriteParts { + io: fuse.t, + encoder: fuse.u, + buffer, + _priv: (), + } } /// Consumes the `FramedWrite`, returning its underlying I/O stream. /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. + /// Note that data that has already been written but not yet flushed + /// is dropped. To retain any such potentially buffered data, use + /// [`FramedWrite::into_parts()`]. pub fn into_inner(self) -> T { - self.release().0 + self.into_parts().io } /// Returns a reference to the underlying encoder. @@ -176,11 +195,11 @@ impl DerefMut for FramedWrite2 { // TCP send buffer size (SO_SNDBUF) const DEFAULT_SEND_HIGH_WATER_MARK: usize = 131072; -pub fn framed_write_2(inner: T) -> FramedWrite2 { +pub fn framed_write_2(inner: T, buffer: Option) -> FramedWrite2 { FramedWrite2 { inner, high_water_mark: DEFAULT_SEND_HIGH_WATER_MARK, - buffer: BytesMut::with_capacity(1028 * 8), + buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(1028 * 8)), } } @@ -240,11 +259,39 @@ where } impl FramedWrite2 { - pub fn release(self) -> T { - self.inner + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) } } fn err_eof() -> Error { Error::new(ErrorKind::UnexpectedEof, "End of file") } + +/// The parts obtained from [`FramedWrite::into_parts`]. +pub struct FramedWriteParts { + /// The underlying I/O stream. + pub io: T, + /// The frame encoder. + pub encoder: E, + /// The framed data that has been buffered but not yet flushed to `io`. + pub buffer: BytesMut, + /// Keep the constructor private. + _priv: (), +} + +impl FramedWriteParts { + /// Changes the encoder used in `FramedWriteParts`. + pub fn map_encoder(self, f: F) -> FramedWriteParts + where + G: Encoder, + F: FnOnce(E) -> G, + { + FramedWriteParts { + io: self.io, + encoder: f(self.encoder), + buffer: self.buffer, + _priv: (), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 212d4fe..eb20aef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,12 +37,12 @@ mod encoder; pub use encoder::Encoder; mod framed; -pub use framed::Framed; +pub use framed::{Framed, FramedParts}; mod framed_read; -pub use framed_read::FramedRead; +pub use framed_read::{FramedRead, FramedReadParts}; mod framed_write; -pub use framed_write::FramedWrite; +pub use framed_write::{FramedWrite, FramedWriteParts}; mod fuse; diff --git a/tests/framed_write.rs b/tests/framed_write.rs index 4068a69..918fc07 100644 --- a/tests/framed_write.rs +++ b/tests/framed_write.rs @@ -57,7 +57,7 @@ fn line_write() { let mut framer = FramedWrite::new(curs, LinesCodec {}); executor::block_on(framer.send("Hello\n".to_owned())).unwrap(); executor::block_on(framer.send("World\n".to_owned())).unwrap(); - let (curs, _) = framer.release(); + let curs = framer.into_inner(); assert_eq!(&curs.get_ref()[0..12], b"Hello\nWorld\n"); assert_eq!(curs.position(), 12); } @@ -69,7 +69,7 @@ fn line_write_to_eof() { let mut framer = FramedWrite::new(curs, LinesCodec {}); let _err = executor::block_on(framer.send("This will fill up the buffer\n".to_owned())).unwrap_err(); - let (curs, _) = framer.release(); + let curs = framer.into_inner(); assert_eq!(curs.position(), 16); assert_eq!(&curs.get_ref()[0..16], b"This will fill u"); } @@ -93,7 +93,7 @@ fn send_high_water_mark() { let mut framer = FramedWrite::new(io, BytesCodec {}); framer.set_send_high_water_mark(500); executor::block_on(framer.send_all(&mut stream)).unwrap(); - let (io, _) = framer.release(); + let io = framer.into_inner(); assert_eq!(io.num_poll_write, 2); assert_eq!(io.last_write_size, 499); } diff --git a/tests/length_delimited.rs b/tests/length_delimited.rs index bc2ab13..0b085ef 100644 --- a/tests/length_delimited.rs +++ b/tests/length_delimited.rs @@ -14,7 +14,7 @@ fn same_msgs_are_received_as_were_sent() { }; executor::block_on(send_msgs); - let (mut cur, _) = framed.release(); + let mut cur = framed.into_inner(); cur.set_position(0); let framed = Framed::new(cur, LengthCodec {}); From d950ee44cf4d883f209b3e3f34ac62ebfc9478a7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 31 Jan 2021 18:51:39 +0100 Subject: [PATCH 2/2] *: Update CHANGELOG.md and Cargo.toml --- CHANGELOG.md | 20 ++++++++++++++++++++ Cargo.toml | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..2900782 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,20 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic +Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Changed + +- Permit conversion into and creation from "parts" + [#2](https://github.com/mxinden/asynchronous-codec/pull/2). + +## [0.5.0] - 2021-01-06 + +### Changed + +- Update to `bytes` `v1` and `pin-project-lite` `v0.2`. diff --git a/Cargo.toml b/Cargo.toml index a514123..cc29d35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "asynchronous-codec" edition = "2018" -version = "0.5.0" +version = "0.6.0" authors = ["Max Inden "] description = "Utilities for encoding and decoding frames using `async/await`" license = "MIT"