Skip to content

Commit

Permalink
chore: Add crontab runner to the worker main run function
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Dec 20, 2022
1 parent 15f8330 commit 340445a
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ hex = "0.4.3"

[dev-dependencies]
tokio = { version = "1.23.0", features = ["macros"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
2 changes: 1 addition & 1 deletion crates/crontab_parser/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use archimedes_crontab_types::Crontab;
use archimedes_crontab_types::Crontab;
pub use nom::error::ErrorKind;
use nom_crontab::nom_crontab;
use thiserror::Error;
Expand Down
4 changes: 2 additions & 2 deletions crates/crontab_runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{debug, warn};
pub use crate::sql::ScheduleCronJobError;
use crate::{
sql::{schedule_cron_jobs, CrontabJob},
utils::{round_date_minute, sleep_until, DURATION_ZERO, ONE_MINUTE},
utils::{round_date_minute, sleep_until, ONE_MINUTE},
};

mod backfill;
Expand Down Expand Up @@ -42,7 +42,7 @@ pub async fn cron_main<'e>(
let current_ts = round_date_minute(Local::now(), false);
let ts_delta = current_ts - ts;

match ts_delta.cmp(&*DURATION_ZERO) {
match ts_delta.num_minutes().cmp(&0) {
Ordering::Greater => {
warn!(
"Cron fired {}s too early (clock skew?); rescheduling",
Expand Down
28 changes: 21 additions & 7 deletions crates/crontab_runner/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CrontabJob {
pub struct CrontabJobInner {
task: String,
payload: Option<serde_json::Value>,
queue_name: Option<String>,
Expand All @@ -69,6 +69,13 @@ pub struct CrontabJob {
priority: Option<i16>,
}

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CrontabJob {
identifier: String,
job: CrontabJobInner,
}

#[derive(Error, Debug)]
pub enum ScheduleCronJobError {
#[error("An sql error occured while scheduling cron job : {0}")]
Expand Down Expand Up @@ -155,12 +162,19 @@ impl CrontabJob {
}

Self {
task: crontab.task_identifier.to_owned(),
payload: crontab.payload.to_owned(),
queue_name: crontab.options.queue.to_owned(),
run_at: ts.with_timezone(&Local),
max_attempts: crontab.options.max.to_owned(),
priority: crontab.options.priority.to_owned(),
identifier: crontab
.options()
.id()
.to_owned()
.unwrap_or_else(|| crontab.task_identifier.to_owned()),
job: CrontabJobInner {
task: crontab.task_identifier.to_owned(),
payload: crontab.payload.to_owned(),
queue_name: crontab.options.queue.to_owned(),
run_at: ts.with_timezone(&Local),
max_attempts: crontab.options.max.to_owned(),
priority: crontab.options.priority.to_owned(),
},
}
}
}
57 changes: 57 additions & 0 deletions examples/crontab.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::str::FromStr;

use archimedes::{WorkerContext, WorkerOptions};
use serde::Deserialize;
use sqlx::postgres::PgConnectOptions;
use tracing_subscriber::{
filter::EnvFilter, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
};

#[derive(Deserialize)]
struct HelloPayload {
message: String,
}

async fn say_hello(_ctx: WorkerContext, payload: HelloPayload) -> Result<(), ()> {
println!("Hello {} !", payload.message);
Ok(())
}

fn enable_logs() {
let fmt_layer = tracing_subscriber::fmt::layer();
// Log level set to debug except for sqlx set at warn (to not show all sql requests)
let filter_layer = EnvFilter::try_new("debug,sqlx=warn").unwrap();

tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
}

#[tokio::main]
async fn main() {
enable_logs();

let pg_options = PgConnectOptions::from_str("postgres://postgres:root@localhost:5432").unwrap();

let pg_pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect_with(pg_options)
.await
.unwrap();

WorkerOptions::default()
.concurrency(2)
.schema("example_simple_worker")
.define_job("say_hello", say_hello)
.pg_pool(pg_pool)
// Run say_hello every two minutes
.with_crontab(r#"*/2 * * * * say_hello ?fill=10m {message:"Crontab"}"#)
.unwrap()
.init()
.await
.unwrap()
.run()
.await
.unwrap();
}
16 changes: 15 additions & 1 deletion examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use std::str::FromStr;
use archimedes::{WorkerContext, WorkerOptions};
use serde::Deserialize;
use sqlx::postgres::PgConnectOptions;
use tracing_subscriber::{
filter::EnvFilter, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
};

#[derive(Deserialize)]
struct HelloPayload {
Expand All @@ -14,9 +17,20 @@ async fn say_hello(_ctx: WorkerContext, payload: HelloPayload) -> Result<(), ()>
Ok(())
}

fn enable_logs() {
let fmt_layer = tracing_subscriber::fmt::layer();
// Log level set to debug except for sqlx set at warn (to not show all sql requests)
let filter_layer = EnvFilter::try_new("debug,sqlx=warn").unwrap();

tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
}

#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "WARN");
enable_logs();

let pg_options = PgConnectOptions::from_str("postgres://postgres:root@localhost:5432").unwrap();

Expand Down
22 changes: 22 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::runner::WorkerFn;
use crate::sql::task_identifiers::get_tasks_details;
use crate::utils::escape_identifier;
use crate::{Worker, WorkerContext};
use archimedes_crontab_parser::{parse_crontab, CrontabParseError};
use archimedes_crontab_types::Crontab;
use archimedes_migrations::migrate;
use futures::FutureExt;
use rand::RngCore;
Expand All @@ -25,6 +27,8 @@ pub struct WorkerOptions {
max_pg_conn: Option<u32>,
schema: Option<String>,
forbidden_flags: Vec<String>,
crontabs: Option<Vec<Crontab>>,
use_local_time: bool,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -78,6 +82,8 @@ impl WorkerOptions {
escaped_schema,
task_details,
forbidden_flags: self.forbidden_flags,
crontabs: self.crontabs.unwrap_or_default(),
use_local_time: self.use_local_time,
};

Ok(worker)
Expand Down Expand Up @@ -149,4 +155,20 @@ impl WorkerOptions {
self.forbidden_flags.push(flag.into());
self
}

pub fn with_crontab(mut self, input: &str) -> Result<Self, CrontabParseError> {
let mut crontabs = parse_crontab(input)?;
match self.crontabs.as_mut() {
Some(c) => c.append(&mut crontabs),
None => {
self.crontabs = Some(crontabs);
}
}
Ok(self)
}

pub fn use_local_time(mut self, value: bool) -> Self {
self.use_local_time = value;
self
}
}
37 changes: 34 additions & 3 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use crate::errors::ArchimedesError;
use crate::sql::get_job::Job;
use crate::sql::{get_job::get_job, task_identifiers::TaskDetails};
use crate::streams::job_signal_stream;
use futures::{StreamExt, TryStreamExt};
use archimedes_crontab_runner::{cron_main, ScheduleCronJobError};
use archimedes_crontab_types::Crontab;
use futures::{try_join, StreamExt, TryStreamExt};
use getset::Getters;
use thiserror::Error;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -45,14 +47,18 @@ pub struct Worker {
pub(crate) escaped_schema: String,
pub(crate) task_details: TaskDetails,
pub(crate) forbidden_flags: Vec<String>,
pub(crate) crontabs: Vec<Crontab>,
pub(crate) use_local_time: bool,
}

#[derive(Error, Debug)]
pub enum WorkerRuntimeError {
#[error("Unexpected error occured while processing job : '{0}'")]
ProcessJobError(#[from] ProcessJobError),
ProcessJob(#[from] ProcessJobError),
#[error("Failed to listen to postgres notifications : '{0}'")]
PgListenError(#[from] ArchimedesError),
PgListen(#[from] ArchimedesError),
#[error("Error occured while trying to schedule cron job : {0}")]
Crontab(#[from] ScheduleCronJobError),
}

impl Worker {
Expand All @@ -61,6 +67,15 @@ impl Worker {
}

pub async fn run(&self) -> Result<(), WorkerRuntimeError> {
let job_runner = self.job_runner();
let crontab_scheduler = self.crontab_scheduler();

try_join!(crontab_scheduler, job_runner)?;

Ok(())
}

async fn job_runner(&self) -> Result<(), WorkerRuntimeError> {
let job_signal = job_signal_stream(self.pg_pool.clone(), self.poll_interval).await?;

job_signal
Expand All @@ -75,6 +90,22 @@ impl Worker {

Ok(())
}

async fn crontab_scheduler<'e>(&self) -> Result<(), WorkerRuntimeError> {
if self.crontabs().is_empty() {
return Ok(());
}

cron_main(
self.pg_pool(),
self.escaped_schema(),
self.crontabs(),
*self.use_local_time(),
)
.await?;

Ok(())
}
}

#[derive(Error, Debug)]
Expand Down

0 comments on commit 340445a

Please sign in to comment.