Skip to content

Commit

Permalink
chore: Refactor folder structure
Browse files Browse the repository at this point in the history
- put migrations in its own crate
- remove unnecessary worker module
- remove unused folders
  • Loading branch information
leo91000 committed Dec 19, 2022
1 parent 361906e commit ed29cec
Show file tree
Hide file tree
Showing 22 changed files with 53 additions and 65 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ members = [

[features]
default = ["runtime-tokio-native-tls"]
runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls", "tokio"]
runtime-tokio-native-tls = ["sqlx/runtime-tokio-native-tls", "tokio"]
runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls", "tokio", "crontab_runner/runtime-tokio-rustls", "archimedes_migrations/runtime-tokio-rustls"]
runtime-tokio-native-tls = ["sqlx/runtime-tokio-native-tls", "tokio", "crontab_runner/runtime-tokio-native-tls", "archimedes_migrations/runtime-tokio-native-tls"]
# For now we don't support async std
# runtime-async-std-rustls = ["sqlx/runtime-async-std-rustls"]
# runtime-async-std-native-tls = ["sqlx/runtime-async-std-native-tls"]

[dependencies]
crontab_runner = { path = "./crates/crontab_runner" }
crontab_types = { path = "./crates/crontab_types" }
crontab_parser = { path = "./crates/crontab_parser" }
archimedes_migrations = { path = "./crates/migrations" }
anyhow = "1.0.66"
async-trait = "0.1.58"
cargo-insta = "1.21.1"
Expand Down
16 changes: 16 additions & 0 deletions crates/migrations/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "archimedes_migrations"
version = "0.0.0"
edition = "2021"

[features]
default = ["runtime-tokio-native-tls"]
runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"]
runtime-tokio-native-tls = ["sqlx/runtime-tokio-native-tls"]

[dependencies]
sqlx = "0.6.2"
tracing = "0.1.37"

[dev-dependencies]
tokio = "1.23.0"
7 changes: 3 additions & 4 deletions src/migrations/mod.rs → crates/migrations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod m000011;
mod m000012;
mod m000013;

use crate::errors::Result;
use sqlx::{query, Acquire, Error as SqlxError, PgExecutor, Postgres, Row};
use tracing::info;

Expand Down Expand Up @@ -46,7 +45,7 @@ pub const MIGRATIONS: &[&[&str]] = &[
M000013_MIGRATION,
];

async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<()>
async fn install_schema<'e, E>(executor: E, escaped_schema: &str) -> Result<(), sqlx::Error>
where
E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Clone,
{
Expand Down Expand Up @@ -75,7 +74,7 @@ where
Ok(())
}

pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<()>
pub async fn migrate<'e, E>(executor: E, escaped_schema: &str) -> Result<(), sqlx::Error>
where
E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone,
{
Expand Down Expand Up @@ -129,7 +128,7 @@ where

#[cfg(test)]
mod tests {
use crate::migrate::migrate;
use super::*;
use sqlx::postgres::PgPoolOptions;

#[tokio::test]
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
20 changes: 10 additions & 10 deletions src/worker/builder.rs → src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use crate::runner::WorkerFn;
use crate::sql::task_identifiers::get_tasks_details;
use crate::utils::escape_identifier;
use crate::{Worker, WorkerContext};
use archimedes_migrations::migrate;
use futures::FutureExt;
use rand::RngCore;
use serde::Deserialize;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use futures::FutureExt;
use rand::RngCore;
use serde::Deserialize;
use sqlx::PgPool;
use sqlx::postgres::PgPoolOptions;
use thiserror::Error;
use crate::migrations::migrate;
use crate::sql::task_identifiers::get_tasks_details;
use crate::utils::escape_identifier;
use crate::{Worker, WorkerContext};
use crate::worker::WorkerFn;

#[derive(Default)]
pub struct WorkerOptions {
Expand Down
1 change: 0 additions & 1 deletion src/cron/mod.rs

This file was deleted.

31 changes: 0 additions & 31 deletions src/cron/parser.rs

This file was deleted.

10 changes: 6 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod builder;
pub mod errors;
pub mod migrations;
mod runner;
mod sql;
mod streams;
mod utils;
mod worker;

pub use worker::builder::{WorkerBuildError, WorkerOptions};
pub use worker::{Worker, WorkerContext};
pub use crontab_parser::parse_crontab;

pub use builder::{WorkerBuildError, WorkerOptions};
pub use runner::{Worker, WorkerContext};
24 changes: 11 additions & 13 deletions src/worker/mod.rs → src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pub mod builder;

use std::fmt::Debug;
use std::future::{ready, Future};
use std::pin::Pin;
Expand All @@ -10,13 +8,13 @@ use crate::errors::ArchimedesError;
use crate::sql::get_job::Job;
use crate::sql::{get_job::get_job, task_identifiers::TaskDetails};
use crate::streams::job_signal_stream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use getset::Getters;
use thiserror::Error;
use tracing::{debug, error, info, warn};

use crate::builder::WorkerOptions;
use crate::sql::complete_job::complete_job;
use crate::worker::builder::WorkerOptions;
use crate::{sql::fail_job::fail_job, streams::StreamSource};

#[derive(Clone, Getters)]
Expand All @@ -33,20 +31,20 @@ impl From<&Worker> for WorkerContext {
}
}

type WorkerFn =
pub type WorkerFn =
Box<dyn Fn(WorkerContext, String) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>>;

#[derive(Getters)]
#[getset(get = "pub")]
pub struct Worker {
worker_id: String,
concurrency: usize,
poll_interval: Duration,
jobs: HashMap<String, WorkerFn>,
pg_pool: sqlx::PgPool,
escaped_schema: String,
task_details: TaskDetails,
forbidden_flags: Vec<String>,
pub(crate) worker_id: String,
pub(crate) concurrency: usize,
pub(crate) poll_interval: Duration,
pub(crate) jobs: HashMap<String, WorkerFn>,
pub(crate) pg_pool: sqlx::PgPool,
pub(crate) escaped_schema: String,
pub(crate) task_details: TaskDetails,
pub(crate) forbidden_flags: Vec<String>,
}

#[derive(Error, Debug)]
Expand Down
1 change: 1 addition & 0 deletions src/sql/get_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::task_identifiers::TaskDetails;

#[derive(FromRow, Getters, Debug)]
#[getset(get = "pub")]
#[allow(dead_code)]
pub struct Job {
id: i64,
/// FK to tasks
Expand Down

0 comments on commit ed29cec

Please sign in to comment.