From 171ebd01e6c5377efc54392c5859b8f80fc5bf3d Mon Sep 17 00:00:00 2001 From: Layl Bongers <3094382+LaylBongers@users.noreply.github.com> Date: Sat, 12 Aug 2023 16:46:46 +0200 Subject: [PATCH] Move signal out of metadata --- crates/stewart-http/src/connection.rs | 17 ++++++++++++----- crates/stewart-http/src/lib.rs | 2 ++ crates/stewart-http/src/listener.rs | 9 ++++++--- crates/stewart-mio/src/lib.rs | 2 ++ crates/stewart-mio/src/net/tcp/listener.rs | 9 ++++++--- crates/stewart-mio/src/net/tcp/stream.rs | 9 ++++++--- crates/stewart-mio/src/net/udp.rs | 8 +++++--- crates/stewart-quic/src/lib.rs | 8 ++++++-- crates/stewart/src/actor.rs | 21 +++++++++------------ crates/stewart/src/lib.rs | 2 +- crates/stewart/src/world.rs | 17 ++++++++++++++--- examples/hello.rs | 5 +++-- examples/html_hello.rs | 9 +++++++-- examples/tcp_echo.rs | 15 +++++++++------ examples/udp_echo.rs | 8 +++++--- 15 files changed, 93 insertions(+), 48 deletions(-) diff --git a/crates/stewart-http/src/connection.rs b/crates/stewart-http/src/connection.rs index ab3e1a2..3923753 100644 --- a/crates/stewart-http/src/connection.rs +++ b/crates/stewart-http/src/connection.rs @@ -81,13 +81,16 @@ impl Drop for Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.tcp_events.set_signal(meta.signal()); - self.actions.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + + self.tcp_events.set_signal(signal.clone()); + self.actions.set_signal(signal); + Ok(()) } - fn process(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + fn process(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { self.process_tcp(meta)?; // Can't do anything further if we don't have an open TCP connection @@ -104,14 +107,18 @@ impl Actor for Service { self.receive_buffer = right.to_string(); event!(Level::DEBUG, "received request"); + let mailbox = Mailbox::default(); - mailbox.set_signal(meta.signal()); + let signal = world.signal(meta.id()); + mailbox.set_signal(signal); + // Send the request event let event = RequestEvent { actions: mailbox.sender(), }; self.http_events.send(HttpEvent::Request(event))?; + // Track the request self.pending_requests.push_back(mailbox); } diff --git a/crates/stewart-http/src/lib.rs b/crates/stewart-http/src/lib.rs index 1cc68c1..1d6971f 100644 --- a/crates/stewart-http/src/lib.rs +++ b/crates/stewart-http/src/lib.rs @@ -1,3 +1,5 @@ +#![deny(unsafe_code)] + mod connection; mod listener; diff --git a/crates/stewart-http/src/listener.rs b/crates/stewart-http/src/listener.rs index 2159c73..a7a999c 100644 --- a/crates/stewart-http/src/listener.rs +++ b/crates/stewart-http/src/listener.rs @@ -78,8 +78,10 @@ impl Drop for Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.tcp_events.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + self.tcp_events.set_signal(signal); + Ok(()) } @@ -89,7 +91,8 @@ impl Actor for Service { match event { tcp::ListenerEvent::Connected(event) => { let events = Mailbox::default(); - events.set_signal(meta.signal()); + let signal = world.signal(meta.id()); + events.set_signal(signal); let actions = connection::open( world, diff --git a/crates/stewart-mio/src/lib.rs b/crates/stewart-mio/src/lib.rs index 91e5fcd..b090f2c 100644 --- a/crates/stewart-mio/src/lib.rs +++ b/crates/stewart-mio/src/lib.rs @@ -1,3 +1,5 @@ +#![deny(unsafe_code)] + mod event_loop; pub mod net; mod registry; diff --git a/crates/stewart-mio/src/net/tcp/listener.rs b/crates/stewart-mio/src/net/tcp/listener.rs index 7129ab1..3bd31fb 100644 --- a/crates/stewart-mio/src/net/tcp/listener.rs +++ b/crates/stewart-mio/src/net/tcp/listener.rs @@ -103,9 +103,12 @@ impl Drop for Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.actions.set_signal(meta.signal()); - self.ready.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + + self.actions.set_signal(signal.clone()); + self.ready.set_signal(signal); + Ok(()) } diff --git a/crates/stewart-mio/src/net/tcp/stream.rs b/crates/stewart-mio/src/net/tcp/stream.rs index 218e628..bafd224 100644 --- a/crates/stewart-mio/src/net/tcp/stream.rs +++ b/crates/stewart-mio/src/net/tcp/stream.rs @@ -105,9 +105,12 @@ impl Drop for Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.action_mailbox.set_signal(meta.signal()); - self.ready_mailbox.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + + self.action_mailbox.set_signal(signal.clone()); + self.ready_mailbox.set_signal(signal); + Ok(()) } diff --git a/crates/stewart-mio/src/net/udp.rs b/crates/stewart-mio/src/net/udp.rs index 666628c..9147c79 100644 --- a/crates/stewart-mio/src/net/udp.rs +++ b/crates/stewart-mio/src/net/udp.rs @@ -101,9 +101,11 @@ impl Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.actions.set_signal(meta.signal()); - self.ready.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + + self.actions.set_signal(signal.clone()); + self.ready.set_signal(signal); Ok(()) } diff --git a/crates/stewart-quic/src/lib.rs b/crates/stewart-quic/src/lib.rs index 8418c83..fa02eae 100644 --- a/crates/stewart-quic/src/lib.rs +++ b/crates/stewart-quic/src/lib.rs @@ -1,3 +1,5 @@ +#![deny(unsafe_code)] + use std::{net::SocketAddr, sync::Arc}; use anyhow::Error; @@ -68,8 +70,10 @@ impl Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.event_mailbox.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + self.event_mailbox.set_signal(signal); + Ok(()) } diff --git a/crates/stewart/src/actor.rs b/crates/stewart/src/actor.rs index 6889c37..8131ab9 100644 --- a/crates/stewart/src/actor.rs +++ b/crates/stewart/src/actor.rs @@ -1,6 +1,6 @@ use anyhow::Error; -use crate::{Signal, World}; +use crate::{Id, World}; /// Actor identity and implementation trait. /// @@ -25,25 +25,22 @@ pub trait Actor: 'static { /// Metadata of an `Actor` in a `World`. pub struct Metadata { - signal: Signal, + id: Id, stop: bool, } impl Metadata { - pub(crate) fn new(signal: Signal) -> Self { - Self { - signal, - stop: false, - } + pub(crate) fn new(id: Id) -> Self { + Self { id, stop: false } } - pub(crate) fn stop(&self) -> bool { - self.stop + /// Get the identifier of this actor. + pub fn id(&self) -> Id { + self.id } - /// Get a `Signal` that wakes the actor. - pub fn signal(&self) -> Signal { - self.signal.clone() + pub(crate) fn stop(&self) -> bool { + self.stop } /// At the end of this processing step, stop the actor. diff --git a/crates/stewart/src/lib.rs b/crates/stewart/src/lib.rs index 5b56620..13a51c9 100644 --- a/crates/stewart/src/lib.rs +++ b/crates/stewart/src/lib.rs @@ -13,5 +13,5 @@ mod world; pub use self::{ actor::{Actor, Metadata}, signal::Signal, - world::{ProcessError, World}, + world::{Id, ProcessError, World}, }; diff --git a/crates/stewart/src/world.rs b/crates/stewart/src/world.rs index bd2214c..bcd32f5 100644 --- a/crates/stewart/src/world.rs +++ b/crates/stewart/src/world.rs @@ -3,7 +3,7 @@ use thiserror::Error; use thunderdome::{Arena, Index}; use tracing::{event, instrument, span, Level}; -use crate::{signal::SignalReceiver, Actor, Metadata}; +use crate::{signal::SignalReceiver, Actor, Metadata, Signal}; /// Thread-local actor tracking and execution system. #[derive(Default)] @@ -63,6 +63,11 @@ impl World { Ok(()) } + /// Create a signal for the given actor. + pub fn signal(&self, id: Id) -> Signal { + self.receiver.signal(id.index) + } + /// Process all pending actors, until none are left pending. #[instrument("World::process", level = "debug", skip_all)] pub fn process(&mut self) -> Result<(), ProcessError> { @@ -86,8 +91,8 @@ impl World { // Let the actor's implementation process event!(Level::TRACE, "calling actor"); - let signal = self.receiver.signal(index); - let mut meta = Metadata::new(signal); + let id = Id { index }; + let mut meta = Metadata::new(id); let result = f(actor.as_mut(), self, &mut meta); // Check if processing failed @@ -136,6 +141,12 @@ impl World { } } +/// Identifier of an actor inserted into a world. +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct Id { + index: Index, +} + /// Error while processing actors. #[derive(Error, Debug)] #[error("failed to process world")] diff --git a/examples/hello.rs b/examples/hello.rs index 37994a0..664b49e 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -141,10 +141,11 @@ mod hello_service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { // To wake up our actor when a message gets sent, register it with the mailbox for // notification - self.mailbox.set_signal(meta.signal()); + let signal = world.signal(meta.id()); + self.mailbox.set_signal(signal); Ok(()) } diff --git a/examples/html_hello.rs b/examples/html_hello.rs index 4b6b1f0..bf8189b 100644 --- a/examples/html_hello.rs +++ b/examples/html_hello.rs @@ -4,6 +4,7 @@ use anyhow::Error; use stewart::{message::Mailbox, Actor, Metadata, World}; use stewart_http::{HttpEvent, RequestAction}; use stewart_mio::{Registry, RegistryHandle}; +use tracing::{event, Level}; fn main() -> Result<(), Error> { utils::init_logging(); @@ -38,8 +39,10 @@ impl Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.http_events.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + self.http_events.set_signal(signal); + Ok(()) } @@ -47,6 +50,8 @@ impl Actor for Service { while let Some(event) = self.http_events.recv() { let HttpEvent::Request(request) = event; + event!(Level::INFO, "received request"); + let body = RESPONSE.into(); request.actions.send(RequestAction::SendResponse(body))?; } diff --git a/examples/tcp_echo.rs b/examples/tcp_echo.rs index aea23d1..2657094 100644 --- a/examples/tcp_echo.rs +++ b/examples/tcp_echo.rs @@ -63,13 +63,15 @@ impl Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.server_mailbox.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + self.server_mailbox.set_signal(signal); + Ok(()) } - fn process(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.poll_listener(meta)?; + fn process(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + self.poll_listener(world, meta)?; self.poll_connections()?; Ok(()) @@ -77,7 +79,7 @@ impl Actor for Service { } impl Service { - fn poll_listener(&mut self, meta: &mut Metadata) -> Result<(), Error> { + fn poll_listener(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { while let Some(event) = self.server_mailbox.recv() { match event { tcp::ListenerEvent::Connected(event) => { @@ -89,7 +91,8 @@ impl Service { event.actions.send(tcp::ConnectionAction::Send(action))?; // Keep track of the stream - event.events.set_signal(meta.signal()); + let signal = world.signal(meta.id()); + event.events.set_signal(signal); let connection = Connection { event, pending: Vec::new(), diff --git a/examples/udp_echo.rs b/examples/udp_echo.rs index d51f67f..832226c 100644 --- a/examples/udp_echo.rs +++ b/examples/udp_echo.rs @@ -88,9 +88,11 @@ impl Service { } impl Actor for Service { - fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> { - self.server_mailbox.set_signal(meta.signal()); - self.client_mailbox.set_signal(meta.signal()); + fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> { + let signal = world.signal(meta.id()); + + self.server_mailbox.set_signal(signal.clone()); + self.client_mailbox.set_signal(signal); Ok(()) }