Skip to content

Commit

Permalink
fix: Fix a bug where jobs would block event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Dec 25, 2023
1 parent 985cc77 commit 4861dd4
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 13 deletions.
2 changes: 0 additions & 2 deletions crates/crontab_runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ pub async fn cron_main<'e>(
let current_ts = round_date_minute(Local::now(), false);
let ts_delta = current_ts - ts;

dbg!(ts_delta, current_ts, ts);

match ts_delta.num_minutes().cmp(&0) {
Ordering::Greater => {
warn!(
Expand Down
2 changes: 2 additions & 0 deletions crates/crontab_types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::non_canonical_partial_ord_impl)]

use chrono::prelude::*;
use getset::Getters;

Expand Down
10 changes: 8 additions & 2 deletions examples/crontab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ async fn main() {
.unwrap();

WorkerOptions::default()
.concurrency(2)
.concurrency(10)
.schema("example_simple_worker")
.define_job("say_hello", say_hello)
.define_job("say_hello_2", say_hello)
.pg_pool(pg_pool)
// Run say_hello every two minutes with a backfill of 10 minutes
.with_crontab(r#"*/2 * * * * say_hello ?fill=10m {message:"Crontab"}"#)
.with_crontab(
r#"
*/2 * * * * say_hello ?fill=10m {message:"Crontab"}
*/1 * * * * say_hello_2 ?fill=10m {message:"Crontab"}
"#,
)
.unwrap()
.init()
.await
Expand Down
5 changes: 5 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
check-fmt:
cargo fmt --all -- --check

check-clippy:
cargo clippy --all -- -D warnings
17 changes: 10 additions & 7 deletions src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fmt::Debug;
use std::future::{ready, Future};
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use std::{collections::HashMap, time::Instant};
Expand Down Expand Up @@ -86,12 +86,15 @@ impl Worker {
.await?;

job_signal
.then(|source| process_one_job(self, source))
.try_for_each_concurrent(self.concurrency, |j| {
if let Some(j) = &j {
debug!("Job id={} processed", j.id());
.map(Ok::<_, ProcessJobError>)
.try_for_each_concurrent(self.concurrency, |source| async move {
let res = process_one_job(self, source).await?;

if let Some(job) = res {
debug!(job_id = job.id(), "Job processed");
}
ready(Ok(()))

Ok(())
})
.await?;

Expand Down Expand Up @@ -145,7 +148,7 @@ async fn process_one_job(
Some(job) => {
let job_result = run_job(&job, worker, &source).await;
release_job(job_result, &job, worker).await.map_err(|e| {
error!("{:?}", e);
error!("Release job error : {:?}", e);
e
})?;
Ok(Some(job))
Expand Down
4 changes: 2 additions & 2 deletions src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub async fn job_signal_stream(

let stream = stream::unfold((interval, pg_listener, shutdown_signal), |mut f| async {
tokio::select! {
_ = (&mut f.0).tick() => Some((StreamSource::Polling, f)),
_ = (&mut f.1).recv() => Some((StreamSource::PgListener, f)),
_ = (f.0).tick() => Some((StreamSource::Polling, f)),
_ = (f.1).recv() => Some((StreamSource::PgListener, f)),
_ = &mut f.2 => None,
}
});
Expand Down

0 comments on commit 4861dd4

Please sign in to comment.