diff --git a/nft_ingester/Cargo.lock b/nft_ingester/Cargo.lock index 15d58d4ee..d73a77b33 100644 --- a/nft_ingester/Cargo.lock +++ b/nft_ingester/Cargo.lock @@ -285,6 +285,55 @@ dependencies = [ "winapi", ] +[[package]] +name = "anstream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e579a7752471abc2a8268df8b20005e3eadd975f585398f17efcfd8d4927371" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is-terminal", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d" + +[[package]] +name = "anstyle-parse" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e765fd216e48e067936442276d1d57399e37bce53c264d6fefbe298080cb57ee" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys 0.48.0", +] + +[[package]] +name = "anstyle-wincon" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcd8291a340dd8ac70e18878bc4501dd7b4ff970cfa21c207d36ece51ea88fd" +dependencies = [ + "anstyle", + "windows-sys 0.48.0", +] + [[package]] name = "anyhow" version = "1.0.69" @@ -903,7 +952,7 @@ checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "atty", "bitflags", - "clap_lex", + "clap_lex 0.2.4", "indexmap", "once_cell", "strsim 0.10.0", @@ -911,6 +960,43 @@ dependencies = [ "textwrap 0.16.0", ] +[[package]] +name = "clap" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b802d85aaf3a1cdb02b224ba472ebdea62014fccfcb269b95a4d76443b5ee5a" +dependencies = [ + "clap_builder", + "clap_derive", + "once_cell", +] + +[[package]] +name = "clap_builder" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14a1a858f532119338887a4b8e1af9c60de8249cd7bafd68036a489e261e37b6" +dependencies = [ + "anstream", + "anstyle", + "bitflags", + "clap_lex 0.4.1", + "once_cell", + "strsim 0.10.0", +] + +[[package]] +name = "clap_derive" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9644cd56d6b87dbe899ef8b053e331c0637664e9e21a33dfcdc36093f5c5c4" +dependencies = [ + "heck 0.4.1", + "proc-macro2 1.0.52", + "quote 1.0.26", + "syn 2.0.12", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -920,6 +1006,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clap_lex" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a2dd5a6fe8c6e3502f568a6353e5273bbb15193ad9a89e457b9970798efbea1" + [[package]] name = "codespan-reporting" version = "0.11.1" @@ -930,6 +1022,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "combine" version = "4.6.6" @@ -1589,6 +1687,8 @@ dependencies = [ "atomic", "pear", "serde", + "serde_yaml 0.9.21", + "toml", "uncased", "version_check", ] @@ -2627,6 +2727,7 @@ dependencies = [ "cadence", "cadence-macros", "chrono", + "clap 4.2.2", "digital_asset_types", "env_logger 0.10.0", "figment", @@ -4182,6 +4283,19 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "serde_yaml" +version = "0.9.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9d684e3ec7de3bf5466b32bd75303ac16f0736426e5a4e0d6e489559ce1249c" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -4454,7 +4568,7 @@ dependencies = [ "lazy_static", "serde", "serde_derive", - "serde_yaml", + "serde_yaml 0.8.26", "solana-clap-utils", "solana-sdk", "url", @@ -5297,6 +5411,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79d9531f94112cfc3e4c8f5f02cb2b58f72c97b7efd85f70203cc6d8efda5927" +dependencies = [ + "proc-macro2 1.0.52", + "quote 1.0.26", + "unicode-ident", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -5797,6 +5922,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsafe-libyaml" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1865806a559042e51ab5414598446a5871b561d21b6764f2eabb0dd481d880a6" + [[package]] name = "untrusted" version = "0.7.1" @@ -5830,6 +5961,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.3.0" @@ -6050,13 +6187,13 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] @@ -6065,7 +6202,16 @@ version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" dependencies = [ - "windows-targets", + "windows-targets 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", ] [[package]] @@ -6074,13 +6220,28 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] [[package]] @@ -6089,42 +6250,84 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + [[package]] name = "windows_i686_gnu" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + [[package]] name = "windows_i686_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + [[package]] name = "winnow" version = "0.3.5" diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 662043048..7b8e73df4 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -35,7 +35,7 @@ uuid = "1.0.0" async-trait = "0.1.53" num-traits = "0.2.15" blockbuster = { version = "0.7.3" } -figment = { version = "0.10.6", features = ["env"] } +figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } cadence = "0.29.0" cadence-macros = "0.29.0" solana-sdk = "~1.14" @@ -57,6 +57,7 @@ tracing-subscriber = { version = "0.3.16", features = [ "env-filter", "ansi", ] } +clap = { version = "4.2.2", features = ["derive", "cargo"] } [dependencies.num-integer] version = "0.1.44" diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 9b05d3e37..8dba535c2 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -1,13 +1,17 @@ -use std::fmt::{Display, Formatter}; +use std::{fmt::{Display, Formatter}, sync::Arc}; -use figment::{providers::Env, value::Value, Figment}; +use figment::{providers::{Env, Format, Yaml}, value::Value, Figment}; use plerkle_messenger::MessengerConfig; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use serde::Deserialize; use std::env; +use std::path::PathBuf; use tracing_subscriber::fmt; -use crate::error::IngesterError; +use crate::{ + error::IngesterError, + tasks::BackgroundTaskRunnerConfig, +}; #[derive(Deserialize, PartialEq, Debug, Clone)] pub struct IngesterConfig { @@ -23,6 +27,7 @@ pub struct IngesterConfig { pub account_stream_worker_count: Option, pub transaction_stream_worker_count: Option, pub code_version: Option<&'static str>, + pub background_task_runner_config: Option, } impl IngesterConfig { @@ -76,6 +81,7 @@ pub const RPC_URL_KEY: &str = "url"; pub const RPC_COMMITMENT_KEY: &str = "commitment"; pub const CODE_VERSION: &str = env!("CARGO_PKG_VERSION"); + #[derive(Deserialize, PartialEq, Eq, Debug, Clone)] pub enum IngesterRole { All, @@ -84,6 +90,12 @@ pub enum IngesterRole { Ingester, } +impl Default for IngesterRole { + fn default() -> Self { + IngesterRole::All + } +} + impl Display for IngesterRole { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -103,9 +115,16 @@ pub fn rand_string() -> String { .collect() } -pub fn setup_config() -> IngesterConfig { - let mut config: IngesterConfig = Figment::new() - .join(Env::prefixed("INGESTER_")) +pub fn setup_config(config_file: Option<&PathBuf>) -> IngesterConfig { + let mut figment = Figment::new() + .join(Env::prefixed("INGESTER_")); + + if let Some(config_file) = config_file { + figment = figment.join(Yaml::file(config_file)); + } + + let mut config: IngesterConfig = + figment .extract() .map_err(|config_error| IngesterError::ConfigurationError { msg: format!("{}", config_error), diff --git a/nft_ingester/src/database.rs b/nft_ingester/src/database.rs index 209480135..5b02a6c10 100644 --- a/nft_ingester/src/database.rs +++ b/nft_ingester/src/database.rs @@ -20,7 +20,7 @@ pub async fn setup_database(config: IngesterConfig) -> PgPool { let mut options: PgConnectOptions = url.parse().unwrap(); options.log_statements(log::LevelFilter::Trace); - options.log_slow_statements(log::LevelFilter::Info, std::time::Duration::from_millis(500)); + options.log_slow_statements(log::LevelFilter::Debug, std::time::Duration::from_millis(500)); let pool = PgPoolOptions::new() .min_connections(BARE_MINIMUM_CONNECTIONS) diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs index 80b132f70..eca96f5df 100644 --- a/nft_ingester/src/error/mod.rs +++ b/nft_ingester/src/error/mod.rs @@ -48,6 +48,8 @@ pub enum IngesterError { UnrecoverableTaskError, #[error("Cache Storage Write Error {0}")] CacheStorageWriteError(String), + #[error("HttpError {status_code}")] + HttpError { status_code: String }, } impl From for IngesterError { diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 76c81e934..18413f660 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -35,19 +35,47 @@ use tokio::{ task::{JoinSet}, }; +use std::{ + path::PathBuf, + time +}; +use clap::{arg, command, value_parser, ArgAction, Command}; + #[tokio::main(flavor = "multi_thread")] pub async fn main() -> Result<(), IngesterError> { init_logger(); info!("Starting nft_ingester"); + + let matches = command!() + .arg( + arg!( + -c --config "Sets a custom config file" + ) + // We don't have syntax yet for optional options, so manually calling `required` + .required(false) + .value_parser(value_parser!(PathBuf)), + ) + .get_matches(); + + let config_path = matches.get_one::("config"); + if let Some(config_path) = config_path { + println!("Loading config from: {}", config_path.display()); + } + // Setup Configuration and Metrics --------------------------------------------- + // Pull Env variables into config struct - let config = setup_config(); + let config = setup_config(config_path); + // Optionally setup metrics if config demands it setup_metrics(&config); + // One pool many clones, this thing is thread safe and send sync let database_pool = setup_database(config.clone()).await; + // The role determines the processes that get run. let role = config.clone().role.unwrap_or(IngesterRole::All); + info!("Starting Program with Role {}", role); // Tasks Setup ----------------------------------------------- // This joinset maages all the tasks that are spawned. @@ -56,7 +84,12 @@ pub async fn main() -> Result<(), IngesterError> { // BACKGROUND TASKS -------------------------------------------- //Setup definitions for background tasks - let bg_task_definitions: Vec> = vec![Box::new(DownloadMetadataTask {})]; + let task_runner_config = config.background_task_runner_config.clone().unwrap_or_default(); + let bg_task_definitions: Vec> = vec![Box::new(DownloadMetadataTask { + lock_duration: task_runner_config.lock_duration, + max_attempts: task_runner_config.max_attempts, + timeout: Some(time::Duration::from_secs(task_runner_config.timeout.unwrap_or(3))), + })]; let mut background_task_manager = TaskManager::new(rand_string(), database_pool.clone(), bg_task_definitions); @@ -121,7 +154,8 @@ pub async fn main() -> Result<(), IngesterError> { // Setup Stream Size Timers, these are small processes that run every 60 seconds and farm metrics for the size of the streams. // If metrics are disabled, these will not run. if role == IngesterRole::BackgroundTaskRunner || role == IngesterRole::All { - tasks.spawn(background_task_manager.start_runner()); + let background_runner_config = config.clone().background_task_runner_config;; + tasks.spawn(background_task_manager.start_runner(background_runner_config)); } // Backfiller Setup ------------------------------------------ if role == IngesterRole::Backfiller || role == IngesterRole::All { diff --git a/nft_ingester/src/tasks/common/mod.rs b/nft_ingester/src/tasks/common/mod.rs index 1389c91f9..0a5087162 100644 --- a/nft_ingester/src/tasks/common/mod.rs +++ b/nft_ingester/src/tasks/common/mod.rs @@ -48,19 +48,27 @@ impl FromTaskData for DownloadMetadata { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DownloadMetadataTask {} +pub struct DownloadMetadataTask { + pub lock_duration: Option, + pub max_attempts: Option, + pub timeout: Option, +} impl DownloadMetadataTask { - async fn request_metadata(uri: String) -> Result { + async fn request_metadata(uri: String, timeout: Duration) -> Result { let client = ClientBuilder::new() - .timeout(Duration::from_secs(3)) + .timeout(timeout) .build()?; - let val: serde_json::Value = Client::get(&client, uri) // Need to check for malicious sites ? + let response = Client::get(&client, uri) // Need to check for malicious sites ? .send() - .await? - .json() .await?; - Ok(val) + + if response.status() != reqwest::StatusCode::OK { + Err(IngesterError::HttpError{ status_code: response.status().as_str().to_string() }) + } else { + let val: serde_json::Value = response.json().await?; + Ok(val) + } } } @@ -71,11 +79,11 @@ impl BgTask for DownloadMetadataTask { } fn lock_duration(&self) -> i64 { - 5 + self.lock_duration.unwrap_or(5) } fn max_attempts(&self) -> i16 { - 3 + self.max_attempts.unwrap_or(3) } async fn task( @@ -86,7 +94,9 @@ impl BgTask for DownloadMetadataTask { let download_metadata: DownloadMetadata = serde_json::from_value(data)?; let meta_url = Url::parse(&download_metadata.uri); let body = match meta_url { - Ok(_) => DownloadMetadataTask::request_metadata(download_metadata.uri).await?, + Ok(_) => DownloadMetadataTask::request_metadata( + download_metadata.uri, + self.timeout.unwrap_or(Duration::from_secs(3))).await?, _ => serde_json::Value::String("Invalid Uri".to_string()), //TODO -> enumize this. }; let model = asset_data::ActiveModel { diff --git a/nft_ingester/src/tasks/mod.rs b/nft_ingester/src/tasks/mod.rs index af25cf150..3ea951116 100644 --- a/nft_ingester/src/tasks/mod.rs +++ b/nft_ingester/src/tasks/mod.rs @@ -4,7 +4,7 @@ use cadence_macros::{is_global_default_set, statsd_count, statsd_histogram}; use chrono::{Duration, NaiveDateTime, Utc}; use crypto::{digest::Digest, sha2::Sha256}; use digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks}; -use log::{debug, error, warn}; +use log::{debug, info, error, warn}; use sea_orm::{ entity::*, query::*, sea_query::Expr, ActiveValue::Set, ColumnTrait, DatabaseConnection, DeleteResult, SqlxPostgresConnector, @@ -16,6 +16,8 @@ use tokio::{ task::JoinHandle, time, }; +use serde::Deserialize; +use std::time::Duration as StdDuration; mod common; pub use common::*; @@ -32,9 +34,38 @@ pub trait BgTask: Send + Sync { ) -> Result<(), IngesterError>; } -const RETRY_INTERVAL: u64 = 1000; -const DELETE_INTERVAL: u64 = 30000; -const MAX_TASK_BATCH_SIZE: u64 = 100; +pub const RETRY_INTERVAL: u64 = 1000; +pub const DELETE_INTERVAL: u64 = 30000; +pub const MAX_TASK_BATCH_SIZE: u64 = 100; +pub const PURGE_TIME: u64 = 3600; + +/** + * Configuration for the background task runner, to be used in config file loading e.g. + */ +#[derive(Deserialize, PartialEq, Debug, Clone)] +pub struct BackgroundTaskRunnerConfig { + pub delete_interval: Option, + pub retry_interval: Option, + pub purge_time: Option, + pub batch_size: Option, + pub lock_duration: Option, + pub max_attempts: Option, + pub timeout: Option, +} + +impl Default for BackgroundTaskRunnerConfig { + fn default() -> Self { + BackgroundTaskRunnerConfig { + delete_interval: Some(DELETE_INTERVAL), + retry_interval: Some(RETRY_INTERVAL), + purge_time: Some(PURGE_TIME), + batch_size: Some(MAX_TASK_BATCH_SIZE), + lock_duration: Some(5), + max_attempts: Some(3), + timeout: Some(3), + } + } +} pub struct TaskData { pub name: &'static str, @@ -123,18 +154,29 @@ impl TaskManager { task.errors = Set(Some(e.to_string())); task.locked_until = Set(None); - if e == IngesterError::BatchInitNetworkingError { - // Network errors are common for off-chain JSONs. - // Logging these as errors is far too noisy. - metric! { - statsd_count!("ingester.bgtask.network_error", 1, "type" => task_name); - } - warn!("Task failed due to network error: {}", e); - } else { - metric! { - statsd_count!("ingester.bgtask.error", 1, "type" => task_name); + match e { + IngesterError::BatchInitNetworkingError => { + // Network errors are common for off-chain JSONs. + // Logging these as errors is far too noisy. + metric! { + statsd_count!("ingester.bgtask.network_error", 1, "type" => task_name); + } + warn!("Task failed due to network error: {}", e); + }, + IngesterError::HttpError { ref status_code } => { + metric! { + statsd_count!("ingester.bgtask.http_error", 1, + "status" => &status_code, + "type" => task_name); + } + warn!("Task failed due to HTTP error: {}", e); + }, + _ => { + metric! { + statsd_count!("ingester.bgtask.error", 1, "type" => task_name); + } + error!("Task Run Error: {}", e); } - error!("Task Run Error: {}", e); } } } @@ -143,6 +185,7 @@ impl TaskManager { pub async fn get_pending_tasks( conn: &DatabaseConnection, + batch_size: u64, ) -> Result, IngesterError> { tasks::Entity::find() .filter( @@ -159,7 +202,7 @@ impl TaskManager { ), ) .order_by_desc(tasks::Column::CreatedAt) - .limit(MAX_TASK_BATCH_SIZE) + .limit(batch_size) .all(conn) .await .map_err(|e| e.into()) @@ -233,8 +276,9 @@ impl TaskManager { }) } - pub async fn purge_old_tasks(conn: &DatabaseConnection) -> Result { - let cod = Expr::cust("NOW() - created_at::timestamp > interval '60 minute'"); //TOdo parametrize + pub async fn purge_old_tasks(conn: &DatabaseConnection, task_max_age: time::Duration) -> Result { + let interval = format!("NOW() - created_at::timestamp > interval '{} seconds'", task_max_age.as_secs()); + let cod = Expr::cust(&interval); tasks::Entity::delete_many() .filter(Condition::all().add(cod)) .exec(conn) @@ -281,6 +325,9 @@ impl TaskManager { } continue; } + metric! { + statsd_count!("ingester.bgtask.new", 1, "type" => &task.name); + } TaskManager::new_task_handler( pool.clone(), instance_name.clone(), @@ -294,33 +341,64 @@ impl TaskManager { }) } - pub fn start_runner(&self) -> JoinHandle<()> { + pub fn start_runner(&self, config: Option) -> JoinHandle<()> { let task_map = self.registered_task_types.clone(); - let pool = self.pool.clone(); let instance_name = self.instance_name.clone(); + + // Load the config values + // For backwards compatibility reasons, the logic is a bit convoluted. + let config = config.unwrap_or_default(); + + let delete_interval = tokio::time::Duration::from_millis( + config.delete_interval.unwrap_or( + BackgroundTaskRunnerConfig::default().delete_interval.unwrap() + )); + + let retry_interval = tokio::time::Duration::from_millis( + config.retry_interval.unwrap_or( + BackgroundTaskRunnerConfig::default().retry_interval.unwrap())); + + let purge_time = tokio::time::Duration::from_secs( + config.purge_time.unwrap_or( + BackgroundTaskRunnerConfig::default().purge_time.unwrap())); + + let batch_size = config.batch_size.unwrap_or( + BackgroundTaskRunnerConfig::default().batch_size.unwrap()); + + // Loop to purge tasks + let pool = self.pool.clone(); + let task_name = instance_name.clone(); tokio::spawn(async move { let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone()); - let mut interval = time::interval(tokio::time::Duration::from_millis(DELETE_INTERVAL)); + let mut interval = time::interval(delete_interval); loop { interval.tick().await; // ticks immediately - let delete_res = TaskManager::purge_old_tasks(&conn).await; + let delete_res = TaskManager::purge_old_tasks(&conn, purge_time).await; match delete_res { Ok(res) => { - debug!("deleted {} tasks entries", res.rows_affected); + info!("deleted {} tasks entries", res.rows_affected); + metric! { + statsd_count!("ingester.bgtask.purged_tasks", i64::try_from(res.rows_affected).unwrap_or(1)); + } } Err(e) => { + metric! { + statsd_count!("ingester.bgtask.purge_error", 1); + } error!("error deleting tasks: {}", e); } }; } }); + + // Loop to check for tasks that need to be executed and execute them let pool = self.pool.clone(); tokio::spawn(async move { - let mut interval = time::interval(tokio::time::Duration::from_millis(RETRY_INTERVAL)); + let mut interval = time::interval(retry_interval); let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone()); loop { interval.tick().await; // ticks immediately - let tasks_res = TaskManager::get_pending_tasks(&conn).await; + let tasks_res = TaskManager::get_pending_tasks(&conn, batch_size).await; match tasks_res { Ok(tasks) => { debug!("tasks that need to be executed: {}", tasks.len());