Skip to content

Commit

Permalink
Merge pull request #384 from yjhmelody/stream-max_by
Browse files Browse the repository at this point in the history
add stream::max_by method
  • Loading branch information
yoshuawuyts authored Oct 29, 2019
2 parents 5ff4ef8 + b57849e commit 2b1c6f0
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
57 changes: 57 additions & 0 deletions src/stream/stream/max_by.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::cmp::Ordering;
use std::pin::Pin;

use pin_project_lite::pin_project;

use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MaxByFuture<S, F, T> {
#[pin]
stream: S,
compare: F,
max: Option<T>,
}
}

impl<S, F, T> MaxByFuture<S, F, T> {
pub(super) fn new(stream: S, compare: F) -> Self {
MaxByFuture {
stream,
compare,
max: None,
}
}
}

impl<S, F> Future for MaxByFuture<S, F, S::Item>
where
S: Stream,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));

match next {
Some(new) => {
cx.waker().wake_by_ref();
match this.max.take() {
None => *this.max = Some(new),
Some(old) => match (this.compare)(&new, &old) {
Ordering::Greater => *this.max = Some(new),
_ => *this.max = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.max.take()),
}
}
}
41 changes: 41 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod last;
mod le;
mod lt;
mod map;
mod max_by;
mod min_by;
mod min_by_key;
mod next;
Expand Down Expand Up @@ -69,6 +70,7 @@ use gt::GtFuture;
use last::LastFuture;
use le::LeFuture;
use lt::LtFuture;
use max_by::MaxByFuture;
use min_by::MinByFuture;
use min_by_key::MinByKeyFuture;
use next::NextFuture;
Expand Down Expand Up @@ -758,6 +760,45 @@ extension_trait! {
MinByFuture::new(self, compare)
}

#[doc = r#"
Returns the element that gives the maximum value with respect to the
specified comparison function. If several elements are equally maximum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let max = s.clone().max_by(|x, y| x.cmp(y)).await;
assert_eq!(max, Some(3));
let max = s.max_by(|x, y| y.cmp(x)).await;
assert_eq!(max, Some(1));
let max = VecDeque::<usize>::new().max_by(|x, y| x.cmp(y)).await;
assert_eq!(max, None);
#
# }) }
```
"#]
fn max_by<F>(
self,
compare: F,
) -> impl Future<Output = Option<Self::Item>> [MaxByFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MaxByFuture::new(self, compare)
}

#[doc = r#"
Returns the nth element of the stream.
Expand Down

0 comments on commit 2b1c6f0

Please sign in to comment.