Skip to content

Commit

Permalink
Add Meta helper
Browse files Browse the repository at this point in the history
  • Loading branch information
LaylBongers committed Aug 9, 2023
1 parent a2a1341 commit eb31501
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 129 deletions.
8 changes: 4 additions & 4 deletions crates/stewart-http/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Error;
use bytes::{BufMut, Bytes, BytesMut};
use stewart::{Actor, Id, World};
use stewart::{Actor, Meta, World};
use stewart_mio::net::tcp;
use tracing::{event, Level};

Expand Down Expand Up @@ -40,12 +40,12 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.event.event_mailbox.set_signal(world.signal(id));
fn register(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.event.event_mailbox.set_signal(meta.signal());
Ok(())
}

fn process(&mut self, _world: &mut World, _id: Id) -> Result<(), Error> {
fn process(&mut self, _world: &mut World, _meta: &mut Meta) -> Result<(), Error> {
while let Some(event) = self.event.event_mailbox.recv() {
match event {
tcp::StreamEvent::Recv(event) => {
Expand Down
10 changes: 5 additions & 5 deletions crates/stewart-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Error;
use bytes::Bytes;
use stewart::{
message::{Mailbox, Sender},
Actor, Id, World,
Actor, Meta, World,
};
use stewart_mio::{net::tcp, RegistryHandle};
use tracing::{event, Level};
Expand Down Expand Up @@ -61,18 +61,18 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.listener_mailbox.set_signal(world.signal(id));
fn register(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.listener_mailbox.set_signal(meta.signal());
Ok(())
}

fn process(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn process(&mut self, world: &mut World, meta: &mut Meta) -> Result<(), Error> {
while let Some(event) = self.listener_mailbox.recv() {
match event {
tcp::ListenerEvent::Connected(event) => {
connection::open(world, event, self.body.clone())?
}
tcp::ListenerEvent::Closed => world.stop(id),
tcp::ListenerEvent::Closed => meta.set_stop(),
}
}

Expand Down
12 changes: 6 additions & 6 deletions crates/stewart-mio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Error;
use mio::{Interest, Token};
use stewart::{
message::{Mailbox, Sender},
Actor, Id, World,
Actor, Meta, World,
};
use tracing::{event, instrument, Level};

Expand Down Expand Up @@ -105,13 +105,13 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.action_mailbox.set_signal(world.signal(id));
self.ready_mailbox.set_signal(world.signal(id));
fn register(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.action_mailbox.set_signal(meta.signal());
self.ready_mailbox.set_signal(meta.signal());
Ok(())
}

fn process(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn process(&mut self, world: &mut World, meta: &mut Meta) -> Result<(), Error> {
let mut readable = false;
while let Some(ready) = self.ready_mailbox.recv() {
readable |= ready.readable;
Expand All @@ -122,7 +122,7 @@ impl Actor for Service {
}

while let Some(_action) = self.action_mailbox.recv() {
world.stop(id);
meta.set_stop();
}

Ok(())
Expand Down
26 changes: 13 additions & 13 deletions crates/stewart-mio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bytes::{Buf, Bytes, BytesMut};
use mio::{Interest, Token};
use stewart::{
message::{Mailbox, Sender},
Actor, Id, World,
Actor, Meta, World,
};
use tracing::{event, Level};

Expand Down Expand Up @@ -105,34 +105,34 @@ impl Drop for Service {
}

impl Actor for Service {
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.action_mailbox.set_signal(world.signal(id));
self.ready_mailbox.set_signal(world.signal(id));
fn register(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.action_mailbox.set_signal(meta.signal());
self.ready_mailbox.set_signal(meta.signal());
Ok(())
}

fn process(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.poll_actions(world, id)?;
self.poll_ready(world, id)?;
fn process(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.poll_actions(meta)?;
self.poll_ready(meta)?;

Ok(())
}
}

impl Service {
fn poll_actions(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn poll_actions(&mut self, meta: &mut Meta) -> Result<(), Error> {
// Handle actions
while let Some(action) = self.action_mailbox.recv() {
match action {
StreamAction::Send(action) => self.on_action_send(action)?,
StreamAction::Close => world.stop(id),
StreamAction::Close => meta.set_stop(),
}
}

Ok(())
}

fn poll_ready(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn poll_ready(&mut self, meta: &mut Meta) -> Result<(), Error> {
// Handle ready
let mut readable = false;
let mut writable = false;
Expand All @@ -142,7 +142,7 @@ impl Service {
}

if readable {
self.on_ready_readable(world, id)?;
self.on_ready_readable(meta)?;
}

if writable {
Expand All @@ -152,7 +152,7 @@ impl Service {
Ok(())
}

fn on_ready_readable(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn on_ready_readable(&mut self, meta: &mut Meta) -> Result<(), Error> {
// Make sure we have at least a minimum amount of buffer space left
if self.buffer.len() < 1024 {
self.buffer.resize(2048, 0);
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Service {

// If the stream got closed, stop the actor
if closed {
world.stop(id);
meta.set_stop();
}

Ok(())
Expand Down
16 changes: 8 additions & 8 deletions crates/stewart-mio/src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytes::{Bytes, BytesMut};
use mio::{Interest, Token};
use stewart::{
message::{Mailbox, Sender},
Actor, Id, World,
Actor, Meta, World,
};
use tracing::{event, instrument, Level};

Expand Down Expand Up @@ -101,27 +101,27 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.action_mailbox.set_signal(world.signal(id));
self.ready_mailbox.set_signal(world.signal(id));
fn register(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.action_mailbox.set_signal(meta.signal());
self.ready_mailbox.set_signal(meta.signal());

Ok(())
}

fn process(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.poll_actions(world, id)?;
fn process(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.poll_actions(meta)?;
self.poll_ready()?;

Ok(())
}
}

impl Service {
fn poll_actions(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn poll_actions(&mut self, meta: &mut Meta) -> Result<(), Error> {
while let Some(message) = self.action_mailbox.recv() {
match message {
Action::Send(packet) => self.on_action_send(packet)?,
Action::Close => world.stop(id),
Action::Close => meta.set_stop(),
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/stewart-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use quinn_proto::{DatagramEvent, Endpoint, EndpointConfig, ServerConfig};
use rustls::{Certificate, PrivateKey};
use stewart::{
message::{Mailbox, Sender},
Actor, Id, World,
Actor, Meta, World,
};
use stewart_mio::{net::udp, RegistryHandle};
use tracing::{event, Level};
Expand Down Expand Up @@ -68,12 +68,12 @@ impl Service {
}

impl Actor for Service {
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
self.event_mailbox.set_signal(world.signal(id));
fn register(&mut self, _world: &mut World, meta: &mut Meta) -> Result<(), Error> {
self.event_mailbox.set_signal(meta.signal());
Ok(())
}

fn process(&mut self, _world: &mut World, _id: Id) -> Result<(), Error> {
fn process(&mut self, _world: &mut World, _meta: &mut Meta) -> Result<(), Error> {
while let Some(packet) = self.event_mailbox.recv() {
event!(Level::TRACE, "received packet");

Expand Down
35 changes: 32 additions & 3 deletions crates/stewart/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use anyhow::Error;

use crate::{Id, World};
use crate::{Signal, World};

/// Actor identity and implementation trait.
pub trait Actor: 'static {
/// Called when an actor is inserted into a `World`.
///
/// This is useful to receive the `Signal` for this actor.
#[allow(unused_variables)]
fn register(&mut self, world: &mut World, id: Id) -> Result<(), Error> {
fn register(&mut self, world: &mut World, meta: &mut Meta) -> Result<(), Error> {
Ok(())
}

Expand All @@ -20,5 +20,34 @@ pub trait Actor: 'static {
///
/// You should *always* prefer this over panicking, as this crashes the entire runtime.
/// Instead of using `unwrap` or `expect`, use `context` from the `anyhow` crate.
fn process(&mut self, world: &mut World, id: Id) -> Result<(), Error>;
fn process(&mut self, world: &mut World, meta: &mut Meta) -> Result<(), Error>;
}

/// Metadata of an `Actor` in a `World`.
pub struct Meta {
signal: Signal,
stop: bool,
}

impl Meta {
pub(crate) fn new(signal: Signal) -> Self {
Self {
signal,
stop: false,
}
}

pub(crate) fn stop(&self) -> bool {
self.stop
}

/// Get a `Signal` that wakes the actor.
pub fn signal(&self) -> Signal {
self.signal.clone()
}

/// At the end of this processing step, stop the actor.
pub fn set_stop(&mut self) {
self.stop = true;
}
}
4 changes: 2 additions & 2 deletions crates/stewart/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod signal;
mod world;

pub use self::{
actor::Actor,
actor::{Actor, Meta},
signal::Signal,
world::{Id, ProcessError, World},
world::{ProcessError, World},
};
Loading

0 comments on commit eb31501

Please sign in to comment.