From bc44fe62c8aece5db14937362da0c39f78d0c29a Mon Sep 17 00:00:00 2001 From: Layl Bongers <3094382+LaylBongers@users.noreply.github.com> Date: Sat, 12 Aug 2023 23:02:19 +0200 Subject: [PATCH] Slightly improve HTTP server implementation --- Cargo.toml | 1 - crates/stewart-http/src/connection.rs | 72 ++++++++++++++-------- crates/stewart-http/src/lib.rs | 2 +- crates/stewart-http/src/listener.rs | 5 +- crates/stewart-mio/src/net/tcp/listener.rs | 2 +- examples/html_hello.rs | 2 +- 6 files changed, 51 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7880c94..2136b4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ thiserror = "1.0" thunderdome = "0.6.0" tracing = "0.1.37" tracing-subscriber = "0.3.16" -tracing-test = "0.2.4" uuid = "1.4" stewart = { version = "0.10.0-dev", path = "./crates/stewart" } stewart-http = { version = "0.1.0-dev", path = "./crates/stewart-http" } diff --git a/crates/stewart-http/src/connection.rs b/crates/stewart-http/src/connection.rs index 3923753..1b056ca 100644 --- a/crates/stewart-http/src/connection.rs +++ b/crates/stewart-http/src/connection.rs @@ -41,8 +41,8 @@ struct Service { events: Sender, http_events: Sender, - receive_buffer: String, tcp_closed: bool, + receive_buffer: BytesMut, pending_requests: VecDeque>, } @@ -62,8 +62,8 @@ impl Service { events, http_events, - receive_buffer: String::new(), tcp_closed: false, + receive_buffer: BytesMut::new(), pending_requests: VecDeque::new(), } } @@ -99,28 +99,7 @@ impl Actor for Service { } self.process_actions()?; - - // Check if we have a full request worth of data - // TODO: This is very incorrect and really should be be redesigned entirely - let split = self.receive_buffer.split_once("\r\n\r\n"); - if let Some((_left, right)) = split { - self.receive_buffer = right.to_string(); - - event!(Level::DEBUG, "received request"); - - let mailbox = Mailbox::default(); - let signal = world.signal(meta.id()); - mailbox.set_signal(signal); - - // Send the request event - let event = RequestEvent { - actions: mailbox.sender(), - }; - self.http_events.send(HttpEvent::Request(event))?; - - // Track the request - self.pending_requests.push_back(mailbox); - } + self.process_received(world, meta)?; // Check requests we can resolve // HTTP 1.1 sends back responses in the same order as requests, so we only check the first @@ -145,9 +124,7 @@ impl Service { match event { tcp::ConnectionEvent::Recv(event) => { event!(Level::TRACE, bytes = event.data.len(), "received data"); - - let data = std::str::from_utf8(&event.data)?; - self.receive_buffer.push_str(data); + self.receive_buffer.extend(&event.data); } tcp::ConnectionEvent::Closed => { event!(Level::DEBUG, "connection closed"); @@ -173,6 +150,47 @@ impl Service { Ok(()) } + /// Process pending data previously received, but not yet processed. + fn process_received(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + // TODO: Instead of checking every time if we have a full header's worth, do something + // smarter. + // TODO: Handle requests with body content + + loop { + // Check if we have a full request worth of data left in the buffer + let location = self + .receive_buffer + .windows(4) + .enumerate() + .find(|(_, window)| window == b"\r\n\r\n") + .map(|(i, _)| i); + let Some(location) = location else { break; }; + + event!(Level::DEBUG, "received request"); + + // Split off the request we have to process + let request = self.receive_buffer.split_to(location + 4); + let data = std::str::from_utf8(&request)?; + println!("REQUEST: {:?}", data); + + // Create the mailbox to send a response back through + let mailbox = Mailbox::default(); + let signal = world.signal(meta.id()); + mailbox.set_signal(signal); + + // Send the request event + let event = RequestEvent { + actions: mailbox.sender(), + }; + self.http_events.send(HttpEvent::Request(event))?; + + // Track the request + self.pending_requests.push_back(mailbox); + } + + Ok(()) + } + fn send_response(&mut self, body: Bytes) -> Result<(), Error> { // Send the response let mut data = BytesMut::new(); diff --git a/crates/stewart-http/src/lib.rs b/crates/stewart-http/src/lib.rs index 1d6971f..deb41cd 100644 --- a/crates/stewart-http/src/lib.rs +++ b/crates/stewart-http/src/lib.rs @@ -6,7 +6,7 @@ mod listener; use bytes::Bytes; use stewart::message::Sender; -pub use self::listener::listen; +pub use self::listener::bind; pub enum HttpEvent { Request(RequestEvent), diff --git a/crates/stewart-http/src/listener.rs b/crates/stewart-http/src/listener.rs index ff07671..adcf953 100644 --- a/crates/stewart-http/src/listener.rs +++ b/crates/stewart-http/src/listener.rs @@ -10,14 +10,15 @@ use tracing::{event, Level}; use crate::{connection, HttpEvent}; -pub fn listen( +/// Open a HTTP listener on the given address. +pub fn bind( world: &mut World, registry: RegistryRef, addr: SocketAddr, http_events: Sender, ) -> Result<(), Error> { let actor = Service::new(world, registry, addr, http_events)?; - world.insert("http-server", actor)?; + world.insert("http-listener", actor)?; Ok(()) } diff --git a/crates/stewart-mio/src/net/tcp/listener.rs b/crates/stewart-mio/src/net/tcp/listener.rs index 8abf3ed..1aa514a 100644 --- a/crates/stewart-mio/src/net/tcp/listener.rs +++ b/crates/stewart-mio/src/net/tcp/listener.rs @@ -36,7 +36,7 @@ pub struct ConnectedEvent { /// /// TCP, unlike UDP, works with ongoing connections. /// Before a connection is established, you first need to 'listen' for those on a port. -#[instrument("tcp::listen", skip_all)] +#[instrument("tcp::bind", skip_all)] pub fn bind( world: &mut World, registry: RegistryRef, diff --git a/examples/html_hello.rs b/examples/html_hello.rs index 84e278c..c511865 100644 --- a/examples/html_hello.rs +++ b/examples/html_hello.rs @@ -31,7 +31,7 @@ impl Service { let http_events = Mailbox::default(); let addr = "127.0.0.1:1234".parse()?; - stewart_http::listen(world, registry, addr, http_events.sender())?; + stewart_http::bind(world, registry, addr, http_events.sender())?; let actor = Service { http_events }; Ok(actor)