diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f9cdc516..a2eeab33 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,7 +24,7 @@ jobs: - uses: actions-rs/cargo@v1 with: command: check - args: --features async-std-comp --no-default-features + args: --workspace --all-features test: name: Test Suite @@ -39,7 +39,7 @@ jobs: - uses: actions-rs/cargo@v1 with: command: test - args: --features tokio-comp + args: --workspace --all-features fmt: @@ -72,4 +72,4 @@ jobs: - uses: actions-rs/cargo@v1 with: command: clippy - args: --features tokio-comp -- -D warnings + args: --workspace --all-features diff --git a/examples/async-std-runtime/src/main.rs b/examples/async-std-runtime/src/main.rs index 646bb05c..0fcd9955 100644 --- a/examples/async-std-runtime/src/main.rs +++ b/examples/async-std-runtime/src/main.rs @@ -23,7 +23,7 @@ impl From> for Reminder { async fn send_in_background(reminder: Reminder) { apalis_core::sleep(Duration::from_secs(2)).await; - debug!("Called at {reminder:?}"); + debug!("Called at {:?}", reminder.0); } async fn send_reminder(reminder: Reminder, worker: WorkerCtx) -> bool { // this will happen in the workers background and wont block the next tasks diff --git a/examples/basics/src/main.rs b/examples/basics/src/main.rs index cc7fe38d..341f8f55 100644 --- a/examples/basics/src/main.rs +++ b/examples/basics/src/main.rs @@ -98,7 +98,7 @@ async fn main() -> Result<(), std::io::Error> { Monitor::::new() .register_with_count(2, { - WorkerBuilder::new("tasty-banana".to_string()) + WorkerBuilder::new("tasty-banana") .layer(TraceLayer::new()) .layer(LogLayer::new("some-log-example")) // Add shared context to all jobs executed by this worker diff --git a/examples/mysql/src/main.rs b/examples/mysql/src/main.rs index 848f6f0b..9ec2dca3 100644 --- a/examples/mysql/src/main.rs +++ b/examples/mysql/src/main.rs @@ -34,7 +34,7 @@ async fn main() -> Result<()> { Monitor::new_with_executor(TokioExecutor) .register_with_count(1, { - WorkerBuilder::new(format!("tasty-avocado")) + WorkerBuilder::new("tasty-avocado") .layer(TraceLayer::new()) .with_storage(mysql) .build_fn(send_email) diff --git a/examples/postgres/src/main.rs b/examples/postgres/src/main.rs index 88650dc8..b4983a11 100644 --- a/examples/postgres/src/main.rs +++ b/examples/postgres/src/main.rs @@ -2,9 +2,8 @@ use anyhow::Result; use apalis::layers::retry::RetryPolicy; use apalis::postgres::PgPool; use apalis::prelude::*; -use apalis::{layers::tracing::TraceLayer, postgres::PostgresStorage}; +use apalis::{layers::retry::RetryLayer, layers::tracing::TraceLayer, postgres::PostgresStorage}; use email_service::{send_email, Email}; -use tower::retry::RetryLayer; use tracing::{debug, info}; async fn produce_jobs(storage: &PostgresStorage) -> Result<()> { diff --git a/examples/tracing/Cargo.toml b/examples/tracing/Cargo.toml index 7872597c..c7b09360 100644 --- a/examples/tracing/Cargo.toml +++ b/examples/tracing/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" anyhow = "1" apalis = { path = "../../", features = ["redis"] } serde = "1" -tokio = { version ="1", features = ["macros"]} +tokio = { version ="1", features = ["full"]} env_logger = "0.10" tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/examples/tracing/src/main.rs b/examples/tracing/src/main.rs index ad56a8f5..cce99bdb 100644 --- a/examples/tracing/src/main.rs +++ b/examples/tracing/src/main.rs @@ -29,10 +29,11 @@ impl fmt::Display for InvalidEmailError { impl Error for InvalidEmailError {} -async fn email_service(_email: Email) { +async fn email_service(email: Email) -> Result<(), InvalidEmailError> { tracing::info!("Checking if dns configured"); sleep(Duration::from_millis(1008)).await; tracing::info!("Sent in 1 sec"); + Err(InvalidEmailError { email: email.to }) } async fn produce_jobs(mut storage: RedisStorage) -> Result<()> { diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index f39b3629..ffb391c5 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -496,7 +496,7 @@ where async fn len(&self) -> Result { let mut conn = self.conn.clone(); let all_jobs: i64 = redis::cmd("HLEN") - .arg(&self.queue.job_data_hash.to_string()) + .arg(self.queue.job_data_hash.to_string()) .query_async(&mut conn) .await?; let done_jobs: i64 = redis::cmd("ZCOUNT") @@ -511,7 +511,7 @@ where async fn fetch_by_id(&self, job_id: &TaskId) -> Result>, RedisError> { let mut conn = self.conn.clone(); let data: Value = redis::cmd("HMGET") - .arg(&self.queue.job_data_hash.to_string()) + .arg(self.queue.job_data_hash.to_string()) .arg(job_id.to_string()) .query_async(&mut conn) .await?; @@ -538,7 +538,7 @@ where .encode(&job) .map_err(|e| (ErrorKind::IoError, "Encode error", e.to_string()))?; let _: i64 = redis::cmd("HSET") - .arg(&self.queue.job_data_hash.to_string()) + .arg(self.queue.job_data_hash.to_string()) .arg(job.ctx.id.to_string()) .arg(bytes) .query_async(&mut conn) diff --git a/src/layers/prometheus/mod.rs b/src/layers/prometheus/mod.rs index 7202a02a..bb39cd2d 100644 --- a/src/layers/prometheus/mod.rs +++ b/src/layers/prometheus/mod.rs @@ -88,10 +88,9 @@ where ("name", this.operation.to_string()), ("namespace", this.job_type.to_string()), ("status", status), - ("latency", latency.to_string()), ]; - metrics::counter!("requests_total", &labels); - metrics::histogram!("request_duration_seconds", &labels); + metrics::counter!("requests_total", &labels).increment(1); + metrics::histogram!("request_duration_seconds", &labels).record(latency); Poll::Ready(response) } } diff --git a/src/layers/retry/mod.rs b/src/layers/retry/mod.rs index 982fabad..5b5ae460 100644 --- a/src/layers/retry/mod.rs +++ b/src/layers/retry/mod.rs @@ -35,9 +35,9 @@ impl Policy, Res, Err> for RetryPolicy where T: Clone, { - type Future = future::Ready; + type Future = future::Ready<()>; - fn retry(&self, req: &Req, result: Result<&Res, &Err>) -> Option { + fn retry(&mut self, req: &mut Req, result: &mut Result) -> Option { let ctx = req.get::().cloned().unwrap_or_default(); match result { Ok(_) => { @@ -45,12 +45,12 @@ where // so don't retry... None } - Err(_) if (self.retries - ctx.current() > 0) => Some(future::ready(self.clone())), + Err(_) if (self.retries - ctx.current() > 0) => Some(future::ready(())), Err(_) => None, } } - fn clone_request(&self, req: &Req) -> Option> { + fn clone_request(&mut self, req: &Req) -> Option> { let mut req = req.clone(); let value = req .get::()