diff --git a/Cargo.toml b/Cargo.toml index dfa0551..878eab8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,13 +10,17 @@ members = [ [features] default = ["runtime-tokio-native-tls"] -runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls", "tokio"] -runtime-tokio-native-tls = ["sqlx/runtime-tokio-native-tls", "tokio"] +runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls", "tokio", "crontab_runner/runtime-tokio-rustls", "archimedes_migrations/runtime-tokio-rustls"] +runtime-tokio-native-tls = ["sqlx/runtime-tokio-native-tls", "tokio", "crontab_runner/runtime-tokio-native-tls", "archimedes_migrations/runtime-tokio-native-tls"] # For now we don't support async std # runtime-async-std-rustls = ["sqlx/runtime-async-std-rustls"] # runtime-async-std-native-tls = ["sqlx/runtime-async-std-native-tls"] [dependencies] +crontab_runner = { path = "./crates/crontab_runner" } +crontab_types = { path = "./crates/crontab_types" } +crontab_parser = { path = "./crates/crontab_parser" } +archimedes_migrations = { path = "./crates/migrations" } anyhow = "1.0.66" async-trait = "0.1.58" cargo-insta = "1.21.1" diff --git a/crates/migrations/Cargo.toml b/crates/migrations/Cargo.toml new file mode 100644 index 0000000..b7a1bde --- /dev/null +++ b/crates/migrations/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "archimedes_migrations" +version = "0.0.0" +edition = "2021" + +[features] +default = ["runtime-tokio-native-tls"] +runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"] +runtime-tokio-native-tls = ["sqlx/runtime-tokio-native-tls"] + +[dependencies] +sqlx = "0.6.2" +tracing = "0.1.37" + +[dev-dependencies] +tokio = "1.23.0" diff --git a/src/migrations/mod.rs b/crates/migrations/src/lib.rs similarity index 97% rename from src/migrations/mod.rs rename to crates/migrations/src/lib.rs index e5bde5e..6cd8d3d 100644 --- a/src/migrations/mod.rs +++ b/crates/migrations/src/lib.rs @@ -12,7 +12,6 @@ mod m000011; mod m000012; mod m000013; -use crate::errors::Result; use sqlx::{query, Acquire, Error as SqlxError, PgExecutor, Postgres, Row}; use tracing::info; @@ -46,7 +45,7 @@ pub const MIGRATIONS: &[&[&str]] = &[ M000013_MIGRATION, ]; -async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<()> +async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<(), sqlx::Error> where E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Clone, { @@ -75,7 +74,7 @@ where Ok(()) } -pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<()> +pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), sqlx::Error> where E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone, { @@ -129,7 +128,7 @@ where #[cfg(test)] mod tests { - use crate::migrate::migrate; + use super::*; use sqlx::postgres::PgPoolOptions; #[tokio::test] diff --git a/src/migrations/m000001.rs b/crates/migrations/src/m000001.rs similarity index 100% rename from src/migrations/m000001.rs rename to crates/migrations/src/m000001.rs diff --git a/src/migrations/m000002.rs b/crates/migrations/src/m000002.rs similarity index 100% rename from src/migrations/m000002.rs rename to crates/migrations/src/m000002.rs diff --git a/src/migrations/m000003.rs b/crates/migrations/src/m000003.rs similarity index 100% rename from src/migrations/m000003.rs rename to crates/migrations/src/m000003.rs diff --git a/src/migrations/m000004.rs b/crates/migrations/src/m000004.rs similarity index 100% rename from src/migrations/m000004.rs rename to crates/migrations/src/m000004.rs diff --git a/src/migrations/m000005.rs b/crates/migrations/src/m000005.rs similarity index 100% rename from src/migrations/m000005.rs rename to crates/migrations/src/m000005.rs diff --git a/src/migrations/m000006.rs b/crates/migrations/src/m000006.rs similarity index 100% rename from src/migrations/m000006.rs rename to crates/migrations/src/m000006.rs diff --git a/src/migrations/m000007.rs b/crates/migrations/src/m000007.rs similarity index 100% rename from src/migrations/m000007.rs rename to crates/migrations/src/m000007.rs diff --git a/src/migrations/m000008.rs b/crates/migrations/src/m000008.rs similarity index 100% rename from src/migrations/m000008.rs rename to crates/migrations/src/m000008.rs diff --git a/src/migrations/m000009.rs b/crates/migrations/src/m000009.rs similarity index 100% rename from src/migrations/m000009.rs rename to crates/migrations/src/m000009.rs diff --git a/src/migrations/m000010.rs b/crates/migrations/src/m000010.rs similarity index 100% rename from src/migrations/m000010.rs rename to crates/migrations/src/m000010.rs diff --git a/src/migrations/m000011.rs b/crates/migrations/src/m000011.rs similarity index 100% rename from src/migrations/m000011.rs rename to crates/migrations/src/m000011.rs diff --git a/src/migrations/m000012.rs b/crates/migrations/src/m000012.rs similarity index 100% rename from src/migrations/m000012.rs rename to crates/migrations/src/m000012.rs diff --git a/src/migrations/m000013.rs b/crates/migrations/src/m000013.rs similarity index 100% rename from src/migrations/m000013.rs rename to crates/migrations/src/m000013.rs diff --git a/src/worker/builder.rs b/src/builder.rs similarity index 98% rename from src/worker/builder.rs rename to src/builder.rs index 483460d..dd3857b 100644 --- a/src/worker/builder.rs +++ b/src/builder.rs @@ -1,19 +1,19 @@ +use crate::runner::WorkerFn; +use crate::sql::task_identifiers::get_tasks_details; +use crate::utils::escape_identifier; +use crate::{Worker, WorkerContext}; +use archimedes_migrations::migrate; +use futures::FutureExt; +use rand::RngCore; +use serde::Deserialize; +use sqlx::postgres::PgPoolOptions; +use sqlx::PgPool; use std::collections::HashMap; use std::fmt::Debug; use std::future::Future; use std::sync::Arc; use std::time::Duration; -use futures::FutureExt; -use rand::RngCore; -use serde::Deserialize; -use sqlx::PgPool; -use sqlx::postgres::PgPoolOptions; use thiserror::Error; -use crate::migrations::migrate; -use crate::sql::task_identifiers::get_tasks_details; -use crate::utils::escape_identifier; -use crate::{Worker, WorkerContext}; -use crate::worker::WorkerFn; #[derive(Default)] pub struct WorkerOptions { diff --git a/src/cron/mod.rs b/src/cron/mod.rs deleted file mode 100644 index b93e263..0000000 --- a/src/cron/mod.rs +++ /dev/null @@ -1 +0,0 @@ -mod parser; diff --git a/src/cron/parser.rs b/src/cron/parser.rs deleted file mode 100644 index a8587d3..0000000 --- a/src/cron/parser.rs +++ /dev/null @@ -1,31 +0,0 @@ -use serde::Deserialize; - -pub struct Cron { - timing: CronTiming, - task: String, - options: CronOptions, - payload: serde_json::Value, -} - -pub struct CronTiming { - minutes: CronTimingType, - hours: CronTimingType, - days: CronTimingType, - months: CronTimingType, - days_of_week: CronTimingType, -} - -pub enum CronTimingType { - Numbers(Vec), - Range(u8, u8), - Wildcard(Option), -} - -#[derive(Deserialize)] -pub struct CronOptions { - identifier: Option, - backfill_period: Option, - max_attempts: Option, - queue_name: Option, - priority: Option, -} diff --git a/src/lib.rs b/src/lib.rs index f1706d7..61806bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,11 @@ +mod builder; pub mod errors; -pub mod migrations; +mod runner; mod sql; mod streams; mod utils; -mod worker; -pub use worker::builder::{WorkerBuildError, WorkerOptions}; -pub use worker::{Worker, WorkerContext}; +pub use crontab_parser::parse_crontab; + +pub use builder::{WorkerBuildError, WorkerOptions}; +pub use runner::{Worker, WorkerContext}; diff --git a/src/worker/mod.rs b/src/runner.rs similarity index 93% rename from src/worker/mod.rs rename to src/runner.rs index 446932e..f74da90 100644 --- a/src/worker/mod.rs +++ b/src/runner.rs @@ -1,5 +1,3 @@ -pub mod builder; - use std::fmt::Debug; use std::future::{ready, Future}; use std::pin::Pin; @@ -10,13 +8,13 @@ use crate::errors::ArchimedesError; use crate::sql::get_job::Job; use crate::sql::{get_job::get_job, task_identifiers::TaskDetails}; use crate::streams::job_signal_stream; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use getset::Getters; use thiserror::Error; use tracing::{debug, error, info, warn}; +use crate::builder::WorkerOptions; use crate::sql::complete_job::complete_job; -use crate::worker::builder::WorkerOptions; use crate::{sql::fail_job::fail_job, streams::StreamSource}; #[derive(Clone, Getters)] @@ -33,20 +31,20 @@ impl From<&Worker> for WorkerContext { } } -type WorkerFn = +pub type WorkerFn = Box Pin> + Send>>>; #[derive(Getters)] #[getset(get = "pub")] pub struct Worker { - worker_id: String, - concurrency: usize, - poll_interval: Duration, - jobs: HashMap, - pg_pool: sqlx::PgPool, - escaped_schema: String, - task_details: TaskDetails, - forbidden_flags: Vec, + pub(crate) worker_id: String, + pub(crate) concurrency: usize, + pub(crate) poll_interval: Duration, + pub(crate) jobs: HashMap, + pub(crate) pg_pool: sqlx::PgPool, + pub(crate) escaped_schema: String, + pub(crate) task_details: TaskDetails, + pub(crate) forbidden_flags: Vec, } #[derive(Error, Debug)] diff --git a/src/sql/get_job.rs b/src/sql/get_job.rs index a263bd6..4cbc934 100644 --- a/src/sql/get_job.rs +++ b/src/sql/get_job.rs @@ -9,6 +9,7 @@ use super::task_identifiers::TaskDetails; #[derive(FromRow, Getters, Debug)] #[getset(get = "pub")] +#[allow(dead_code)] pub struct Job { id: i64, /// FK to tasks