Caching #781

Merged 12 commits on Apr 24, 2024
371 changes: 271 additions & 100 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 0 additions & 5 deletions Cargo.toml
@@ -40,8 +40,3 @@ serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
strum = { version = "0.26.2", features = ["derive"] }
tracing = { version = "0.1.40", features = ["attributes"] }

[profile.release]
opt-level = 's'
strip = true
lto = "thin"
12 changes: 9 additions & 3 deletions apps/backend/Cargo.toml
@@ -1,20 +1,27 @@
[package]
name = "ryot"
version = "5.0.7"
version = "5.0.8"
edition = "2021"
repository = "https://github.com/IgnisDa/ryot"
license = "GPL-3.0"

[dependencies]
anyhow = { workspace = true }
apalis = { version = "0.5.1", features = ["cron", "limit", "sqlite"] }
# TODO: Upgrade when https://github.com/geofmureithi/apalis/issues/301 is fixed
apalis = { version = "0.4.9", features = [
"cron",
"extensions",
"limit",
"sqlite",
] }
argon2 = "0.5.3"
async-graphql = { workspace = true }
async-graphql-axum = "7.0.3"
async-trait = "0.1.80"
aws-sdk-s3 = { version = "1.23.0", features = ["behavior-version-latest"] }
axum = { version = "0.7.5", features = ["macros", "multipart"] }
boilermates = "0.3.0"
cached = { version = "0.49.3", features = ["disk_store"] }
chrono = { workspace = true }
chrono-tz = "0.9.0"
config = { path = "../../libs/config" }
@@ -52,7 +59,6 @@ openidconnect = "3.5.0"
paginate = "1.1.11"
rand = "0.8.5"
regex = "1.10.4"
retainer = "0.3.0"
rs-utils = { path = "../../libs/rs-utils" }
rust_decimal = "1.35.0"
rust_decimal_macros = "1.34.2"
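The dependency changes above are the heart of this PR's caching work: the in-memory `retainer` cache goes away, the `cached` crate arrives with its `disk_store` feature, and apalis is pinned back to 0.4.9 until the linked upstream issue is resolved. As a rough illustration of what the new dependency enables (the function, key, and error handling below are invented for this sketch and are not code from the PR), `cached`'s `io_cached` macro can persist the results of a fallible function to disk:

```rust
use cached::proc_macro::io_cached;

// Hypothetical example only: cache an expensive lookup on disk.
// `disk = true` relies on the `disk_store` feature enabled in Cargo.toml above;
// `map_error` converts the cache's own error type into this function's error type.
#[io_cached(
    disk = true,
    map_error = r##"|e| format!("disk cache error: {:?}", e)"##
)]
fn lookup_metadata(key: String) -> Result<String, String> {
    // Stand-in for an expensive provider call.
    Ok(format!("value-for-{key}"))
}

fn main() {
    // The second call with the same key is served from the on-disk cache.
    let _ = lookup_metadata("tmdb:603".to_string());
    let _ = lookup_metadata("tmdb:603".to_string());
}
```

Entries in a disk-backed store survive restarts, which is presumably the point of moving away from retainer's purely in-memory cache.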
41 changes: 20 additions & 21 deletions apps/backend/src/background.rs
@@ -33,11 +33,9 @@ impl Job for ScheduledJob {
const NAME: &'static str = "apalis::ScheduledJob";
}

pub async fn media_jobs(
_information: ScheduledJob,
misc_service: Data<Arc<MiscellaneousService>>,
importer_service: Data<Arc<ImporterService>>,
) -> Result<(), Error> {
pub async fn media_jobs(_information: ScheduledJob, ctx: JobContext) -> Result<(), JobError> {
let misc_service = ctx.data::<Arc<MiscellaneousService>>().unwrap();
let importer_service = ctx.data::<Arc<ImporterService>>().unwrap();
if env::var("DISABLE_INVALIDATE_IMPORT_JOBS").is_err() {
tracing::trace!("Invalidating invalid media import jobs");
importer_service.invalidate_import_jobs().await.unwrap();
@@ -74,26 +72,25 @@ pub async fn media_jobs(
Ok(())
}

pub async fn user_jobs(
_information: ScheduledJob,
service: Data<Arc<MiscellaneousService>>,
) -> Result<(), Error> {
pub async fn user_jobs(_information: ScheduledJob, ctx: JobContext) -> Result<(), JobError> {
let misc_service = ctx.data::<Arc<MiscellaneousService>>().unwrap();
tracing::trace!("Cleaning up user and metadata association");
service
misc_service
.cleanup_user_and_metadata_association()
.await
.unwrap();
tracing::trace!("Removing old user summaries and regenerating them");
service.regenerate_user_summaries().await.unwrap();
misc_service.regenerate_user_summaries().await.unwrap();
Ok(())
}

pub async fn yank_integrations_data(
_information: ScheduledJob,
service: Data<Arc<MiscellaneousService>>,
) -> Result<(), Error> {
ctx: JobContext,
) -> Result<(), JobError> {
let misc_service = ctx.data::<Arc<MiscellaneousService>>().unwrap();
tracing::trace!("Getting data from yanked integrations for all users");
service.yank_integrations_data().await.unwrap();
misc_service.yank_integrations_data().await.unwrap();
Ok(())
}

@@ -112,8 +109,9 @@ impl Job for CoreApplicationJob {

pub async fn perform_core_application_job(
information: CoreApplicationJob,
misc_service: Data<Arc<MiscellaneousService>>,
) -> Result<(), Error> {
ctx: JobContext,
) -> Result<(), JobError> {
let misc_service = ctx.data::<Arc<MiscellaneousService>>().unwrap();
let name = information.to_string();
tracing::trace!("Started job: {:#?}", name);
let start = Instant::now();
@@ -157,11 +155,12 @@ impl Job for ApplicationJob {

pub async fn perform_application_job(
information: ApplicationJob,
misc_service: Data<Arc<MiscellaneousService>>,
importer_service: Data<Arc<ImporterService>>,
exporter_service: Data<Arc<ExporterService>>,
exercise_service: Data<Arc<ExerciseService>>,
) -> Result<(), Error> {
ctx: JobContext,
) -> Result<(), JobError> {
let importer_service = ctx.data::<Arc<ImporterService>>().unwrap();
let exporter_service = ctx.data::<Arc<ExporterService>>().unwrap();
let misc_service = ctx.data::<Arc<MiscellaneousService>>().unwrap();
let exercise_service = ctx.data::<Arc<ExerciseService>>().unwrap();
let name = information.to_string();
tracing::trace!("Started job: {:#?}", name);
let start = Instant::now();
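The signature changes in this file follow directly from the apalis downgrade: instead of receiving each service as a separate `Data<Arc<…>>` argument, a 0.4-style handler takes a single `JobContext` and pulls shared state out of it with `ctx.data::<T>()`. A minimal sketch of that shape, using only the accessor pattern visible in this diff (the job type and shared state are invented, and exact import paths may differ between apalis versions):

```rust
use std::sync::Arc;

use apalis::prelude::{Job, JobContext, JobError};
use serde::{Deserialize, Serialize};

// Illustrative shared state; in this PR the equivalents are the service structs.
struct AppState {
    greeting: String,
}

// Illustrative job type, mirroring how ScheduledJob implements `Job` above.
#[derive(Debug, Serialize, Deserialize)]
struct RefreshJob {
    user_id: i32,
}

impl Job for RefreshJob {
    const NAME: &'static str = "apalis::RefreshJob";
}

// Shared state is attached by the worker (via an `ApalisExtension` layer, as in
// main.rs below) and read back out of the JobContext, like `media_jobs` does.
async fn perform_refresh(job: RefreshJob, ctx: JobContext) -> Result<(), JobError> {
    let state = ctx.data::<Arc<AppState>>().unwrap();
    tracing::trace!("{} user {}", state.greeting, job.user_id);
    Ok(())
}
```

A worker would then be built with something like `WorkerBuilder::new(...).layer(ApalisExtension(Arc::new(AppState { .. }))).build_fn(perform_refresh)`, as the monitor setup in main.rs does for the real services.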
3 changes: 2 additions & 1 deletion apps/backend/src/importer/media_tracker.rs
@@ -139,12 +139,13 @@ struct ItemDetails {
}

pub async fn import(input: DeployMediaTrackerImportInput) -> Result<ImportResult> {
let api_url = input.api_url.trim_end_matches('/');
let client: Client = Config::new()
.add_header(USER_AGENT, USER_AGENT_STR)
.unwrap()
.add_header("Access-Token", input.api_key)
.unwrap()
.set_base_url(Url::parse(&format!("{}/api/", input.api_url)).unwrap())
.set_base_url(Url::parse(&format!("{}/api/", api_url)).unwrap())
.try_into()
.unwrap();

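The only functional change here is trimming a trailing slash from the user-supplied URL before the `/api/` suffix is appended, a normalization that moves into this importer from `deploy_import_job` in the next file. A tiny standalone illustration of what the trim prevents (the URL is made up):

```rust
fn main() {
    let input = "https://tracker.example.com/"; // hypothetical user-supplied value
    let api_url = input.trim_end_matches('/');
    // Without the trim the base URL would contain a double slash:
    //   https://tracker.example.com//api/
    println!("{}/api/", api_url); // https://tracker.example.com/api/
}
```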
17 changes: 7 additions & 10 deletions apps/backend/src/importer/mod.rs
@@ -227,22 +227,19 @@ impl ImporterService {
pub async fn deploy_import_job(
&self,
user_id: i32,
mut input: DeployImportJobInput,
input: DeployImportJobInput,
) -> Result<String> {
if let Some(s) = input.media_tracker.as_mut() {
s.api_url = s.api_url.trim_end_matches('/').to_owned()
}
let job = self
let job = ApplicationJob::ImportFromExternalSource(user_id, Box::new(input));
let task = self
.media_service
.perform_application_job
.clone()
.push(ApplicationJob::ImportFromExternalSource(
user_id,
Box::new(input),
))
.push(job)
.await
.unwrap();
Ok(job.to_string())
let job_id = task.to_string();
tracing::debug!("Deployed import job with id = {id}", id = job_id);
Ok(job_id)
}

pub async fn invalidate_import_jobs(&self) -> Result<()> {
90 changes: 42 additions & 48 deletions apps/backend/src/main.rs
@@ -11,9 +11,10 @@ use anyhow::{bail, Result};
use apalis::{
cron::{CronStream, Schedule},
layers::{
limit::RateLimitLayer as ApalisRateLimitLayer, tracing::TraceLayer as ApalisTraceLayer,
Extension as ApalisExtension, RateLimitLayer as ApalisRateLimitLayer,
TraceLayer as ApalisTraceLayer,
},
prelude::{Job as ApalisJob, *},
prelude::{timer::TokioTimer as SleepTimer, Job as ApalisJob, *},
sqlite::SqliteStorage,
};
use aws_sdk_s3::config::Region;
@@ -34,7 +35,6 @@ use sea_orm_migration::MigratorTrait;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{pool::PoolOptions, SqlitePool};
use tokio::{join, net::TcpListener};
use tower::buffer::BufferLayer;
use tower_http::{
catch_panic::CatchPanicLayer as TowerCatchPanicLayer, cors::CorsLayer as TowerCorsLayer,
trace::TraceLayer as TowerTraceLayer,
@@ -256,77 +256,70 @@ async fn main() -> Result<()> {
let exercise_service_1 = app_services.exercise_service.clone();

let monitor = async {
Monitor::<TokioExecutor>::new()
Monitor::new()
// cron jobs
.register_with_count(1, {
WorkerBuilder::new("general_user_cleanup")
.register_with_count(1, move |c| {
WorkerBuilder::new(format!("general_user_cleanup-{c}"))
.stream(
CronStream::new_with_timezone(
CronStream::new(
Schedule::from_str(&format!("0 0 */{} ? * *", user_cleanup_every))
.unwrap(),
tz,
)
.into_stream(),
.timer(SleepTimer)
.to_stream_with_timezone(tz),
)
.layer(ApalisTraceLayer::new())
.data(media_service_1.clone())
.layer(ApalisExtension(media_service_1.clone()))
.build_fn(user_jobs)
})
.register_with_count(1, {
WorkerBuilder::new("general_media_cleanup_job")
.register_with_count(1, move |c| {
WorkerBuilder::new(format!("general_media_cleanup_job-{c}"))
.stream(
// every day
CronStream::new_with_timezone(
Schedule::from_str("0 0 0 * * *").unwrap(),
tz,
)
.into_stream(),
CronStream::new(Schedule::from_str("0 0 0 * * *").unwrap())
.timer(SleepTimer)
.to_stream_with_timezone(tz),
)
.layer(ApalisTraceLayer::new())
.data(importer_service_2.clone())
.data(media_service_2.clone())
.layer(ApalisExtension(importer_service_2.clone()))
.layer(ApalisExtension(media_service_2.clone()))
.build_fn(media_jobs)
})
.register_with_count(1, {
WorkerBuilder::new("yank_integrations_data")
.register_with_count(1, move |c| {
WorkerBuilder::new(format!("yank_integrations_data-{c}"))
.stream(
CronStream::new_with_timezone(
CronStream::new(
Schedule::from_str(&format!("0 0 */{} ? * *", pull_every)).unwrap(),
tz,
)
.into_stream(),
.timer(SleepTimer)
.to_stream_with_timezone(tz),
)
.layer(ApalisTraceLayer::new())
.data(media_service_3.clone())
.layer(ApalisExtension(media_service_3.clone()))
.build_fn(yank_integrations_data)
})
// application jobs
.register_with_count(1, {
WorkerBuilder::new("perform_core_application_job")
.register_with_count(1, move |c| {
WorkerBuilder::new(format!("perform_core_application_job-{c}"))
.layer(ApalisTraceLayer::new())
.data(media_service_5.clone())
.layer(ApalisExtension(media_service_5.clone()))
.with_storage(perform_core_application_job_storage.clone())
.build_fn(perform_core_application_job)
})
.register_with_count(
1,
WorkerBuilder::new("perform_application_job")
.data(importer_service_1.clone())
.data(exporter_service_1.clone())
.data(media_service_4.clone())
.data(exercise_service_1.clone())
.register_with_count(3, move |c| {
WorkerBuilder::new(format!("perform_application_job-{c}"))
.layer(ApalisTraceLayer::new())
.layer(ApalisRateLimitLayer::new(
rate_limit_count,
Duration::new(5, 0),
))
.layer(ApalisExtension(importer_service_1.clone()))
.layer(ApalisExtension(exporter_service_1.clone()))
.layer(ApalisExtension(media_service_4.clone()))
.layer(ApalisExtension(exercise_service_1.clone()))
.with_storage(perform_application_job_storage.clone())
// DEV: Had to do this fuckery because of https://github.com/geofmureithi/apalis/issues/297
.chain(|s| {
s.layer(BufferLayer::new(1024))
.layer(ApalisRateLimitLayer::new(
rate_limit_count,
Duration::new(5, 0),
))
.layer(ApalisTraceLayer::new())
})
.build_fn(perform_application_job),
)
.build_fn(perform_application_job)
})
.run()
.await
.unwrap();
@@ -350,8 +343,9 @@ async fn main() -> Result<()> {
async fn create_storage<T: ApalisJob + DeserializeOwned + Serialize>(
pool: SqlitePool,
) -> SqliteStorage<T> {
SqliteStorage::setup(&pool).await.unwrap();
SqliteStorage::new(pool)
let st = SqliteStorage::new(pool);
st.setup().await.unwrap();
st
}
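For reference, a minimal sketch of how the reworked `create_storage` might be called from `main`, assuming the job types from background.rs are in scope (the connection string and error handling here are placeholders; the actual code builds its pool with `PoolOptions` and a file-backed database):

```rust
use sqlx::SqlitePool;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder pool for illustration only.
    let pool = SqlitePool::connect("sqlite::memory:").await?;
    // One storage per job kind, matching the names used by the monitor above.
    let perform_core_application_job_storage =
        create_storage::<CoreApplicationJob>(pool.clone()).await;
    let perform_application_job_storage = create_storage::<ApplicationJob>(pool).await;
    let _ = (perform_core_application_job_storage, perform_application_job_storage);
    Ok(())
}
```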

fn init_tracing() -> Result<()> {