Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seal FromStream methods with an internal argument #2894

Merged
merged 1 commit into from
Sep 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 63 additions & 45 deletions tokio/src/stream/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pin_project! {
{
#[pin]
stream: T,
collection: U::Collection,
collection: U::InternalCollection,
}
}

Expand All @@ -42,7 +42,7 @@ where
{
pub(super) fn new(stream: T) -> Collect<T, U> {
let (lower, upper) = stream.size_hint();
let collection = U::initialize(lower, upper);
let collection = U::initialize(sealed::Internal, lower, upper);

Collect { stream, collection }
}
Expand All @@ -64,12 +64,12 @@ where
let item = match ready!(me.stream.poll_next(cx)) {
Some(item) => item,
None => {
return Ready(U::finalize(&mut me.collection));
return Ready(U::finalize(sealed::Internal, &mut me.collection));
}
};

if !U::extend(&mut me.collection, item) {
return Ready(U::finalize(&mut me.collection));
if !U::extend(sealed::Internal, &mut me.collection, item) {
return Ready(U::finalize(sealed::Internal, &mut me.collection));
}
}
}
Expand All @@ -80,70 +80,71 @@ where
impl FromStream<()> for () {}

impl sealed::FromStreamPriv<()> for () {
type Collection = ();
type InternalCollection = ();

fn initialize(_lower: usize, _upper: Option<usize>) {}
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}

fn extend(_collection: &mut (), _item: ()) -> bool {
fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
true
}

fn finalize(_collection: &mut ()) {}
fn finalize(_: sealed::Internal, _collection: &mut ()) {}
}

impl<T: AsRef<str>> FromStream<T> for String {}

impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
type Collection = String;
type InternalCollection = String;

fn initialize(_lower: usize, _upper: Option<usize>) -> String {
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
String::new()
}

fn extend(collection: &mut String, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
collection.push_str(item.as_ref());
true
}

fn finalize(collection: &mut String) -> String {
fn finalize(_: sealed::Internal, collection: &mut String) -> String {
mem::replace(collection, String::new())
}
}

impl<T> FromStream<T> for Vec<T> {}

impl<T> sealed::FromStreamPriv<T> for Vec<T> {
type Collection = Vec<T>;
type InternalCollection = Vec<T>;

fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> {
fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
Vec::with_capacity(lower)
}

fn extend(collection: &mut Vec<T>, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
collection.push(item);
true
}

fn finalize(collection: &mut Vec<T>) -> Vec<T> {
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
mem::replace(collection, vec![])
}
}

impl<T> FromStream<T> for Box<[T]> {}

impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
type Collection = Vec<T>;
type InternalCollection = Vec<T>;

fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> {
<Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper)
fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
<Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
}

fn extend(collection: &mut Vec<T>, item: T) -> bool {
<Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item)
fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
<Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
}

fn finalize(collection: &mut Vec<T>) -> Box<[T]> {
<Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice()
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
<Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
.into_boxed_slice()
}
}

Expand All @@ -153,18 +154,26 @@ impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
where
U: FromStream<T>,
{
type Collection = Result<U::Collection, E>;

fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> {
Ok(U::initialize(lower, upper))
type InternalCollection = Result<U::InternalCollection, E>;

fn initialize(
_: sealed::Internal,
lower: usize,
upper: Option<usize>,
) -> Result<U::InternalCollection, E> {
Ok(U::initialize(sealed::Internal, lower, upper))
}

fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool {
fn extend(
_: sealed::Internal,
collection: &mut Self::InternalCollection,
item: Result<T, E>,
) -> bool {
assert!(collection.is_ok());
match item {
Ok(item) => {
let collection = collection.as_mut().ok().expect("invalid state");
U::extend(collection, item)
U::extend(sealed::Internal, collection, item)
}
Err(err) => {
*collection = Err(err);
Expand All @@ -173,11 +182,11 @@ where
}
}

fn finalize(collection: &mut Self::Collection) -> Result<U, E> {
fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
if let Ok(collection) = collection.as_mut() {
Ok(U::finalize(collection))
Ok(U::finalize(sealed::Internal, collection))
} else {
let res = mem::replace(collection, Ok(U::initialize(0, Some(0))));
let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));

if let Err(err) = res {
Err(err)
Expand All @@ -191,37 +200,37 @@ where
impl<T: Buf> FromStream<T> for Bytes {}

impl<T: Buf> sealed::FromStreamPriv<T> for Bytes {
type Collection = BytesMut;
type InternalCollection = BytesMut;

fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BytesMut {
BytesMut::new()
}

fn extend(collection: &mut BytesMut, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut BytesMut, item: T) -> bool {
collection.put(item);
true
}

fn finalize(collection: &mut BytesMut) -> Bytes {
fn finalize(_: sealed::Internal, collection: &mut BytesMut) -> Bytes {
mem::replace(collection, BytesMut::new()).freeze()
}
}

impl<T: Buf> FromStream<T> for BytesMut {}

impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut {
type Collection = BytesMut;
type InternalCollection = BytesMut;

fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BytesMut {
BytesMut::new()
}

fn extend(collection: &mut BytesMut, item: T) -> bool {
fn extend(_: sealed::Internal, collection: &mut BytesMut, item: T) -> bool {
collection.put(item);
true
}

fn finalize(collection: &mut BytesMut) -> BytesMut {
fn finalize(_: sealed::Internal, collection: &mut BytesMut) -> BytesMut {
mem::replace(collection, BytesMut::new())
}
}
Expand All @@ -230,17 +239,26 @@ pub(crate) mod sealed {
#[doc(hidden)]
pub trait FromStreamPriv<T> {
/// Intermediate type used during collection process
type Collection;
///
/// The name of this type is internal and cannot be relied upon.
type InternalCollection;

/// Initialize the collection
fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection;
fn initialize(
internal: Internal,
lower: usize,
upper: Option<usize>,
) -> Self::InternalCollection;

/// Extend the collection with the received item
///
/// Return `true` to continue streaming, `false` complete collection.
fn extend(collection: &mut Self::Collection, item: T) -> bool;
fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

/// Finalize collection into target type.
fn finalize(collection: &mut Self::Collection) -> Self;
fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
}

#[allow(missing_debug_implementations)]
pub struct Internal;
}