diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 8c488a6b3..49f6c6ee5 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -28,7 +28,9 @@ pub struct IngesterConfig { pub role: Option, pub max_postgres_connections: Option, pub account_stream_worker_count: Option, + pub account_backfill_stream_worker_count: Option, pub transaction_stream_worker_count: Option, + pub transaction_backfill_stream_worker_count: Option, pub code_version: Option<&'static str>, pub background_task_runner_config: Option, pub cl_audits: Option, // save transaction logs for compressed nfts @@ -68,9 +70,19 @@ impl IngesterConfig { self.account_stream_worker_count.unwrap_or(2) } + pub fn get_account_backfill_stream_worker_count(&self) -> u32 { + self.account_backfill_stream_worker_count + .unwrap_or_else(|| self.get_account_stream_worker_count()) + } + pub fn get_transaction_stream_worker_count(&self) -> u32 { self.transaction_stream_worker_count.unwrap_or(2) } + + pub fn get_transaction_backfill_stream_worker_count(&self) -> u32 { + self.transaction_backfill_stream_worker_count + .unwrap_or_else(|| self.get_transaction_stream_worker_count()) + } } // Types and constants used for Figment configuration items. diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index a09e7e416..3d08cafb1 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -149,7 +149,8 @@ pub async fn main() -> Result<(), IngesterError> { }, ACCOUNT_STREAM, ); - + } + for i in 0..config.get_account_backfill_stream_worker_count() { let _account_backfill = account_worker::( database_pool.clone(), config.get_messneger_client_config(), @@ -177,7 +178,8 @@ pub async fn main() -> Result<(), IngesterError> { config.cl_audits.unwrap_or(false), TRANSACTION_STREAM, ); - + } + for i in 0..config.get_transaction_backfill_stream_worker_count() { let _txn_backfill = transaction_worker::( database_pool.clone(), config.get_messneger_client_config(),