From 4861dd41ce6c381e712bb28855f67b4cde149f9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Coletta?= Date: Mon, 25 Dec 2023 13:08:17 +0100 Subject: [PATCH] fix: Fix a bug where jobs would block event loop --- crates/crontab_runner/src/lib.rs | 2 -- crates/crontab_types/src/lib.rs | 2 ++ examples/crontab.rs | 10 ++++++++-- justfile | 5 +++++ src/runner.rs | 17 ++++++++++------- src/streams.rs | 4 ++-- 6 files changed, 27 insertions(+), 13 deletions(-) create mode 100644 justfile diff --git a/crates/crontab_runner/src/lib.rs b/crates/crontab_runner/src/lib.rs index 9a0e033..08d6785 100644 --- a/crates/crontab_runner/src/lib.rs +++ b/crates/crontab_runner/src/lib.rs @@ -48,8 +48,6 @@ pub async fn cron_main<'e>( let current_ts = round_date_minute(Local::now(), false); let ts_delta = current_ts - ts; - dbg!(ts_delta, current_ts, ts); - match ts_delta.num_minutes().cmp(&0) { Ordering::Greater => { warn!( diff --git a/crates/crontab_types/src/lib.rs b/crates/crontab_types/src/lib.rs index ac82443..6cb8334 100644 --- a/crates/crontab_types/src/lib.rs +++ b/crates/crontab_types/src/lib.rs @@ -1,3 +1,5 @@ +#![allow(clippy::non_canonical_partial_ord_impl)] + use chrono::prelude::*; use getset::Getters; diff --git a/examples/crontab.rs b/examples/crontab.rs index 6914230..780793c 100644 --- a/examples/crontab.rs +++ b/examples/crontab.rs @@ -43,12 +43,18 @@ async fn main() { .unwrap(); WorkerOptions::default() - .concurrency(2) + .concurrency(10) .schema("example_simple_worker") .define_job("say_hello", say_hello) + .define_job("say_hello_2", say_hello) .pg_pool(pg_pool) // Run say_hello every two minutes with a backfill of 10 minutes - .with_crontab(r#"*/2 * * * * say_hello ?fill=10m {message:"Crontab"}"#) + .with_crontab( + r#" + */2 * * * * say_hello ?fill=10m {message:"Crontab"} + */1 * * * * say_hello_2 ?fill=10m {message:"Crontab"} + "#, + ) .unwrap() .init() .await diff --git a/justfile b/justfile new file mode 100644 index 0000000..2c5fd6b --- /dev/null +++ b/justfile @@ -0,0 +1,5 @@ +check-fmt: + cargo fmt --all -- --check + +check-clippy: + cargo clippy --all -- -D warnings diff --git a/src/runner.rs b/src/runner.rs index 42e9e0e..0dcaf67 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,5 +1,5 @@ use std::fmt::Debug; -use std::future::{ready, Future}; +use std::future::Future; use std::pin::Pin; use std::time::Duration; use std::{collections::HashMap, time::Instant}; @@ -86,12 +86,15 @@ impl Worker { .await?; job_signal - .then(|source| process_one_job(self, source)) - .try_for_each_concurrent(self.concurrency, |j| { - if let Some(j) = &j { - debug!("Job id={} processed", j.id()); + .map(Ok::<_, ProcessJobError>) + .try_for_each_concurrent(self.concurrency, |source| async move { + let res = process_one_job(self, source).await?; + + if let Some(job) = res { + debug!(job_id = job.id(), "Job processed"); } - ready(Ok(())) + + Ok(()) }) .await?; @@ -145,7 +148,7 @@ async fn process_one_job( Some(job) => { let job_result = run_job(&job, worker, &source).await; release_job(job_result, &job, worker).await.map_err(|e| { - error!("{:?}", e); + error!("Release job error : {:?}", e); e })?; Ok(Some(job)) diff --git a/src/streams.rs b/src/streams.rs index 954cb3d..1d7bcd3 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -26,8 +26,8 @@ pub async fn job_signal_stream( let stream = stream::unfold((interval, pg_listener, shutdown_signal), |mut f| async { tokio::select! { - _ = (&mut f.0).tick() => Some((StreamSource::Polling, f)), - _ = (&mut f.1).recv() => Some((StreamSource::PgListener, f)), + _ = (f.0).tick() => Some((StreamSource::Polling, f)), + _ = (f.1).recv() => Some((StreamSource::PgListener, f)), _ = &mut f.2 => None, } });