From 88bd2a20bcd9b4dea073b2ef8bd311a1b7d21860 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Sat, 28 Sep 2024 07:06:46 +0300 Subject: [PATCH] fix: apply `FromRequest` for items in `Parts` Problem: We are missing crucial `FromRequest` impls for: - TaskId - Attempt - Namespace Also removed `Context` Solution: Implement `FromRequest` for these Types. --- examples/fn-args/src/main.rs | 16 +++++++++------- packages/apalis-core/src/task/namespace.rs | 11 +++++++++++ packages/apalis-core/src/task/task_id.rs | 8 ++++++++ packages/apalis-core/src/worker/mod.rs | 7 ------- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/examples/fn-args/src/main.rs b/examples/fn-args/src/main.rs index 4a28d285..1e5655f6 100644 --- a/examples/fn-args/src/main.rs +++ b/examples/fn-args/src/main.rs @@ -19,17 +19,18 @@ struct SimpleJob {} // A task can have up to 16 arguments async fn simple_job( - _: SimpleJob, // Required, must be of the type of the job/message - worker_id: Data, // The worker running the job, added by worker - _worker_ctx: Context, // The worker context, added by worker + _: SimpleJob, // Required, must be of the type of the job/message + worker_id: Data, // The worker running the job, added by worker + _worker_ctx: Data>, // The worker context, added by worker _sqlite: Data>, // The source, added by storage - task_id: Data, // The task id, added by storage - ctx: SqlContext, // The task context - count: Data, // Our custom data added via layer + task_id: TaskId, // The task id, added by storage + attempt: Attempt, // The current attempt + ctx: SqlContext, // The task context provided by the backend + count: Data, // Our custom data added via layer ) { // increment the counter let current = count.fetch_add(1, Ordering::Relaxed); - info!("worker: {worker_id:?}; task_id: {task_id:?}, ctx: {ctx:?}, count: {current:?}"); + info!("worker: {worker_id:?}; task_id: {task_id:?}, ctx: {ctx:?}, attempt:{attempt:?} count: {current:?}"); } async fn produce_jobs(storage: &mut SqliteStorage) { @@ -62,6 +63,7 @@ async fn main() -> Result<(), std::io::Error> { .register_with_count(2, { WorkerBuilder::new("tasty-banana") .data(Count::default()) + .data(sqlite.clone()) .backend(sqlite) .build_fn(simple_job) }) diff --git a/packages/apalis-core/src/task/namespace.rs b/packages/apalis-core/src/task/namespace.rs index 16a5c9d0..dfed96be 100644 --- a/packages/apalis-core/src/task/namespace.rs +++ b/packages/apalis-core/src/task/namespace.rs @@ -4,6 +4,10 @@ use std::ops::Deref; use serde::{Deserialize, Serialize}; +use crate::error::Error; +use crate::request::Request; +use crate::service_fn::FromRequest; + /// A wrapper type that defines a task's namespace. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Namespace(pub String); @@ -39,3 +43,10 @@ impl AsRef for Namespace { &self.0 } } + +impl FromRequest> for Namespace { + fn from_request(req: &Request) -> Result { + let msg = "Missing `Namespace`. This is a bug, please file a report with the backend you are using".to_owned(); + req.parts.namespace.clone().ok_or(Error::MissingData(msg)) + } +} diff --git a/packages/apalis-core/src/task/task_id.rs b/packages/apalis-core/src/task/task_id.rs index 455e531f..22967055 100644 --- a/packages/apalis-core/src/task/task_id.rs +++ b/packages/apalis-core/src/task/task_id.rs @@ -6,6 +6,8 @@ use std::{ use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; use ulid::Ulid; +use crate::{error::Error, request::Request, service_fn::FromRequest}; + /// A wrapper type that defines a task id. #[derive(Debug, Clone, Eq, Hash, PartialEq)] pub struct TaskId(Ulid); @@ -58,6 +60,12 @@ impl<'de> Deserialize<'de> for TaskId { } } +impl FromRequest> for TaskId { + fn from_request(req: &Request) -> Result { + Ok(req.parts.task_id.clone()) + } +} + struct TaskIdVisitor; impl<'de> Visitor<'de> for TaskIdVisitor { diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index e5cf5c69..99a1bb39 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -6,7 +6,6 @@ use crate::monitor::{Monitor, MonitorContext}; use crate::notify::Notify; use crate::poller::FetchNext; use crate::request::Request; -use crate::service_fn::FromRequest; use crate::Backend; use futures::future::Shared; use futures::{Future, FutureExt}; @@ -534,12 +533,6 @@ impl fmt::Debug for Context { } } -impl FromRequest> for Context { - fn from_request(req: &Request) -> Result { - req.get_checked::().cloned() - } -} - pin_project! { struct Tracked { worker: Context,