Skip to content

Commit

Permalink
Slightly improve HTTP server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
LaylBongers committed Aug 12, 2023
1 parent 076b60c commit bc44fe6
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 33 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ thiserror = "1.0"
thunderdome = "0.6.0"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tracing-test = "0.2.4"
uuid = "1.4"
stewart = { version = "0.10.0-dev", path = "./crates/stewart" }
stewart-http = { version = "0.1.0-dev", path = "./crates/stewart-http" }
Expand Down
72 changes: 45 additions & 27 deletions crates/stewart-http/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ struct Service {
events: Sender<ConnectionEvent>,
http_events: Sender<HttpEvent>,

receive_buffer: String,
tcp_closed: bool,
receive_buffer: BytesMut,
pending_requests: VecDeque<Mailbox<RequestAction>>,
}

Expand All @@ -62,8 +62,8 @@ impl Service {
events,
http_events,

receive_buffer: String::new(),
tcp_closed: false,
receive_buffer: BytesMut::new(),
pending_requests: VecDeque::new(),
}
}
Expand Down Expand Up @@ -99,28 +99,7 @@ impl Actor for Service {
}

self.process_actions()?;

// Check if we have a full request worth of data
// TODO: This is very incorrect and really should be be redesigned entirely
let split = self.receive_buffer.split_once("\r\n\r\n");
if let Some((_left, right)) = split {
self.receive_buffer = right.to_string();

event!(Level::DEBUG, "received request");

let mailbox = Mailbox::default();
let signal = world.signal(meta.id());
mailbox.set_signal(signal);

// Send the request event
let event = RequestEvent {
actions: mailbox.sender(),
};
self.http_events.send(HttpEvent::Request(event))?;

// Track the request
self.pending_requests.push_back(mailbox);
}
self.process_received(world, meta)?;

// Check requests we can resolve
// HTTP 1.1 sends back responses in the same order as requests, so we only check the first
Expand All @@ -145,9 +124,7 @@ impl Service {
match event {
tcp::ConnectionEvent::Recv(event) => {
event!(Level::TRACE, bytes = event.data.len(), "received data");

let data = std::str::from_utf8(&event.data)?;
self.receive_buffer.push_str(data);
self.receive_buffer.extend(&event.data);
}
tcp::ConnectionEvent::Closed => {
event!(Level::DEBUG, "connection closed");
Expand All @@ -173,6 +150,47 @@ impl Service {
Ok(())
}

/// Process pending data previously received, but not yet processed.
fn process_received(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
// TODO: Instead of checking every time if we have a full header's worth, do something
// smarter.
// TODO: Handle requests with body content

loop {
// Check if we have a full request worth of data left in the buffer
let location = self
.receive_buffer
.windows(4)
.enumerate()
.find(|(_, window)| window == b"\r\n\r\n")
.map(|(i, _)| i);
let Some(location) = location else { break; };

event!(Level::DEBUG, "received request");

// Split off the request we have to process
let request = self.receive_buffer.split_to(location + 4);
let data = std::str::from_utf8(&request)?;
println!("REQUEST: {:?}", data);

// Create the mailbox to send a response back through
let mailbox = Mailbox::default();
let signal = world.signal(meta.id());
mailbox.set_signal(signal);

// Send the request event
let event = RequestEvent {
actions: mailbox.sender(),
};
self.http_events.send(HttpEvent::Request(event))?;

// Track the request
self.pending_requests.push_back(mailbox);
}

Ok(())
}

fn send_response(&mut self, body: Bytes) -> Result<(), Error> {
// Send the response
let mut data = BytesMut::new();
Expand Down
2 changes: 1 addition & 1 deletion crates/stewart-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod listener;
use bytes::Bytes;
use stewart::message::Sender;

pub use self::listener::listen;
pub use self::listener::bind;

pub enum HttpEvent {
Request(RequestEvent),
Expand Down
5 changes: 3 additions & 2 deletions crates/stewart-http/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ use tracing::{event, Level};

use crate::{connection, HttpEvent};

pub fn listen(
/// Open a HTTP listener on the given address.
pub fn bind(
world: &mut World,
registry: RegistryRef,
addr: SocketAddr,
http_events: Sender<HttpEvent>,
) -> Result<(), Error> {
let actor = Service::new(world, registry, addr, http_events)?;
world.insert("http-server", actor)?;
world.insert("http-listener", actor)?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/stewart-mio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct ConnectedEvent {
///
/// TCP, unlike UDP, works with ongoing connections.
/// Before a connection is established, you first need to 'listen' for those on a port.
#[instrument("tcp::listen", skip_all)]
#[instrument("tcp::bind", skip_all)]
pub fn bind(
world: &mut World,
registry: RegistryRef,
Expand Down
2 changes: 1 addition & 1 deletion examples/html_hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Service {
let http_events = Mailbox::default();

let addr = "127.0.0.1:1234".parse()?;
stewart_http::listen(world, registry, addr, http_events.sender())?;
stewart_http::bind(world, registry, addr, http_events.sender())?;

let actor = Service { http_events };
Ok(actor)
Expand Down

0 comments on commit bc44fe6

Please sign in to comment.