From ef15257b733d40bc3a7c598f61918f91385585f9 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 27 Aug 2015 15:43:22 -0700 Subject: [PATCH] fix(client): fix panics when some errors occured inside HttpMessage BREAKING CHANGE: This changes the signature of HttpWriter.end(), returning a `EndError` that is similar to std::io::IntoInnerError, allowing HttpMessage to retrieve the broken connections and not panic. The breaking change isn't exposed in any usage of the `Client` API, but for anyone using `HttpWriter` directly, since this was technically a public method, that change is breaking. --- src/client/response.rs | 10 +++ src/http/h1.rs | 160 +++++++++++++++++++++++++---------------- 2 files changed, 110 insertions(+), 60 deletions(-) diff --git a/src/client/response.rs b/src/client/response.rs index 961a84b790..8b14ef9263 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -226,4 +226,14 @@ mod tests { assert_eq!(read_to_string(res).unwrap(), "1".to_owned()); } + + #[test] + fn test_parse_error_closes() { + let url = Url::parse("http://hyper.rs").unwrap(); + let stream = MockStream::with_input(b"\ + definitely not http + "); + + assert!(Response::new(url, Box::new(stream)).is_err()); + } } diff --git a/src/http/h1.rs b/src/http/h1.rs index 44cd35dbfa..b60f0673f9 100644 --- a/src/http/h1.rs +++ b/src/http/h1.rs @@ -74,12 +74,15 @@ impl Read for Http11Message { impl HttpMessage for Http11Message { fn set_outgoing(&mut self, mut head: RequestHead) -> ::Result { - if self.stream.is_none() { - return Err(From::from(io::Error::new( - io::ErrorKind::Other, - "Message not idle, cannot start new outgoing"))); - } - let mut stream = BufWriter::new(self.stream.take().unwrap()); + let stream = match self.stream.take() { + Some(stream) => stream, + None => { + return Err(From::from(io::Error::new( + io::ErrorKind::Other, + "Message not idle, cannot start new outgoing"))); + } + }; + let mut stream = BufWriter::new(stream); let mut uri = head.url.serialize_path().unwrap(); if let Some(ref q) = head.url.query { @@ -92,72 +95,89 @@ impl HttpMessage for Http11Message { try!(write!(&mut stream, "{} {} {}{}", head.method, uri, version, LINE_ENDING)); - let stream = match &head.method { - &Method::Get | &Method::Head => { + let stream = { + let mut write_headers = |mut stream: BufWriter>, head: &RequestHead| { debug!("headers={:?}", head.headers); - try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING)); - EmptyWriter(stream) - }, - _ => { - let mut chunked = true; - let mut len = 0; - - match head.headers.get::() { - Some(cl) => { - chunked = false; - len = **cl; - }, - None => () - }; - - // can't do in match above, thanks borrowck - if chunked { - let encodings = match head.headers.get_mut::() { - Some(&mut header::TransferEncoding(ref mut encodings)) => { - //TODO: check if chunked is already in encodings. use HashSet? - encodings.push(header::Encoding::Chunked); - false + match write!(&mut stream, "{}{}", head.headers, LINE_ENDING) { + Ok(_) => Ok(stream), + Err(e) => { + self.stream = Some(stream.into_inner().unwrap()); + Err(e) + } + } + }; + match &head.method { + &Method::Get | &Method::Head => { + EmptyWriter(try!(write_headers(stream, &head))) + }, + _ => { + let mut chunked = true; + let mut len = 0; + + match head.headers.get::() { + Some(cl) => { + chunked = false; + len = **cl; }, - None => true + None => () }; - if encodings { - head.headers.set::( - header::TransferEncoding(vec![header::Encoding::Chunked])) + // can't do in match above, thanks borrowck + if chunked { + let encodings = match head.headers.get_mut::() { + Some(encodings) => { + //TODO: check if chunked is already in encodings. use HashSet? + encodings.push(header::Encoding::Chunked); + false + }, + None => true + }; + + if encodings { + head.headers.set( + header::TransferEncoding(vec![header::Encoding::Chunked])) + } } - } - debug!("headers={:?}", head.headers); - try!(write!(&mut stream, "{}{}", head.headers, LINE_ENDING)); + let stream = try!(write_headers(stream, &head)); - if chunked { - ChunkedWriter(stream) - } else { - SizedWriter(stream, len) + if chunked { + ChunkedWriter(stream) + } else { + SizedWriter(stream, len) + } } } }; - self.method = Some(head.method.clone()); self.writer = Some(stream); + self.method = Some(head.method.clone()); Ok(head) } fn get_incoming(&mut self) -> ::Result { try!(self.flush_outgoing()); - if self.stream.is_none() { - // The message was already in the reading state... - // TODO Decide what happens in case we try to get a new incoming at that point - return Err(From::from( - io::Error::new(io::ErrorKind::Other, - "Read already in progress"))); - } + let stream = match self.stream.take() { + Some(stream) => stream, + None => { + // The message was already in the reading state... + // TODO Decide what happens in case we try to get a new incoming at that point + return Err(From::from( + io::Error::new(io::ErrorKind::Other, + "Read already in progress"))); + } + }; - let stream = self.stream.take().unwrap(); let mut stream = BufReader::new(stream); - let head = try!(parse_response(&mut stream)); + let head = match parse_response(&mut stream) { + Ok(head) => head, + Err(e) => { + self.stream = Some(stream.into_inner()); + return Err(e); + } + }; let raw_status = head.subject; let headers = head.headers; @@ -170,7 +190,7 @@ impl HttpMessage for Http11Message { // 5. Content-Length header has a sized body. // 6. Not Client. // 7. Read till EOF. - let body = match (method, raw_status.0) { + self.reader = Some(match (method, raw_status.0) { (Method::Head, _) => EmptyReader(stream), (_, 100...199) | (_, 204) | (_, 304) => EmptyReader(stream), (Method::Connect, 200...299) => EmptyReader(stream), @@ -192,9 +212,8 @@ impl HttpMessage for Http11Message { EofReader(stream) } } - }; + }); - self.reader = Some(body); Ok(ResponseHead { headers: headers, @@ -292,9 +311,15 @@ impl Http11Message { }; let writer = self.writer.take().unwrap(); - let raw = try!(writer.end()).into_inner().unwrap(); // end() already flushes + // end() already flushes + let raw = match writer.end() { + Ok(buf) => buf.into_inner().unwrap(), + Err(e) => { + self.writer = Some(e.1); + return Err(From::from(e.0)); + } + }; self.stream = Some(raw); - Ok(()) } } @@ -617,10 +642,25 @@ impl HttpWriter { /// A final `write_all()` is called with an empty message, and then flushed. /// The ChunkedWriter variant will use this to write the 0-sized last-chunk. #[inline] - pub fn end(mut self) -> io::Result { - try!(self.write(&[])); - try!(self.flush()); - Ok(self.into_inner()) + pub fn end(mut self) -> Result> { + fn inner(w: &mut W) -> io::Result<()> { + try!(w.write(&[])); + w.flush() + } + + match inner(&mut self) { + Ok(..) => Ok(self.into_inner()), + Err(e) => Err(EndError(e, self)) + } + } +} + +#[derive(Debug)] +pub struct EndError(io::Error, HttpWriter); + +impl From> for io::Error { + fn from(e: EndError) -> io::Error { + e.0 } }