Skip to content

Commit

Permalink
http bugfix: add a received body length limiter with a default value
Browse files Browse the repository at this point in the history
and address a bug in polling chunked ReceivedBodies with long chunk sizes and short buffers
  • Loading branch information
jbr committed Oct 26, 2023
1 parent a70ac33 commit 7f56166
Show file tree
Hide file tree
Showing 10 changed files with 655 additions and 393 deletions.
50 changes: 50 additions & 0 deletions http/src/after_send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SendStatus {
Success,
Failure,
}
impl From<bool> for SendStatus {
fn from(success: bool) -> Self {
if success {
Self::Success
} else {
Self::Failure
}
}
}

impl SendStatus {
pub fn is_success(self) -> bool {
SendStatus::Success == self
}
}

#[derive(Default)]
pub(crate) struct AfterSend(Option<Box<dyn FnOnce(SendStatus) + Send + Sync + 'static>>);

impl AfterSend {
pub(crate) fn call(&mut self, send_status: SendStatus) {
if let Some(after_send) = self.0.take() {
after_send(send_status);
}
}

pub(crate) fn append<F>(&mut self, after_send: F)
where
F: FnOnce(SendStatus) + Send + Sync + 'static,
{
self.0 = Some(match self.0.take() {
Some(existing_after_send) => Box::new(move |ss| {
existing_after_send(ss);
after_send(ss);
}),
None => Box::new(after_send),
});
}
}

impl Drop for AfterSend {
fn drop(&mut self) {
self.call(SendStatus::Failure);
}
}
171 changes: 14 additions & 157 deletions http/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::{
after_send::{AfterSend, SendStatus},
copy,
http_config::DEFAULT_CONFIG,
received_body::ReceivedBodyState,
util::encoding,
Body, BufWriter, ConnectionStatus, Error, HeaderName, HeaderValue, HeaderValues, Headers,
HttpConfig,
KnownHeaderName::{Connection, ContentLength, Date, Expect, Host, Server, TransferEncoding},
Method, ReceivedBody, Result, StateSet, Status, Stopper, Upgrade, Version,
};
Expand All @@ -16,63 +19,11 @@ use std::{
future::Future,
net::IpAddr,
str::FromStr,
sync::Arc,
time::{Instant, SystemTime},
};

const SERVER: &str = concat!("trillium/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SendStatus {
Success,
Failure,
}
impl From<bool> for SendStatus {
fn from(success: bool) -> Self {
if success {
Self::Success
} else {
Self::Failure
}
}
}

impl SendStatus {
pub fn is_success(self) -> bool {
SendStatus::Success == self
}
}

#[derive(Default)]
pub(crate) struct AfterSend(Option<Box<dyn FnOnce(SendStatus) + Send + Sync + 'static>>);

impl AfterSend {
pub(crate) fn call(&mut self, send_status: SendStatus) {
if let Some(after_send) = self.0.take() {
after_send(send_status);
}
}

pub(crate) fn append<F>(&mut self, after_send: F)
where
F: FnOnce(SendStatus) + Send + Sync + 'static,
{
self.0 = Some(match self.0.take() {
Some(existing_after_send) => Box::new(move |ss| {
existing_after_send(ss);
after_send(ss);
}),
None => Box::new(after_send),
});
}
}

impl Drop for AfterSend {
fn drop(&mut self) {
self.call(SendStatus::Failure);
}
}

/** A http connection
Unlike in other rust http implementations, this struct represents both
Expand All @@ -96,7 +47,7 @@ pub struct Conn<Transport> {
pub(crate) after_send: AfterSend,
pub(crate) start_time: Instant,
pub(crate) peer_ip: Option<IpAddr>,
pub(crate) http_config: Arc<HttpConfig>,
pub(crate) http_config: HttpConfig,
}

impl<Transport> Debug for Conn<Transport> {
Expand Down Expand Up @@ -126,77 +77,6 @@ impl<Transport> Debug for Conn<Transport> {
}
}

#[derive(Clone, Debug)]
pub struct HttpConfig {
stopper: Stopper,
write_buffer_len: usize,
read_buffer_len: usize,
max_head_len: usize,
max_headers: usize,
initial_header_capacity: usize,
copy_loops_per_yield: usize,
}

// impl HttpConfig {
// pub fn with_stopper(mut self, stopper: Stopper) -> Self {
// self.stopper = stopper;
// self
// }

// pub fn with_write_buffer_len(mut self, write_buffer_len: usize) -> Self {
// self.write_buffer_len = write_buffer_len;
// self
// }

// pub fn with_read_buffer_len(mut self, read_buffer_len: usize) -> Self {
// self.read_buffer_len = read_buffer_len;
// self
// }

// pub fn with_max_head_len(mut self, max_head_len: usize) -> Self {
// self.max_head_len = max_head_len;
// self
// }

// pub fn with_max_headers(mut self, max_headers: usize) -> Self {
// self.max_headers = max_headers;
// self
// }

// pub fn with_initial_header_capacity(mut self, initial_header_capacity: usize) -> Self {
// self.initial_header_capacity = initial_header_capacity;
// self
// }

// pub fn with_copy_loops_per_yield(mut self, copy_loops_per_yield: usize) -> Self {
// self.copy_loops_per_yield = copy_loops_per_yield;
// self
// }
// }

impl Default for HttpConfig {
fn default() -> Self {
Self {
stopper: Stopper::default(),
write_buffer_len: 512,
read_buffer_len: 128,
max_head_len: 8 * 1024,
max_headers: 128,
initial_header_capacity: 16,
copy_loops_per_yield: 16,
}
}
}

impl From<Stopper> for HttpConfig {
fn from(stopper: Stopper) -> Self {
Self {
stopper,
..Self::default()
}
}
}

impl<Transport> Conn<Transport>
where
Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
Expand Down Expand Up @@ -230,39 +110,20 @@ where
F: Fn(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
{
Self::map_with_config(transport, Arc::new(stopper.into()), handler).await
Self::map_with_config(DEFAULT_CONFIG, stopper, transport, handler).await
}

/// read any number of new `Conn`s from the transport and call the
/// provided handler function until either the connection is closed or
/// an upgrade is requested. A return value of Ok(None) indicates a
/// closed connection, while a return value of Ok(Some(upgrade))
/// represents an upgrade.
///
/// See the documentation for [`Conn`] for a full example.
///
/// # Errors
///
/// This will return an error variant if:
///
/// * there is an io error when reading from the underlying transport
/// * headers are too long
/// * we are unable to parse some aspect of the request
/// * the request is an unsupported http version
/// * we cannot make sense of the headers, such as if there is a
/// `content-length` header as well as a `transfer-encoding: chunked`
/// header.

async fn map_with_config<F, Fut>(
http_config: HttpConfig,
stopper: Stopper,
transport: Transport,
config: Arc<HttpConfig>,
handler: F,
) -> Result<Option<Upgrade<Transport>>>
where
F: Fn(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
{
let mut conn = Conn::new_with_config(transport, None, config).await?;
let mut conn = Conn::new_with_config(http_config, transport, None, stopper).await?;

loop {
conn = match handler(conn).await.send().await? {
Expand Down Expand Up @@ -598,7 +459,7 @@ where
bytes: Option<Vec<u8>>,
stopper: Stopper,
) -> Result<Self> {
Self::new_with_config(transport, bytes, Arc::new(stopper.into())).await
Self::new_with_config(DEFAULT_CONFIG, transport, bytes, stopper).await
}

/// # Create a new `Conn`
Expand All @@ -621,13 +482,13 @@ where
/// `content-length` header as well as a `transfer-encoding: chunked`
/// header.
async fn new_with_config(
http_config: HttpConfig,
transport: Transport,
bytes: Option<Vec<u8>>,
http_config: Arc<HttpConfig>,
stopper: Stopper,
) -> Result<Self> {
let stopper = http_config.stopper.clone();
let (transport, buf, extra_bytes, start_time) =
Self::head(transport, bytes, &http_config).await?;
Self::head(transport, bytes, &stopper, &http_config).await?;

let buffer = if extra_bytes.is_empty() {
None
Expand Down Expand Up @@ -795,6 +656,7 @@ where
async fn head(
mut transport: Transport,
bytes: Option<Vec<u8>>,
stopper: &Stopper,
http_config: &HttpConfig,
) -> Result<(Transport, Vec<u8>, Vec<u8>, Instant)> {
let mut buf = bytes.unwrap_or_default();
Expand All @@ -805,19 +667,14 @@ where
let mut resize_by = http_config.read_buffer_len;
loop {
if len >= http_config.max_head_len {
log::error!(
"headers were {len}, greater than {}",
http_config.max_head_len
);
return Err(Error::HeadersTooLong);
}

let bytes = if start_with_read {
buf.resize(buf.len() + resize_by, 0);
resize_by *= 2;
if len == 0 {
http_config
.stopper
stopper
.stop_future(transport.read(&mut buf[..]))
.await
.ok_or(Error::Closed)??
Expand Down
12 changes: 9 additions & 3 deletions http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,16 @@ pub enum Error {
#[error("unexpected header: {0}")]
UnexpectedHeader(&'static str),

/// for security reasons, we do not allow request headers beyond
/// 8kb.
#[error("Head byte length should be less than 8kb")]
/// to mitigate against malicious http clients, we do not allow request headers beyond this
/// length.
#[error("Headers were malformed or longer than allowed")]
HeadersTooLong,

/// to mitigate against malicious http clients, we do not read received bodies beyond this
/// length to memory. If you need to receive longer bodies, use the Stream or AsyncRead
/// implementation on ReceivedBody
#[error("Received body too long. Maximum {0} bytes")]
ReceivedBodyTooLong(u64),
}

/// this crate's result type
Expand Down
Loading

0 comments on commit 7f56166

Please sign in to comment.