Skip to content

Commit

Permalink
Move signal out of metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
LaylBongers committed Aug 12, 2023
1 parent 6eb9728 commit 171ebd0
Show file tree
Hide file tree
Showing 15 changed files with 93 additions and 48 deletions.
17 changes: 12 additions & 5 deletions crates/stewart-http/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,16 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.tcp_events.set_signal(meta.signal());
self.actions.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());

self.tcp_events.set_signal(signal.clone());
self.actions.set_signal(signal);

Ok(())
}

fn process(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
fn process(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.process_tcp(meta)?;

// Can't do anything further if we don't have an open TCP connection
Expand All @@ -104,14 +107,18 @@ impl Actor for Service {
self.receive_buffer = right.to_string();

event!(Level::DEBUG, "received request");

let mailbox = Mailbox::default();
mailbox.set_signal(meta.signal());
let signal = world.signal(meta.id());
mailbox.set_signal(signal);

// Send the request event
let event = RequestEvent {
actions: mailbox.sender(),
};
self.http_events.send(HttpEvent::Request(event))?;

// Track the request
self.pending_requests.push_back(mailbox);
}

Expand Down
2 changes: 2 additions & 0 deletions crates/stewart-http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![deny(unsafe_code)]

mod connection;
mod listener;

Expand Down
9 changes: 6 additions & 3 deletions crates/stewart-http/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.tcp_events.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());
self.tcp_events.set_signal(signal);

Ok(())
}

Expand All @@ -89,7 +91,8 @@ impl Actor for Service {
match event {
tcp::ListenerEvent::Connected(event) => {
let events = Mailbox::default();
events.set_signal(meta.signal());
let signal = world.signal(meta.id());
events.set_signal(signal);

let actions = connection::open(
world,
Expand Down
2 changes: 2 additions & 0 deletions crates/stewart-mio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![deny(unsafe_code)]

mod event_loop;
pub mod net;
mod registry;
Expand Down
9 changes: 6 additions & 3 deletions crates/stewart-mio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.actions.set_signal(meta.signal());
self.ready.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());

self.actions.set_signal(signal.clone());
self.ready.set_signal(signal);

Ok(())
}

Expand Down
9 changes: 6 additions & 3 deletions crates/stewart-mio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.action_mailbox.set_signal(meta.signal());
self.ready_mailbox.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());

self.action_mailbox.set_signal(signal.clone());
self.ready_mailbox.set_signal(signal);

Ok(())
}

Expand Down
8 changes: 5 additions & 3 deletions crates/stewart-mio/src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.actions.set_signal(meta.signal());
self.ready.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());

self.actions.set_signal(signal.clone());
self.ready.set_signal(signal);

Ok(())
}
Expand Down
8 changes: 6 additions & 2 deletions crates/stewart-quic/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![deny(unsafe_code)]

use std::{net::SocketAddr, sync::Arc};

use anyhow::Error;
Expand Down Expand Up @@ -68,8 +70,10 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.event_mailbox.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());
self.event_mailbox.set_signal(signal);

Ok(())
}

Expand Down
21 changes: 9 additions & 12 deletions crates/stewart/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Error;

use crate::{Signal, World};
use crate::{Id, World};

/// Actor identity and implementation trait.
///
Expand All @@ -25,25 +25,22 @@ pub trait Actor: 'static {

/// Metadata of an `Actor` in a `World`.
pub struct Metadata {
signal: Signal,
id: Id,
stop: bool,
}

impl Metadata {
pub(crate) fn new(signal: Signal) -> Self {
Self {
signal,
stop: false,
}
pub(crate) fn new(id: Id) -> Self {
Self { id, stop: false }
}

pub(crate) fn stop(&self) -> bool {
self.stop
/// Get the identifier of this actor.
pub fn id(&self) -> Id {
self.id
}

/// Get a `Signal` that wakes the actor.
pub fn signal(&self) -> Signal {
self.signal.clone()
pub(crate) fn stop(&self) -> bool {
self.stop
}

/// At the end of this processing step, stop the actor.
Expand Down
2 changes: 1 addition & 1 deletion crates/stewart/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ mod world;
pub use self::{
actor::{Actor, Metadata},
signal::Signal,
world::{ProcessError, World},
world::{Id, ProcessError, World},
};
17 changes: 14 additions & 3 deletions crates/stewart/src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use thiserror::Error;
use thunderdome::{Arena, Index};
use tracing::{event, instrument, span, Level};

use crate::{signal::SignalReceiver, Actor, Metadata};
use crate::{signal::SignalReceiver, Actor, Metadata, Signal};

/// Thread-local actor tracking and execution system.
#[derive(Default)]
Expand Down Expand Up @@ -63,6 +63,11 @@ impl World {
Ok(())
}

/// Create a signal for the given actor.
pub fn signal(&self, id: Id) -> Signal {
self.receiver.signal(id.index)
}

/// Process all pending actors, until none are left pending.
#[instrument("World::process", level = "debug", skip_all)]
pub fn process(&mut self) -> Result<(), ProcessError> {
Expand All @@ -86,8 +91,8 @@ impl World {

// Let the actor's implementation process
event!(Level::TRACE, "calling actor");
let signal = self.receiver.signal(index);
let mut meta = Metadata::new(signal);
let id = Id { index };
let mut meta = Metadata::new(id);
let result = f(actor.as_mut(), self, &mut meta);

// Check if processing failed
Expand Down Expand Up @@ -136,6 +141,12 @@ impl World {
}
}

/// Identifier of an actor inserted into a world.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct Id {
index: Index,
}

/// Error while processing actors.
#[derive(Error, Debug)]
#[error("failed to process world")]
Expand Down
5 changes: 3 additions & 2 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ mod hello_service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
// To wake up our actor when a message gets sent, register it with the mailbox for
// notification
self.mailbox.set_signal(meta.signal());
let signal = world.signal(meta.id());
self.mailbox.set_signal(signal);

Ok(())
}
Expand Down
9 changes: 7 additions & 2 deletions examples/html_hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Error;
use stewart::{message::Mailbox, Actor, Metadata, World};
use stewart_http::{HttpEvent, RequestAction};
use stewart_mio::{Registry, RegistryHandle};
use tracing::{event, Level};

fn main() -> Result<(), Error> {
utils::init_logging();
Expand Down Expand Up @@ -38,15 +39,19 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.http_events.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());
self.http_events.set_signal(signal);

Ok(())
}

fn process(&mut self, _world: &mut World, _meta: &mut Metadata) -> Result<(), Error> {
while let Some(event) = self.http_events.recv() {
let HttpEvent::Request(request) = event;

event!(Level::INFO, "received request");

let body = RESPONSE.into();
request.actions.send(RequestAction::SendResponse(body))?;
}
Expand Down
15 changes: 9 additions & 6 deletions examples/tcp_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,23 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.server_mailbox.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());
self.server_mailbox.set_signal(signal);

Ok(())
}

fn process(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.poll_listener(meta)?;
fn process(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.poll_listener(world, meta)?;
self.poll_connections()?;

Ok(())
}
}

impl Service {
fn poll_listener(&mut self, meta: &mut Metadata) -> Result<(), Error> {
fn poll_listener(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
while let Some(event) = self.server_mailbox.recv() {
match event {
tcp::ListenerEvent::Connected(event) => {
Expand All @@ -89,7 +91,8 @@ impl Service {
event.actions.send(tcp::ConnectionAction::Send(action))?;

// Keep track of the stream
event.events.set_signal(meta.signal());
let signal = world.signal(meta.id());
event.events.set_signal(signal);
let connection = Connection {
event,
pending: Vec::new(),
Expand Down
8 changes: 5 additions & 3 deletions examples/udp_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, _world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
self.server_mailbox.set_signal(meta.signal());
self.client_mailbox.set_signal(meta.signal());
fn register(&mut self, world: &mut World, meta: &mut Metadata) -> Result<(), Error> {
let signal = world.signal(meta.id());

self.server_mailbox.set_signal(signal.clone());
self.client_mailbox.set_signal(signal);

Ok(())
}
Expand Down

0 comments on commit 171ebd0

Please sign in to comment.