Skip to content

Commit

Permalink
Merge pull request rust-lang#151 from stepancheg/catch-unwind
Browse files Browse the repository at this point in the history
Stream::catch_unwind
  • Loading branch information
alexcrichton authored Sep 14, 2016
2 parents 2541570 + 8324361 commit fc3773d
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 0 deletions.
69 changes: 69 additions & 0 deletions src/stream/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::prelude::v1::*;
use std::any::Any;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
use std::mem;

use super::super::{Poll, Async};
use super::Stream;

/// Stream for the `catch_unwind` combinator.
///
/// This is created by this `Stream::catch_unwind` method.
#[must_use = "streams do nothing unless polled"]
pub struct CatchUnwind<S> where S: Stream {
state: CatchUnwindState<S>,
}

pub fn new<S>(stream: S) -> CatchUnwind<S>
where S: Stream + UnwindSafe,
{
CatchUnwind {
state: CatchUnwindState::Stream(stream),
}
}

enum CatchUnwindState<S> {
Stream(S),
Eof,
Done,
}

impl<S> Stream for CatchUnwind<S>
where S: Stream + UnwindSafe,
{
type Item = Result<S::Item, S::Error>;
type Error = Box<Any + Send>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) {
CatchUnwindState::Done => panic!("cannot poll after eof"),
CatchUnwindState::Eof => {
self.state = CatchUnwindState::Done;
return Ok(Async::Ready(None));
}
CatchUnwindState::Stream(stream) => stream,
};
let res = catch_unwind(|| (stream.poll(), stream));
match res {
Err(e) => Err(e), // and state is already Eof
Ok((poll, stream)) => {
self.state = CatchUnwindState::Stream(stream);
match poll {
Err(e) => Ok(Async::Ready(Some(Err(e)))),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
}
}
}
}
}

impl<S: Stream> Stream for AssertUnwindSafe<S> {
type Item = S::Item;
type Error = S::Error;

fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
self.0.poll()
}
}
41 changes: 41 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ pub use self::zip::Zip;
pub use self::peek::Peekable;

if_std! {
use std;

mod buffered;
mod buffer_unordered;
mod catch_unwind;
mod channel;
mod collect;
mod wait;
pub use self::buffered::Buffered;
pub use self::buffer_unordered::BufferUnordered;
pub use self::catch_unwind::CatchUnwind;
pub use self::channel::{channel, Sender, Receiver, FutureSender};
pub use self::collect::Collect;
pub use self::wait::Wait;
Expand Down Expand Up @@ -634,6 +638,43 @@ pub trait Stream {
fuse::new(self)
}

/// Catches unwinding panics while polling the stream.
///
/// Caught panic (if any) will be the last element of the resulting stream.
///
/// In general, panics within a stream can propagate all the way out to the
/// task level. This combinator makes it possible to halt unwinding within
/// the stream itself. It's most commonly used within task executors.
///
/// Note that this method requires the `UnwindSafe` bound from the standard
/// library. This isn't always applied automatically, and the standard
/// library provides an `AssertUnwindSafe` wrapper type to apply it
/// after-the fact. To assist using this method, the `Stream` trait is also
/// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
///
/// # Examples
///
/// ```rust
/// use futures::stream;
/// use futures::stream::Stream;
///
/// let stream = stream::iter::<_, Option<i32>, bool>(vec![
/// Some(10), None, Some(11)].into_iter().map(Ok));
/// // panic on second element
/// let stream_panicking = stream.map(|o| o.unwrap());
/// let mut iter = stream_panicking.catch_unwind().wait();
///
/// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
/// assert!(iter.next().unwrap().is_err());
/// assert!(iter.next().is_none());
/// ```
#[cfg(feature = "use_std")]
fn catch_unwind(self) -> CatchUnwind<Self>
where Self: Sized + std::panic::UnwindSafe
{
catch_unwind::new(self)
}

/// An adaptor for creating a buffered list of pending futures.
///
/// If this stream's item can be converted into a future, then this adaptor
Expand Down
31 changes: 31 additions & 0 deletions tests/stream_catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
extern crate futures;

use futures::stream;
use futures::stream::Stream;

#[test]
fn panic_in_the_middle_of_the_stream() {
let stream = stream::iter::<_, Option<i32>, bool>(vec![
Some(10), None, Some(11)].into_iter().map(Ok));

// panic on second element
let stream_panicking = stream.map(|o| o.unwrap());
let mut iter = stream_panicking.catch_unwind().wait();

assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
assert!(iter.next().unwrap().is_err());
assert!(iter.next().is_none());
}

#[test]
fn no_panic() {
let stream = stream::iter::<_, _, bool>(vec![
10, 11, 12].into_iter().map(Ok));

let mut iter = stream.catch_unwind().wait();

assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
assert_eq!(Ok(11), iter.next().unwrap().ok().unwrap());
assert_eq!(Ok(12), iter.next().unwrap().ok().unwrap());
assert!(iter.next().is_none());
}

0 comments on commit fc3773d

Please sign in to comment.