Skip to content

Commit

Permalink
Seal FromStream methods with an internal argument (#2894)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar authored Sep 29, 2020
1 parent dcb1111 commit 971ed2c
Showing 1 changed file with 63 additions and 45 deletions.
108 changes: 63 additions & 45 deletions tokio/src/stream/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pin_project! {
{
#[pin]
stream: T,
collection: U::Collection,
collection: U::InternalCollection,
}
}

Expand All @@ -42,7 +42,7 @@ where
{
pub(super) fn new(stream: T) -> Collect<T, U> {
let (lower, upper) = stream.size_hint();
let collection = U::initialize(lower, upper);
let collection = U::initialize(sealed::Internal, lower, upper);

Collect { stream, collection }
}
Expand All @@ -64,12 +64,12 @@ where
let item = match ready!(me.stream.poll_next(cx)) {
Some(item) => item,
None => {
return Ready(U::finalize(&mut me.collection));
return Ready(U::finalize(sealed::Internal, &mut me.collection));
}
};

if !U::extend(&mut me.collection, item) {
return Ready(U::finalize(&mut me.collection));
if !U::extend(sealed::Internal, &mut me.collection, item) {
return Ready(U::finalize(sealed::Internal, &mut me.collection));
}
}
}
Expand All @@ -80,70 +80,71 @@ where
impl FromStream<()> for () {}

impl sealed::FromStreamPriv<()> for () {
type Collection = ();
type InternalCollection = ();

fn initialize(_lower: usize, _upper: Option<usize>) {}
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}

fn extend(_collection: &mut (), _item: ()) -> bool {
fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
true
}

fn finalize(_collection: &mut ()) {}
fn finalize(_: sealed::Internal, _collection: &mut ()) {}
}

impl<T: AsRef<str>> FromStream<T> for String {}

impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
type Collection = String;
type InternalCollection = String;

fn initialize(_lower: usize, _upper: Option<usize>) -> String {
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
String::new()
}

fn extend(collection: &mut String, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
collection.push_str(item.as_ref());
true
}

fn finalize(collection: &mut String) -> String {
fn finalize(_: sealed::Internal, collection: &mut String) -> String {
mem::replace(collection, String::new())
}
}

impl<T> FromStream<T> for Vec<T> {}

impl<T> sealed::FromStreamPriv<T> for Vec<T> {
type Collection = Vec<T>;
type InternalCollection = Vec<T>;

fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> {
fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
Vec::with_capacity(lower)
}

fn extend(collection: &mut Vec<T>, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
collection.push(item);
true
}

fn finalize(collection: &mut Vec<T>) -> Vec<T> {
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
mem::replace(collection, vec![])
}
}

impl<T> FromStream<T> for Box<[T]> {}

impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
type Collection = Vec<T>;
type InternalCollection = Vec<T>;

fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> {
<Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper)
fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
<Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
}

fn extend(collection: &mut Vec<T>, item: T) -> bool {
<Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item)
fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
<Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
}

fn finalize(collection: &mut Vec<T>) -> Box<[T]> {
<Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice()
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
<Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
.into_boxed_slice()
}
}

Expand All @@ -153,18 +154,26 @@ impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
where
U: FromStream<T>,
{
type Collection = Result<U::Collection, E>;

fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> {
Ok(U::initialize(lower, upper))
type InternalCollection = Result<U::InternalCollection, E>;

fn initialize(
_: sealed::Internal,
lower: usize,
upper: Option<usize>,
) -> Result<U::InternalCollection, E> {
Ok(U::initialize(sealed::Internal, lower, upper))
}

fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool {
fn extend(
_: sealed::Internal,
collection: &mut Self::InternalCollection,
item: Result<T, E>,
) -> bool {
assert!(collection.is_ok());
match item {
Ok(item) => {
let collection = collection.as_mut().ok().expect("invalid state");
U::extend(collection, item)
U::extend(sealed::Internal, collection, item)
}
Err(err) => {
*collection = Err(err);
Expand All @@ -173,11 +182,11 @@ where
}
}

fn finalize(collection: &mut Self::Collection) -> Result<U, E> {
fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
if let Ok(collection) = collection.as_mut() {
Ok(U::finalize(collection))
Ok(U::finalize(sealed::Internal, collection))
} else {
let res = mem::replace(collection, Ok(U::initialize(0, Some(0))));
let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));

if let Err(err) = res {
Err(err)
Expand All @@ -191,37 +200,37 @@ where
impl<T: Buf> FromStream<T> for Bytes {}

impl<T: Buf> sealed::FromStreamPriv<T> for Bytes {
type Collection = BytesMut;
type InternalCollection = BytesMut;

fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BytesMut {
BytesMut::new()
}

fn extend(collection: &mut BytesMut, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut BytesMut, item: T) -> bool {
collection.put(item);
true
}

fn finalize(collection: &mut BytesMut) -> Bytes {
fn finalize(_: sealed::Internal, collection: &mut BytesMut) -> Bytes {
mem::replace(collection, BytesMut::new()).freeze()
}
}

impl<T: Buf> FromStream<T> for BytesMut {}

impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut {
type Collection = BytesMut;
type InternalCollection = BytesMut;

fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BytesMut {
BytesMut::new()
}

fn extend(collection: &mut BytesMut, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut BytesMut, item: T) -> bool {
collection.put(item);
true
}

fn finalize(collection: &mut BytesMut) -> BytesMut {
fn finalize(_: sealed::Internal, collection: &mut BytesMut) -> BytesMut {
mem::replace(collection, BytesMut::new())
}
}
Expand All @@ -230,17 +239,26 @@ pub(crate) mod sealed {
#[doc(hidden)]
pub trait FromStreamPriv<T> {
/// Intermediate type used during collection process
type Collection;
///
/// The name of this type is internal and cannot be relied upon.
type InternalCollection;

/// Initialize the collection
fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection;
fn initialize(
internal: Internal,
lower: usize,
upper: Option<usize>,
) -> Self::InternalCollection;

/// Extend the collection with the received item
///
/// Return `true` to continue streaming, `false` complete collection.
fn extend(collection: &mut Self::Collection, item: T) -> bool;
fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

/// Finalize collection into target type.
fn finalize(collection: &mut Self::Collection) -> Self;
fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
}

#[allow(missing_debug_implementations)]
pub struct Internal;
}

0 comments on commit 971ed2c

Please sign in to comment.