Skip to content

Commit

Permalink
move tunable numeric literals into a HttpConfig struct
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Oct 26, 2023
1 parent 479a2fa commit a70ac33
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 29 deletions.
182 changes: 160 additions & 22 deletions http/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::BufWriter;
use crate::{
copy,
received_body::ReceivedBodyState,
util::encoding,
Body, ConnectionStatus, Error, HeaderValues, Headers,
Body, BufWriter, ConnectionStatus, Error, HeaderName, HeaderValue, HeaderValues, Headers,
KnownHeaderName::{Connection, ContentLength, Date, Expect, Host, Server, TransferEncoding},
Method, ReceivedBody, Result, StateSet, Status, Stopper, Upgrade, Version,
};
Expand All @@ -17,11 +16,10 @@ use std::{
future::Future,
net::IpAddr,
str::FromStr,
sync::Arc,
time::{Instant, SystemTime},
};

const MAX_HEADERS: usize = 128;
const MAX_HEAD_LENGTH: usize = 8 * 1024;
const SERVER: &str = concat!("trillium/", env!("CARGO_PKG_VERSION"));

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -98,11 +96,13 @@ pub struct Conn<Transport> {
pub(crate) after_send: AfterSend,
pub(crate) start_time: Instant,
pub(crate) peer_ip: Option<IpAddr>,
pub(crate) http_config: Arc<HttpConfig>,
}

impl<Transport> Debug for Conn<Transport> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Conn")
.field("http_config", &self.http_config)
.field("request_headers", &self.request_headers)
.field("response_headers", &self.response_headers)
.field("path", &self.path)
Expand All @@ -126,6 +126,77 @@ impl<Transport> Debug for Conn<Transport> {
}
}

#[derive(Clone, Debug)]
pub struct HttpConfig {
stopper: Stopper,
write_buffer_len: usize,
read_buffer_len: usize,
max_head_len: usize,
max_headers: usize,
initial_header_capacity: usize,
copy_loops_per_yield: usize,
}

// impl HttpConfig {
// pub fn with_stopper(mut self, stopper: Stopper) -> Self {
// self.stopper = stopper;
// self
// }

// pub fn with_write_buffer_len(mut self, write_buffer_len: usize) -> Self {
// self.write_buffer_len = write_buffer_len;
// self
// }

// pub fn with_read_buffer_len(mut self, read_buffer_len: usize) -> Self {
// self.read_buffer_len = read_buffer_len;
// self
// }

// pub fn with_max_head_len(mut self, max_head_len: usize) -> Self {
// self.max_head_len = max_head_len;
// self
// }

// pub fn with_max_headers(mut self, max_headers: usize) -> Self {
// self.max_headers = max_headers;
// self
// }

// pub fn with_initial_header_capacity(mut self, initial_header_capacity: usize) -> Self {
// self.initial_header_capacity = initial_header_capacity;
// self
// }

// pub fn with_copy_loops_per_yield(mut self, copy_loops_per_yield: usize) -> Self {
// self.copy_loops_per_yield = copy_loops_per_yield;
// self
// }
// }

impl Default for HttpConfig {
fn default() -> Self {
Self {
stopper: Stopper::default(),
write_buffer_len: 512,
read_buffer_len: 128,
max_head_len: 8 * 1024,
max_headers: 128,
initial_header_capacity: 16,
copy_loops_per_yield: 16,
}
}
}

impl From<Stopper> for HttpConfig {
fn from(stopper: Stopper) -> Self {
Self {
stopper,
..Self::default()
}
}
}

impl<Transport> Conn<Transport>
where
Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
Expand Down Expand Up @@ -159,7 +230,39 @@ where
F: Fn(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
{
let mut conn = Conn::new(transport, None, stopper).await?;
Self::map_with_config(transport, Arc::new(stopper.into()), handler).await
}

/// read any number of new `Conn`s from the transport and call the
/// provided handler function until either the connection is closed or
/// an upgrade is requested. A return value of Ok(None) indicates a
/// closed connection, while a return value of Ok(Some(upgrade))
/// represents an upgrade.
///
/// See the documentation for [`Conn`] for a full example.
///
/// # Errors
///
/// This will return an error variant if:
///
/// * there is an io error when reading from the underlying transport
/// * headers are too long
/// * we are unable to parse some aspect of the request
/// * the request is an unsupported http version
/// * we cannot make sense of the headers, such as if there is a
/// `content-length` header as well as a `transfer-encoding: chunked`
/// header.

async fn map_with_config<F, Fut>(
transport: Transport,
config: Arc<HttpConfig>,
handler: F,
) -> Result<Option<Upgrade<Transport>>>
where
F: Fn(Conn<Transport>) -> Fut,
Fut: Future<Output = Conn<Transport>> + Send,
{
let mut conn = Conn::new_with_config(transport, None, config).await?;

loop {
conn = match handler(conn).await.send().await? {
Expand All @@ -171,7 +274,7 @@ where
}

async fn send(mut self) -> Result<ConnectionStatus<Transport>> {
let mut output_buffer = Vec::with_capacity(512);
let mut output_buffer = Vec::with_capacity(self.http_config.write_buffer_len);
self.write_headers(&mut output_buffer)?;

let mut bufwriter = BufWriter::new_with_buffer(output_buffer, &mut self.transport);
Expand All @@ -180,7 +283,7 @@ where
&& !matches!(self.status, Some(Status::NotModified | Status::NoContent))
{
if let Some(body) = self.response_body.take() {
copy(body, &mut bufwriter).await?;
copy(body, &mut bufwriter, self.http_config.copy_loops_per_yield).await?;
}
}

Expand Down Expand Up @@ -495,16 +598,44 @@ where
bytes: Option<Vec<u8>>,
stopper: Stopper,
) -> Result<Self> {
Self::new_with_config(transport, bytes, Arc::new(stopper.into())).await
}

/// # Create a new `Conn`
///
/// This function creates a new conn from the provided
/// [`Transport`][crate::transport::Transport], as well as any
/// bytes that have already been read from the transport, and a
/// [`Stopper`] instance that will be used to signal graceful
/// shutdown.
///
/// # Errors
///
/// This will return an error variant if:
///
/// * there is an io error when reading from the underlying transport
/// * headers are too long
/// * we are unable to parse some aspect of the request
/// * the request is an unsupported http version
/// * we cannot make sense of the headers, such as if there is a
/// `content-length` header as well as a `transfer-encoding: chunked`
/// header.
async fn new_with_config(
transport: Transport,
bytes: Option<Vec<u8>>,
http_config: Arc<HttpConfig>,
) -> Result<Self> {
let stopper = http_config.stopper.clone();
let (transport, buf, extra_bytes, start_time) =
Self::head(transport, bytes, &stopper).await?;
Self::head(transport, bytes, &http_config).await?;

let buffer = if extra_bytes.is_empty() {
None
} else {
Some(extra_bytes)
};

let mut headers = [EMPTY_HEADER; MAX_HEADERS];
let mut headers = vec![EMPTY_HEADER; http_config.max_headers];
let mut httparse_req = Request::new(&mut headers);

let status = httparse_req.parse(&buf[..])?;
Expand All @@ -530,8 +661,8 @@ where

let mut request_headers = Headers::with_capacity(httparse_req.headers.len());
for header in httparse_req.headers {
let header_name = crate::HeaderName::from_str(header.name)?;
let header_value = crate::HeaderValue::from(header.value.to_owned());
let header_name = HeaderName::from_str(header.name)?;
let header_value = HeaderValue::from(header.value.to_owned());
request_headers.append(header_name, header_value);
}

Expand All @@ -543,7 +674,7 @@ where
.to_owned();
log::debug!("received:\n{method} {path} {version}\n{request_headers}");

let response_headers = Self::build_response_headers();
let response_headers = Self::build_response_headers(&http_config);

Ok(Self {
transport,
Expand All @@ -562,19 +693,20 @@ where
after_send: AfterSend::default(),
start_time,
peer_ip: None,
http_config,
})
}

fn build_response_headers() -> Headers {
[
fn build_response_headers(config: &HttpConfig) -> Headers {
let mut headers = Headers::with_capacity(config.initial_header_capacity);
headers.extend([
(
Date,
HeaderValues::from(httpdate::fmt_http_date(SystemTime::now())),
),
(Server, HeaderValues::from(SERVER)),
]
.into_iter()
.collect()
]);
headers
}

/// predicate function to indicate whether the connection is
Expand Down Expand Up @@ -663,25 +795,29 @@ where
async fn head(
mut transport: Transport,
bytes: Option<Vec<u8>>,
stopper: &Stopper,
http_config: &HttpConfig,
) -> Result<(Transport, Vec<u8>, Vec<u8>, Instant)> {
let mut buf = bytes.unwrap_or_default();
let mut len = 0;
let mut start_with_read = buf.is_empty();
let mut instant = None;
let finder = Finder::new(b"\r\n\r\n");
let mut resize_by = 128;
let mut resize_by = http_config.read_buffer_len;
loop {
if len >= MAX_HEAD_LENGTH {
log::error!("headers were {len}, greater than {MAX_HEAD_LENGTH}");
if len >= http_config.max_head_len {
log::error!(
"headers were {len}, greater than {}",
http_config.max_head_len
);
return Err(Error::HeadersTooLong);
}

let bytes = if start_with_read {
buf.resize(buf.len() + resize_by, 0);
resize_by *= 2;
if len == 0 {
stopper
http_config
.stopper
.stop_future(transport.read(&mut buf[..]))
.await
.ok_or(Error::Closed)??
Expand Down Expand Up @@ -846,6 +982,7 @@ where
after_send,
start_time,
peer_ip,
http_config,
} = self;

Conn {
Expand All @@ -865,6 +1002,7 @@ where
after_send,
start_time,
peer_ip,
http_config,
}
}

Expand Down
10 changes: 5 additions & 5 deletions http/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use std::{
task::{Context, Poll},
};

// to be tuned, not sure exactly how
const MAX_LOOPS: usize = 16;

pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
pub async fn copy<R, W>(reader: R, writer: W, loops_per_yield: usize) -> Result<u64>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
Expand All @@ -18,6 +15,7 @@ where
reader: BufReader<R>,
writer: W,
amt: u64,
loops_per_yield: usize,
}

impl<R, W> Future for CopyFuture<R, W>
Expand All @@ -28,12 +26,13 @@ where
type Output = Result<u64>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
for loop_number in 0..MAX_LOOPS {
for loop_number in 0..self.loops_per_yield {
log::trace!("copy loop number: {loop_number}");
let CopyFuture {
reader,
writer,
amt,
..
} = &mut *self;

let writer = Pin::new(writer);
Expand Down Expand Up @@ -61,6 +60,7 @@ where
reader: BufReader::new(reader),
writer,
amt: 0,
loops_per_yield,
};
future.await
}
8 changes: 6 additions & 2 deletions http/src/synthetic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::{
conn::AfterSend, received_body::ReceivedBodyState, transport::Transport, Conn, Headers,
KnownHeaderName, Method, StateSet, Stopper, Version,
conn::{AfterSend, HttpConfig},
received_body::ReceivedBodyState,
transport::Transport,
Conn, Headers, KnownHeaderName, Method, StateSet, Stopper, Version,
};
use futures_lite::io::{AsyncRead, AsyncWrite, Result};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
Expand Down Expand Up @@ -140,6 +143,7 @@ impl Conn<Synthetic> {
after_send: AfterSend::default(),
start_time: Instant::now(),
peer_ip: None,
http_config: Arc::new(HttpConfig::default()),
}
}

Expand Down

0 comments on commit a70ac33

Please sign in to comment.