Skip to content

Commit

Permalink
Trailer headers (#256)
Browse files Browse the repository at this point in the history
Add new API for accessing response trailer headers.

See #157.
  • Loading branch information
sagebind authored May 13, 2021
1 parent 68c6006 commit c3f70e9
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ jobs:
- macos-latest
- windows-latest
runs-on: ${{ matrix.os }}
timeout-minutes: 20
env:
RUST_BACKTRACE: 1
RUST_LOG: isahc=debug
Expand All @@ -41,6 +42,7 @@ jobs:

analyze:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v2
with:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async-channel = "1.6"
crossbeam-utils = "0.8"
curl = "0.4.36"
curl-sys = "0.4.42"
event-listener = "2.5"
futures-lite = "1.11"
http = "0.2.1"
log = "0.4"
Expand Down
29 changes: 25 additions & 4 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
metrics::Metrics,
parsing::{parse_header, parse_status_line},
response::{LocalAddr, RemoteAddr},
trailer::TrailerWriter,
};
use async_channel::Sender;
use curl::easy::{InfoType, ReadError, SeekResult, WriteError};
Expand Down Expand Up @@ -84,6 +85,10 @@ pub(crate) struct RequestHandler {
/// an agent when the request is initialized.
response_body_waker: Option<Waker>,

/// Holds the response trailer, if any. Used to communicate the trailer
/// headers out-of-band from the response headers and body.
response_trailer_writer: TrailerWriter,

/// Metrics object for publishing metrics data to. Lazily initialized.
metrics: Option<Metrics>,

Expand Down Expand Up @@ -131,6 +136,7 @@ impl RequestHandler {
response_headers: http::HeaderMap::new(),
response_body_writer,
response_body_waker: None,
response_trailer_writer: TrailerWriter::new(),
metrics: None,
handle: ptr::null_mut(),
};
Expand Down Expand Up @@ -175,10 +181,7 @@ impl RequestHandler {
}

fn is_future_canceled(&self) -> bool {
self.sender
.as_ref()
.map(Sender::is_closed)
.unwrap_or(false)
self.sender.as_ref().map(Sender::is_closed).unwrap_or(false)
}

/// Initialize the handler and prepare it for the request to begin.
Expand Down Expand Up @@ -211,6 +214,9 @@ impl RequestHandler {
tracing::debug!("attempted to set error multiple times");
}

// Flush the trailer, if we haven't already.
self.response_trailer_writer.flush();

// Complete the response future, if we haven't already.
self.complete_response_future();
}
Expand Down Expand Up @@ -262,6 +268,10 @@ impl RequestHandler {
// it. Otherwise we're just going to drop it later.
builder = builder.extension(RequestBody(mem::take(&mut self.request_body)));

// Include a handle to the trailer headers. We won't know if there
// are any until we reach the end of the response body.
builder = builder.extension(self.response_trailer_writer.trailer());

// Include metrics in response, but only if it was created. If
// metrics are disabled then it won't have been created.
if let Some(metrics) = self.metrics.clone() {
Expand Down Expand Up @@ -377,6 +387,17 @@ impl curl::easy::Handler for RequestHandler {
let span = tracing::trace_span!(parent: &self.span, "header");
let _enter = span.enter();

// If we already returned the response headers, then this header is from
// the trailer.
if self.sender.is_none() {
if let Some(trailer_headers) = self.response_trailer_writer.get_mut() {
if let Some((name, value)) = parse_header(data) {
trailer_headers.append(name, value);
return true;
}
}
}

// Curl calls this function for all lines in the response not part of
// the response body, not just for headers. We need to inspect the
// contents of the string in order to determine what it is and how to
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ mod request;
mod response;
mod task;
mod text;
mod trailer;

pub mod auth;
pub mod config;
Expand All @@ -273,6 +274,7 @@ pub use crate::{
metrics::Metrics,
request::RequestExt,
response::{AsyncReadResponseExt, ReadResponseExt, ResponseExt},
trailer::Trailer,
};

/// Re-export of HTTP types.
Expand Down
34 changes: 33 additions & 1 deletion src/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{metrics::Metrics, redirect::EffectiveUri};
use crate::{metrics::Metrics, redirect::EffectiveUri, trailer::Trailer};
use futures_lite::io::{copy as copy_async, AsyncRead, AsyncWrite};
use http::{Response, Uri};
use std::{
Expand All @@ -10,6 +10,31 @@ use std::{

/// Provides extension methods for working with HTTP responses.
pub trait ResponseExt<T> {
/// Get the trailer of the response containing headers that were received
/// after the response body.
///
/// See the documentation for [`Trailer`] for more details on how to handle
/// trailing headers.
///
/// # Examples
///
/// ```no_run
/// use isahc::prelude::*;
///
/// let mut response = isahc::get("https://my-site-with-trailers.com")?;
///
/// println!("Status: {}", response.status());
/// println!("Headers: {:#?}", response.headers());
///
/// // Read and discard the response body until the end.
/// response.consume()?;
///
/// // Now the trailer will be available as well.
/// println!("Trailing headers: {:#?}", response.trailer().try_get().unwrap());
/// # Ok::<(), isahc::Error>(())
/// ```
fn trailer(&self) -> &Trailer;

/// Get the effective URI of this response. This value differs from the
/// original URI provided when making the request if at least one redirect
/// was followed.
Expand Down Expand Up @@ -68,6 +93,13 @@ pub trait ResponseExt<T> {
}

impl<T> ResponseExt<T> for Response<T> {
fn trailer(&self) -> &Trailer {
// Return a static empty trailer if the extension does not exist. This
// offers a more convenient API so that users do not have to unwrap the
// trailer from an extra Option.
self.extensions().get().unwrap_or_else(|| Trailer::empty())
}

fn effective_uri(&self) -> Option<&Uri> {
self.extensions().get::<EffectiveUri>().map(|v| &v.0)
}
Expand Down
209 changes: 209 additions & 0 deletions src/trailer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
use event_listener::Event;
use http::HeaderMap;
use once_cell::sync::OnceCell;
use std::{sync::Arc, time::Duration};

/// Holds the current state of a trailer for a response.
///
/// This object acts as a shared handle that can be cloned and polled from
/// multiple threads to wait for and act on the response trailer.
///
/// There are two typical workflows for accessing trailer headers:
///
/// - If you are consuming the response body and then accessing the headers
/// afterward, then all trailers are guaranteed to have arrived (if any).
/// [`Trailer::try_get`] will allow you to access them without extra overhead.
/// - If you are handling trailers in a separate task, callback, or thread, then
/// either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait
/// for the trailer headers to arrive and then handle them.
///
/// Note that in either approach, trailer headers are delivered to your
/// application as a single [`HeaderMap`]; it is not possible to handle
/// individual headers as they arrive.
#[derive(Clone, Debug)]
pub struct Trailer {
shared: Arc<Shared>,
}

#[derive(Debug)]
struct Shared {
headers: OnceCell<HeaderMap>,
ready: Event,
}

impl Trailer {
/// Get a populated trailer handle containing no headers.
pub(crate) fn empty() -> &'static Self {
static EMPTY: OnceCell<Trailer> = OnceCell::new();

EMPTY.get_or_init(|| Self {
shared: Arc::new(Shared {
headers: OnceCell::from(HeaderMap::new()),
ready: Event::new(),
}),
})
}

/// Returns true if the trailer has been received (if any).
///
/// The trailer will not be received until the body stream associated with
/// this response has been fully consumed.
#[inline]
pub fn is_ready(&self) -> bool {
self.try_get().is_some()
}

/// Attempt to get the trailer headers without blocking. Returns `None` if
/// the trailer has not been received yet.
#[inline]
pub fn try_get(&self) -> Option<&HeaderMap> {
self.shared.headers.get()
}

/// Block the current thread until the trailer headers arrive, and then
/// return them.
///
/// This is a blocking operation! If you are writing an asynchronous
/// application, then you probably want to use [`Trailer::wait_async`]
/// instead.
pub fn wait(&self) -> &HeaderMap {
loop {
// Fast path: If the headers are already set, return them.
if let Some(headers) = self.try_get() {
return headers;
}

// Headers not set, jump into the slow path by creating a new
// listener for the ready event.
let listener = self.shared.ready.listen();

// Double-check that the headers are not set.
if let Some(headers) = self.try_get() {
return headers;
}

// Otherwise, block until they are set.
listener.wait();

// If we got the notification, then the headers are likely to be
// set.
if let Some(headers) = self.try_get() {
return headers;
}
}
}

/// Block the current thread until the trailer headers arrive or a timeout
/// expires.
///
/// If the given timeout expired before the trailer arrived then `None` is
/// returned.
///
/// This is a blocking operation! If you are writing an asynchronous
/// application, then you probably want to use [`Trailer::wait_async`]
/// instead.
pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> {
// Fast path: If the headers are already set, return them.
if let Some(headers) = self.try_get() {
return Some(headers);
}

// Headers not set, jump into the slow path by creating a new listener
// for the ready event.
let listener = self.shared.ready.listen();

// Double-check that the headers are not set.
if let Some(headers) = self.try_get() {
return Some(headers);
}

// Otherwise, block with a timeout.
if listener.wait_timeout(timeout) {
self.try_get()
} else {
None
}
}

/// Wait asynchronously until the trailer headers arrive, and then return
/// them.
pub async fn wait_async(&self) -> &HeaderMap {
loop {
// Fast path: If the headers are already set, return them.
if let Some(headers) = self.try_get() {
return headers;
}

// Headers not set, jump into the slow path by creating a new
// listener for the ready event.
let listener = self.shared.ready.listen();

// Double-check that the headers are not set.
if let Some(headers) = self.try_get() {
return headers;
}

// Otherwise, wait asynchronously until they are.
listener.await;

// If we got the notification, then the headers are likely to be
// set.
if let Some(headers) = self.try_get() {
return headers;
}
}
}
}

pub(crate) struct TrailerWriter {
shared: Arc<Shared>,
headers: Option<HeaderMap>,
}

impl TrailerWriter {
pub(crate) fn new() -> Self {
Self {
shared: Arc::new(Shared {
headers: Default::default(),
ready: Event::new(),
}),
headers: Some(HeaderMap::new()),
}
}

pub(crate) fn trailer(&self) -> Trailer {
Trailer {
shared: self.shared.clone(),
}
}

pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> {
self.headers.as_mut()
}

#[inline]
pub(crate) fn flush(&mut self) {
if !self.flush_impl() {
tracing::warn!("tried to flush trailer multiple times");
}
}

fn flush_impl(&mut self) -> bool {
if let Some(headers) = self.headers.take() {
let _ = self.shared.headers.set(headers);

// Wake up any calls waiting for the headers.
self.shared.ready.notify(usize::max_value());

true
} else {
false
}
}
}

impl Drop for TrailerWriter {
fn drop(&mut self) {
self.flush_impl();
}
}
Loading

0 comments on commit c3f70e9

Please sign in to comment.