diff --git a/backfill/src/lib.rs b/backfill/src/lib.rs index 39a478170..dac3aba44 100644 --- a/backfill/src/lib.rs +++ b/backfill/src/lib.rs @@ -80,13 +80,13 @@ pub async fn start_bubblegum_backfill( #[derive(Debug, Parser, Clone)] pub struct BubblegumReplayArgs { - /// Number of tree crawler workers - #[arg(long, env, default_value = "20")] - pub tree_crawler_count: usize, + /// The tree to replay. + #[arg(long, env)] + pub tree: String, - /// The list of trees to crawl. If not specified, all trees will be crawled. + /// The list of sequences to replay. If not specified, all sequences will be replayed. #[arg(long, env, use_value_delimiter = true)] - pub trees: Vec, + pub only_sequences: Option>, #[clap(flatten)] pub signature_worker: SignatureWorkerArgs, @@ -102,72 +102,55 @@ pub async fn start_bubblegum_replay( context: BubblegumBackfillContext, args: BubblegumReplayArgs, ) -> Result<()> { - let pubkeys = args - .trees - .iter() - .map(|tree| Pubkey::from_str(tree).map(|pubkey| pubkey.to_bytes().to_vec())) - .collect::>, _>>()?; - - let mut crawl_handles = FuturesUnordered::new(); - - for pubkey in pubkeys { - if crawl_handles.len() >= args.tree_crawler_count { - crawl_handles.next().await; - } - let database_pool = context.database_pool.clone(); - - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool); + let pubkey = Pubkey::from_str(&args.tree) + .map(|pubkey| pubkey.to_bytes().to_vec()) + .map_err(|e| anyhow::anyhow!("Invalid tree pubkey: {:?}", e))?; - let cl_audits = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(pubkey)) - .order_by_asc(cl_audits_v2::Column::Seq) - .all(&conn) - .await?; + let database_pool = context.database_pool.clone(); + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool); - let context = context.clone(); - let metadata_json_download_worker_args = args.metadata_json_download_worker.clone(); - let program_transformer_worker_args = args.program_transformer_worker.clone(); - let signature_worker_args = args.signature_worker.clone(); + let mut query = cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(pubkey)) + .order_by_asc(cl_audits_v2::Column::Seq); - let metadata_json_download_database_pool = context.database_pool.clone(); - let handle: tokio::task::JoinHandle> = tokio::spawn(async move { - let metadata_json_download_db_pool = metadata_json_download_database_pool.clone(); - let program_transformer_context = context.clone(); - let signature_context = context.clone(); - - let (metadata_json_download_worker, metadata_json_download_sender) = - metadata_json_download_worker_args.start(metadata_json_download_db_pool)?; + if let Some(sequences) = args.only_sequences { + query = query.filter(cl_audits_v2::Column::Seq.is_in(sequences)); + } - let (program_transformer_worker, transaction_info_sender) = - program_transformer_worker_args - .start(program_transformer_context, metadata_json_download_sender)?; + let cl_audits = query.all(&conn).await?; - let (signature_worker, signature_sender) = - signature_worker_args.start(signature_context, transaction_info_sender)?; + let metadata_json_download_worker_args = args.metadata_json_download_worker.clone(); + let program_transformer_worker_args = args.program_transformer_worker.clone(); + let signature_worker_args = args.signature_worker.clone(); - for audit in cl_audits { - let signature = Signature::try_from(audit.tx.as_ref())?; - if let Err(e) = signature_sender.send(signature).await { - error!("send signature: {:?}", e); - } - } + let metadata_json_download_db_pool = context.database_pool.clone(); + let program_transformer_context = context.clone(); + let signature_context = context.clone(); - drop(signature_sender); + let (metadata_json_download_worker, metadata_json_download_sender) = + metadata_json_download_worker_args.start(metadata_json_download_db_pool)?; - futures::future::try_join3( - signature_worker, - program_transformer_worker, - metadata_json_download_worker, - ) - .await?; + let (program_transformer_worker, transaction_info_sender) = program_transformer_worker_args + .start(program_transformer_context, metadata_json_download_sender)?; - Ok(()) - }); + let (signature_worker, signature_sender) = + signature_worker_args.start(signature_context, transaction_info_sender)?; - crawl_handles.push(handle); + for audit in cl_audits { + let signature = Signature::try_from(audit.tx.as_ref())?; + if let Err(e) = signature_sender.send(signature).await { + error!("send signature: {:?}", e); + } } - futures::future::try_join_all(crawl_handles).await?; + drop(signature_sender); + + futures::future::try_join3( + signature_worker, + program_transformer_worker, + metadata_json_download_worker, + ) + .await?; Ok(()) }