Skip to content

Commit

Permalink
Compile-time checked State (#33)
Browse files Browse the repository at this point in the history
* Compile-time checked State

* Remove references to state map

* Derive From impl for app state

* Added docs and fixed docs
  • Loading branch information
Victor-N-Suadicani authored Jan 4, 2024
1 parent 313c2e8 commit 46fdb7b
Show file tree
Hide file tree
Showing 23 changed files with 403 additions and 285 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ jobs:
- name: Lint
run: cargo clippy --workspace --all-targets -- --deny warnings

# Ensures that the docs can be built properly.
- name: Docs
run: cargo doc --no-deps
env:
RUSTDOCFLAGS: -D warnings

# Runs cargo deny, an auditing tool and dependency checker, among other things. See https://github.com/EmbarkStudios/cargo-deny
audit:
name: Audit
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn echo(Msg(request): Msg<EchoRequest>) -> EchoResponse {

#[tokio::main]
async fn main() -> kanin::Result<()> {
App::new()
App::new(())
.handler("my_routing_key", echo)
.run("amqp_addr")
.await
Expand Down
7 changes: 2 additions & 5 deletions kanin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kanin"
version = "0.27.0"
version = "0.28.0"
edition = "2021"
authors = ["Victor Nordam Suadicani <[email protected]>"]
description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)."
Expand All @@ -13,7 +13,7 @@ readme = "../README.md"

[dependencies]
# Derive macros for traits in kanin.
kanin_derive = "0.5.2"
kanin_derive = "0.6.0"

# Lower level AMQP framework.
lapin = "2.1.0"
Expand Down Expand Up @@ -43,9 +43,6 @@ derive_more = "0.99.17"
# Great for structured errors.
thiserror = "1.0.30"

# Provides a map from types to a single value of that type.
anymap = "1.0.0-beta.1"

[dev-dependencies]
# Concrete logging implementation.
env_logger = "0.10.0"
Expand Down
63 changes: 21 additions & 42 deletions kanin/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
mod task;

use std::{any::Any, sync::Arc};
use std::sync::Arc;

use anymap::Map;
use futures::future::{select_all, SelectAll};
use lapin::{self, Connection, ConnectionProperties};
use tokio::task::JoinHandle;
Expand All @@ -14,34 +13,34 @@ use self::task::TaskFactory;
use crate::{Error, Handler, HandlerConfig, Respond, Result};
use tokio::sync::mpsc;

/// Apps can hold any type as state. These types can then be extracted in handlers. This state is stored in a type-map.
pub(crate) type StateMap = Map<dyn Any + Send + Sync>;

/// The central struct of your application.
#[must_use = "The app will not do anything unless you call `.run`."]
pub struct App {
pub struct App<S> {
/// A map from routing keys to task factories.
/// Task factories are constructed in [`App::handler`] and called in [`App::run`].
handlers: Vec<TaskFactory>,
/// A map from types to a single value of that type.
handlers: Vec<TaskFactory<S>>,
/// This is used to hold the state values that users may want to store before running the app,
/// and then extract in their handlers.
state_map: StateMap,
/// and then extract in their handlers. Types that wish to be extracted via `State<T>` must
/// implement `From<&S>`.
state: S,
}

impl Default for App {
impl<S: Default> Default for App<S> {
fn default() -> Self {
Self {
state_map: Map::new(),
handlers: Vec::default(),
state: S::default(),
}
}
}

impl App {
/// Creates a new kanin app with the default configuration.
pub fn new() -> Self {
Self::default()
impl<S> App<S> {
/// Creates a new kanin app.
pub fn new(state: S) -> Self {
Self {
handlers: Vec::new(),
state,
}
}

/// Registers a new handler for the given routing key with the default prefetch count.
Expand All @@ -50,8 +49,9 @@ impl App {
/// This requires that the response type implements Respond (which is automatically implemented for protobuf messages).
pub fn handler<H, Args, Res>(self, routing_key: impl Into<String>, handler: H) -> Self
where
H: Handler<Args, Res>,
H: Handler<Args, Res, S>,
Res: Respond,
S: Send + Sync + 'static,
{
self.handler_with_config(routing_key, handler, Default::default())
}
Expand All @@ -67,8 +67,9 @@ impl App {
config: HandlerConfig,
) -> Self
where
H: Handler<Args, Res>,
H: Handler<Args, Res, S>,
Res: Respond,
S: Send + Sync + 'static,
{
let routing_key = routing_key.into();
debug!(
Expand All @@ -83,28 +84,6 @@ impl App {
self
}

/// Adds a type as state to this app.
///
/// An `App` may use any number of types as state. The app will contain one instance of each type.
///
/// The state added to the app through this method can subsequently be used in request handlers,
/// by making use of the [`crate::extract::State`] extractor.
///
/// # Panics
/// Panics if the given type has already been registered with the app.
pub fn state<T: Clone + Send + Sync + 'static>(mut self, value: T) -> Self {
debug!("Registering state for type {}", std::any::type_name::<T>());
if self.state_map.insert(value).is_some() {
panic!(
"Attempted to register a state type, `{}` that had already been registered before! \
You can only register one value of each type. If you need multiple values of the same type, \
use the newtype pattern to signify the semantic difference between the two values.",
std::any::type_name::<T>()
);
}
self
}

/// Connects to AMQP with the given address and calls [`run_with_connection`][App::run_with_connection] with the resulting connection.
/// See [`run_with_connection`][App::run_with_connection] for more details.
#[allow(clippy::missing_errors_doc)]
Expand Down Expand Up @@ -171,7 +150,7 @@ impl App {
});

let mut join_handles = Vec::new();
let state_map = Arc::new(self.state_map);
let state = Arc::new(self.state);
for task_factory in self.handlers.into_iter() {
debug!(
"Spawning handler task for routing key: {:?} ...",
Expand All @@ -180,7 +159,7 @@ impl App {

// Construct the task from the factory. This produces a pinned future which we can then spawn.
let task = task_factory
.build(conn, state_map.clone())
.build(conn, state.clone())
.await
.map_err(Error::Lapin)?;

Expand Down
38 changes: 18 additions & 20 deletions kanin/src/app/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};

use crate::{Handler, HandlerConfig, Request, Respond};

use super::StateMap;

/// Handler tasks are the async functions that are run in the tokio tasks to perform handlers.
///
/// They use a given consumer and channel handle in order to receive AMQP deliveries.
Expand All @@ -25,17 +23,18 @@ use super::StateMap;
pub(super) type HandlerTask = Pin<Box<dyn Future<Output = String> + Send>>;

/// Creates the handler task for the given handler and routing key. See [`HandlerTask`].
fn handler_task<H, Args, Res>(
fn handler_task<H, S, Args, Res>(
routing_key: String,
handler: H,
channel: Channel,
mut consumer: Consumer,
state: Arc<StateMap>,
state: Arc<S>,
should_reply: bool,
) -> HandlerTask
where
H: Handler<Args, Res>,
H: Handler<Args, Res, S>,
Res: Respond,
S: Send + Sync + 'static,
{
Box::pin(async move {
// We keep a set of handles to all outstanding spawned tasks.
Expand Down Expand Up @@ -92,13 +91,13 @@ where
/// Handles the given request with the given handler and channel.
///
/// Acks the request and responds with the given acker as appropriate.
async fn handle_request<H, Args, Res>(
mut req: Request,
async fn handle_request<H, S, Args, Res>(
mut req: Request<S>,
handler: H,
channel: Channel,
should_reply: bool,
) where
H: Handler<Args, Res>,
H: Handler<Args, Res, S>,
Res: Respond,
{
let properties = req.properties().cloned();
Expand Down Expand Up @@ -221,33 +220,32 @@ async fn handle_request<H, Args, Res>(
/// 3. User calls [`App::run`][crate::App::run], creating tasks from all the task factories that are then run in tokio.
///
/// [`App`]: crate::App
pub(super) struct TaskFactory {
pub(super) struct TaskFactory<S> {
/// The routing key of the handler task produced by this task factory.
routing_key: String,
/// Configuration for the handler task produced by this task factory.
config: HandlerConfig,
/// The factory function that constructs the handler task from the given channel, consumer and state map.
factory: Box<dyn FnOnce(Channel, Consumer, Arc<StateMap>) -> HandlerTask + Send>,
/// The factory function that constructs the handler task from the given channel, consumer and state.
factory: Box<dyn FnOnce(Channel, Consumer, Arc<S>) -> HandlerTask + Send>,
}

impl TaskFactory {
impl<S> TaskFactory<S> {
/// Constructs a new task factory from the given routing key and handler.
pub(super) fn new<H, Args, Res>(routing_key: String, handler: H, config: HandlerConfig) -> Self
where
H: Handler<Args, Res>,
H: Handler<Args, Res, S>,
Res: Respond,
S: Send + Sync + 'static,
{
let should_reply = config.should_reply;

// A task factory is a closure in a box that produces a handler task.
Self {
routing_key: routing_key.clone(),
config,
factory: Box::new(
move |channel: Channel, consumer: Consumer, state: Arc<StateMap>| {
handler_task(routing_key, handler, channel, consumer, state, should_reply)
},
),
factory: Box::new(move |channel: Channel, consumer: Consumer, state: Arc<S>| {
handler_task(routing_key, handler, channel, consumer, state, should_reply)
}),
}
}

Expand All @@ -260,7 +258,7 @@ impl TaskFactory {
pub(super) async fn build(
self,
conn: &Connection,
state_map: Arc<StateMap>,
state: Arc<S>,
) -> lapin::Result<HandlerTask> {
debug!(
"Building task for handler on routing key {:?}",
Expand Down Expand Up @@ -314,6 +312,6 @@ impl TaskFactory {
)
.await?;

Ok((self.factory)(channel, consumer, state_map))
Ok((self.factory)(channel, consumer, state))
}
}
5 changes: 1 addition & 4 deletions kanin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ pub enum ServerError {
/// A handler attempted to extract the delivery of a message twice.
#[error("The delivery was already extracted from the request and could not be accessed")]
DeliveryAlreadyExtracted,
/// A handler attempted to extract a [`crate::extract::State`] but the state type had not been added to the app.
#[error("The called handler was misconfigured. If you're the app owner, please see the logs for details")]
StateNotFound,
}

/// Types that may be constructed from errors.
Expand All @@ -78,7 +75,7 @@ impl<T> FromError<Infallible> for T {
}
}

/// This impl ensures that if T can be constructed from an error, then Option<T> can also be constructed from an error.
/// This impl ensures that if T can be constructed from an error, then `Option<T>` can also be constructed from an error.
/// Simply by wrapping in Some, obviously.
impl<T> FromError<HandlerError> for Option<T>
where
Expand Down
39 changes: 25 additions & 14 deletions kanin/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,24 @@ pub use state::State;
/// Note that extractions might mutate the request in certain ways.
/// Most notably, if extracting the [`Delivery`] or [`Acker`] from a request, it is the responsibility of the handler to acknowledge the message.
#[async_trait]
pub trait Extract: Sized {
pub trait Extract<S>: Sized {
/// The error to return in case extraction fails.
type Error: Error;

/// Extract the type from the request.
async fn extract(req: &mut Request) -> Result<Self, Self::Error>;
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error>;
}

/// Note that when you extract the [`Delivery`], the handler itself must acknowledge the request.
/// Kanin *will not* acknowledge the request for you in this case.
#[async_trait]
impl Extract for Delivery {
impl<S> Extract<S> for Delivery
where
S: Send + Sync,
{
type Error = HandlerError;

async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
req.delivery
.take()
.ok_or(HandlerError::DELIVERY_ALREADY_EXTRACTED)
Expand All @@ -46,10 +49,13 @@ impl Extract for Delivery {
/// Note that when you extract the [`Acker`], the handler itself must acknowledge the request.
/// kanin *will not* acknowledge the request for you in this case.
#[async_trait]
impl Extract for Acker {
impl<S> Extract<S> for Acker
where
S: Send + Sync,
{
type Error = HandlerError;

async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
req.delivery
.as_mut()
.ok_or(HandlerError::DELIVERY_ALREADY_EXTRACTED)
Expand All @@ -58,35 +64,40 @@ impl Extract for Acker {
}

#[async_trait]
impl Extract for Channel {
impl<S> Extract<S> for Channel
where
S: Send + Sync,
{
type Error = Infallible;

async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
Ok(req.channel().clone())
}
}

/// Extracting options simply discards the error and returns None in that case.
#[async_trait]
impl<T> Extract for Option<T>
impl<S, T> Extract<S> for Option<T>
where
T: Extract,
T: Extract<S>,
S: Send + Sync,
{
type Error = Infallible;

async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
Ok(Extract::extract(req).await.ok())
}
}

#[async_trait]
impl<T> Extract for Result<T, <T as Extract>::Error>
impl<S, T> Extract<S> for Result<T, <T as Extract<S>>::Error>
where
T: Extract,
T: Extract<S>,
S: Send + Sync,
{
type Error = Infallible;

async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
Ok(Extract::extract(req).await)
}
}
Loading

0 comments on commit 46fdb7b

Please sign in to comment.