diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71ce612b..a7577475 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,6 +16,7 @@ jobs: - macos-latest - windows-latest runs-on: ${{ matrix.os }} + timeout-minutes: 20 env: RUST_BACKTRACE: 1 RUST_LOG: isahc=debug @@ -41,6 +42,7 @@ jobs: analyze: runs-on: ubuntu-latest + timeout-minutes: 20 steps: - uses: actions/checkout@v2 with: diff --git a/Cargo.toml b/Cargo.toml index ab46a316..5cd4fde0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ async-channel = "1.6" crossbeam-utils = "0.8" curl = "0.4.36" curl-sys = "0.4.42" +event-listener = "2.5" futures-lite = "1.11" http = "0.2.1" log = "0.4" diff --git a/src/handler.rs b/src/handler.rs index fb680ee4..259e0162 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -6,6 +6,7 @@ use crate::{ metrics::Metrics, parsing::{parse_header, parse_status_line}, response::{LocalAddr, RemoteAddr}, + trailer::TrailerWriter, }; use async_channel::Sender; use curl::easy::{InfoType, ReadError, SeekResult, WriteError}; @@ -84,6 +85,10 @@ pub(crate) struct RequestHandler { /// an agent when the request is initialized. response_body_waker: Option, + /// Holds the response trailer, if any. Used to communicate the trailer + /// headers out-of-band from the response headers and body. + response_trailer_writer: TrailerWriter, + /// Metrics object for publishing metrics data to. Lazily initialized. metrics: Option, @@ -131,6 +136,7 @@ impl RequestHandler { response_headers: http::HeaderMap::new(), response_body_writer, response_body_waker: None, + response_trailer_writer: TrailerWriter::new(), metrics: None, handle: ptr::null_mut(), }; @@ -175,10 +181,7 @@ impl RequestHandler { } fn is_future_canceled(&self) -> bool { - self.sender - .as_ref() - .map(Sender::is_closed) - .unwrap_or(false) + self.sender.as_ref().map(Sender::is_closed).unwrap_or(false) } /// Initialize the handler and prepare it for the request to begin. @@ -211,6 +214,9 @@ impl RequestHandler { tracing::debug!("attempted to set error multiple times"); } + // Flush the trailer, if we haven't already. + self.response_trailer_writer.flush(); + // Complete the response future, if we haven't already. self.complete_response_future(); } @@ -262,6 +268,10 @@ impl RequestHandler { // it. Otherwise we're just going to drop it later. builder = builder.extension(RequestBody(mem::take(&mut self.request_body))); + // Include a handle to the trailer headers. We won't know if there + // are any until we reach the end of the response body. + builder = builder.extension(self.response_trailer_writer.trailer()); + // Include metrics in response, but only if it was created. If // metrics are disabled then it won't have been created. if let Some(metrics) = self.metrics.clone() { @@ -377,6 +387,17 @@ impl curl::easy::Handler for RequestHandler { let span = tracing::trace_span!(parent: &self.span, "header"); let _enter = span.enter(); + // If we already returned the response headers, then this header is from + // the trailer. + if self.sender.is_none() { + if let Some(trailer_headers) = self.response_trailer_writer.get_mut() { + if let Some((name, value)) = parse_header(data) { + trailer_headers.append(name, value); + return true; + } + } + } + // Curl calls this function for all lines in the response not part of // the response body, not just for headers. We need to inspect the // contents of the string in order to determine what it is and how to diff --git a/src/lib.rs b/src/lib.rs index 148636e5..94c6b801 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -254,6 +254,7 @@ mod request; mod response; mod task; mod text; +mod trailer; pub mod auth; pub mod config; @@ -273,6 +274,7 @@ pub use crate::{ metrics::Metrics, request::RequestExt, response::{AsyncReadResponseExt, ReadResponseExt, ResponseExt}, + trailer::Trailer, }; /// Re-export of HTTP types. diff --git a/src/response.rs b/src/response.rs index b67006c6..4fbe83a6 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,4 +1,4 @@ -use crate::{metrics::Metrics, redirect::EffectiveUri}; +use crate::{metrics::Metrics, redirect::EffectiveUri, trailer::Trailer}; use futures_lite::io::{copy as copy_async, AsyncRead, AsyncWrite}; use http::{Response, Uri}; use std::{ @@ -10,6 +10,31 @@ use std::{ /// Provides extension methods for working with HTTP responses. pub trait ResponseExt { + /// Get the trailer of the response containing headers that were received + /// after the response body. + /// + /// See the documentation for [`Trailer`] for more details on how to handle + /// trailing headers. + /// + /// # Examples + /// + /// ```no_run + /// use isahc::prelude::*; + /// + /// let mut response = isahc::get("https://my-site-with-trailers.com")?; + /// + /// println!("Status: {}", response.status()); + /// println!("Headers: {:#?}", response.headers()); + /// + /// // Read and discard the response body until the end. + /// response.consume()?; + /// + /// // Now the trailer will be available as well. + /// println!("Trailing headers: {:#?}", response.trailer().try_get().unwrap()); + /// # Ok::<(), isahc::Error>(()) + /// ``` + fn trailer(&self) -> &Trailer; + /// Get the effective URI of this response. This value differs from the /// original URI provided when making the request if at least one redirect /// was followed. @@ -68,6 +93,13 @@ pub trait ResponseExt { } impl ResponseExt for Response { + fn trailer(&self) -> &Trailer { + // Return a static empty trailer if the extension does not exist. This + // offers a more convenient API so that users do not have to unwrap the + // trailer from an extra Option. + self.extensions().get().unwrap_or_else(|| Trailer::empty()) + } + fn effective_uri(&self) -> Option<&Uri> { self.extensions().get::().map(|v| &v.0) } diff --git a/src/trailer.rs b/src/trailer.rs new file mode 100644 index 00000000..70cad5f0 --- /dev/null +++ b/src/trailer.rs @@ -0,0 +1,209 @@ +use event_listener::Event; +use http::HeaderMap; +use once_cell::sync::OnceCell; +use std::{sync::Arc, time::Duration}; + +/// Holds the current state of a trailer for a response. +/// +/// This object acts as a shared handle that can be cloned and polled from +/// multiple threads to wait for and act on the response trailer. +/// +/// There are two typical workflows for accessing trailer headers: +/// +/// - If you are consuming the response body and then accessing the headers +/// afterward, then all trailers are guaranteed to have arrived (if any). +/// [`Trailer::try_get`] will allow you to access them without extra overhead. +/// - If you are handling trailers in a separate task, callback, or thread, then +/// either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait +/// for the trailer headers to arrive and then handle them. +/// +/// Note that in either approach, trailer headers are delivered to your +/// application as a single [`HeaderMap`]; it is not possible to handle +/// individual headers as they arrive. +#[derive(Clone, Debug)] +pub struct Trailer { + shared: Arc, +} + +#[derive(Debug)] +struct Shared { + headers: OnceCell, + ready: Event, +} + +impl Trailer { + /// Get a populated trailer handle containing no headers. + pub(crate) fn empty() -> &'static Self { + static EMPTY: OnceCell = OnceCell::new(); + + EMPTY.get_or_init(|| Self { + shared: Arc::new(Shared { + headers: OnceCell::from(HeaderMap::new()), + ready: Event::new(), + }), + }) + } + + /// Returns true if the trailer has been received (if any). + /// + /// The trailer will not be received until the body stream associated with + /// this response has been fully consumed. + #[inline] + pub fn is_ready(&self) -> bool { + self.try_get().is_some() + } + + /// Attempt to get the trailer headers without blocking. Returns `None` if + /// the trailer has not been received yet. + #[inline] + pub fn try_get(&self) -> Option<&HeaderMap> { + self.shared.headers.get() + } + + /// Block the current thread until the trailer headers arrive, and then + /// return them. + /// + /// This is a blocking operation! If you are writing an asynchronous + /// application, then you probably want to use [`Trailer::wait_async`] + /// instead. + pub fn wait(&self) -> &HeaderMap { + loop { + // Fast path: If the headers are already set, return them. + if let Some(headers) = self.try_get() { + return headers; + } + + // Headers not set, jump into the slow path by creating a new + // listener for the ready event. + let listener = self.shared.ready.listen(); + + // Double-check that the headers are not set. + if let Some(headers) = self.try_get() { + return headers; + } + + // Otherwise, block until they are set. + listener.wait(); + + // If we got the notification, then the headers are likely to be + // set. + if let Some(headers) = self.try_get() { + return headers; + } + } + } + + /// Block the current thread until the trailer headers arrive or a timeout + /// expires. + /// + /// If the given timeout expired before the trailer arrived then `None` is + /// returned. + /// + /// This is a blocking operation! If you are writing an asynchronous + /// application, then you probably want to use [`Trailer::wait_async`] + /// instead. + pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> { + // Fast path: If the headers are already set, return them. + if let Some(headers) = self.try_get() { + return Some(headers); + } + + // Headers not set, jump into the slow path by creating a new listener + // for the ready event. + let listener = self.shared.ready.listen(); + + // Double-check that the headers are not set. + if let Some(headers) = self.try_get() { + return Some(headers); + } + + // Otherwise, block with a timeout. + if listener.wait_timeout(timeout) { + self.try_get() + } else { + None + } + } + + /// Wait asynchronously until the trailer headers arrive, and then return + /// them. + pub async fn wait_async(&self) -> &HeaderMap { + loop { + // Fast path: If the headers are already set, return them. + if let Some(headers) = self.try_get() { + return headers; + } + + // Headers not set, jump into the slow path by creating a new + // listener for the ready event. + let listener = self.shared.ready.listen(); + + // Double-check that the headers are not set. + if let Some(headers) = self.try_get() { + return headers; + } + + // Otherwise, wait asynchronously until they are. + listener.await; + + // If we got the notification, then the headers are likely to be + // set. + if let Some(headers) = self.try_get() { + return headers; + } + } + } +} + +pub(crate) struct TrailerWriter { + shared: Arc, + headers: Option, +} + +impl TrailerWriter { + pub(crate) fn new() -> Self { + Self { + shared: Arc::new(Shared { + headers: Default::default(), + ready: Event::new(), + }), + headers: Some(HeaderMap::new()), + } + } + + pub(crate) fn trailer(&self) -> Trailer { + Trailer { + shared: self.shared.clone(), + } + } + + pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> { + self.headers.as_mut() + } + + #[inline] + pub(crate) fn flush(&mut self) { + if !self.flush_impl() { + tracing::warn!("tried to flush trailer multiple times"); + } + } + + fn flush_impl(&mut self) -> bool { + if let Some(headers) = self.headers.take() { + let _ = self.shared.headers.set(headers); + + // Wake up any calls waiting for the headers. + self.shared.ready.notify(usize::max_value()); + + true + } else { + false + } + } +} + +impl Drop for TrailerWriter { + fn drop(&mut self) { + self.flush_impl(); + } +} diff --git a/tests/headers.rs b/tests/headers.rs index 0c0d85fd..066ff264 100644 --- a/tests/headers.rs +++ b/tests/headers.rs @@ -1,4 +1,11 @@ +use futures_lite::future::block_on; use isahc::{prelude::*, HttpClient, Request}; +use std::{ + io::{self, Write}, + net::{Shutdown, TcpListener, TcpStream}, + thread, + time::Duration, +}; use testserver::mock; #[test] @@ -185,3 +192,151 @@ fn headers_in_request_builder_must_override_multiple_headers_in_httpclient_build .expect_header("accept-encoding", "deflate, gzip"); m.request().expect_header("X-header", "some-value3"); } + +#[test] +fn trailer_headers() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + + thread::spawn(move || { + let mut stream = listener.accept().unwrap().0; + + consume_request_in_background(&stream); + + stream + .write_all( + b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + trailer: foo\r\n\ + \r\n\ + 2\r\n\ + OK\r\n\ + 0\r\n\ + foo: bar\r\n\ + \r\n\ + ", + ) + .unwrap(); + + let _ = stream.shutdown(Shutdown::Write); + }); + + let mut body = None; + let response = isahc::get(url).unwrap().map(|b| { + body = Some(b); + () + }); + + thread::spawn(move || { + io::copy(body.as_mut().unwrap(), &mut io::sink()).unwrap(); + }); + + assert_eq!(response.trailer().wait().get("foo").unwrap(), "bar"); +} + +#[test] +fn trailer_headers_async() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + + thread::spawn(move || { + let mut stream = listener.accept().unwrap().0; + + consume_request_in_background(&stream); + + stream + .write_all( + b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + trailer: foo\r\n\ + \r\n\ + 2\r\n\ + OK\r\n\ + 0\r\n\ + foo: bar\r\n\ + \r\n\ + ", + ) + .unwrap(); + + let _ = stream.shutdown(Shutdown::Write); + }); + + block_on(async move { + let mut body = None; + let response = isahc::get_async(url).await.unwrap().map(|b| { + body = Some(b); + () + }); + + thread::spawn(move || { + block_on(async move { + futures_lite::io::copy(body.as_mut().unwrap(), &mut futures_lite::io::sink()) + .await + .unwrap(); + }) + }); + + assert_eq!( + response.trailer().wait_async().await.get("foo").unwrap(), + "bar" + ); + }); +} + +#[test] +fn trailer_headers_timeout() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + + thread::spawn(move || { + let mut stream = listener.accept().unwrap().0; + stream.set_nodelay(true).unwrap(); + + consume_request_in_background(&stream); + + stream + .write_all( + b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + trailer: foo\r\n\ + \r\n", + ) + .unwrap(); + + for _ in 0..1000 { + stream.write_all(b"5\r\nhello\r\n").unwrap(); + } + + stream.write_all(b"0\r\n").unwrap(); + + thread::sleep(Duration::from_millis(200)); + + stream.write_all(b"foo: bar\r\n\r\n").unwrap(); + + let _ = stream.shutdown(Shutdown::Write); + }); + + let response = isahc::get(url).unwrap(); + + // Since we don't consume the response body and the trailer is in a separate + // packet from the header, we won't receive the trailer in time. + assert!( + response + .trailer() + .wait_timeout(Duration::from_millis(10)) + .is_none() + ); +} + +fn consume_request_in_background(stream: &TcpStream) { + let mut stream = stream.try_clone().unwrap(); + + thread::spawn(move || { + let _ = io::copy(&mut stream, &mut io::sink()); + let _ = stream.shutdown(Shutdown::Read); + }); +}