Skip to content

Commit

Permalink
Replay single tree with targeted seq replay (#159)
Browse files — browse the repository at this point in the history
  • Loading branch information
kespinola authored Oct 8, 2024
1 parent 1afee45 commit 18411a9
Showing 1 changed file with 42 additions and 59 deletions.
101 changes: 42 additions & 59 deletions — backfill/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ pub async fn start_bubblegum_backfill(

#[derive(Debug, Parser, Clone)]
pub struct BubblegumReplayArgs {
/// Number of tree crawler workers
#[arg(long, env, default_value = "20")]
pub tree_crawler_count: usize,
/// The tree to replay.
#[arg(long, env)]
pub tree: String,

/// The list of trees to crawl. If not specified, all trees will be crawled.
/// The list of sequences to replay. If not specified, all sequences will be replayed.
#[arg(long, env, use_value_delimiter = true)]
pub trees: Vec<String>,
pub only_sequences: Option<Vec<i64>>,

#[clap(flatten)]
pub signature_worker: SignatureWorkerArgs,
Expand All @@ -102,72 +102,55 @@ pub async fn start_bubblegum_replay(
context: BubblegumBackfillContext,
args: BubblegumReplayArgs,
) -> Result<()> {
let pubkeys = args
.trees
.iter()
.map(|tree| Pubkey::from_str(tree).map(|pubkey| pubkey.to_bytes().to_vec()))
.collect::<Result<Vec<Vec<u8>>, _>>()?;

let mut crawl_handles = FuturesUnordered::new();

for pubkey in pubkeys {
if crawl_handles.len() >= args.tree_crawler_count {
crawl_handles.next().await;
}
let database_pool = context.database_pool.clone();

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool);
let pubkey = Pubkey::from_str(&args.tree)
.map(|pubkey| pubkey.to_bytes().to_vec())
.map_err(|e| anyhow::anyhow!("Invalid tree pubkey: {:?}", e))?;

let cl_audits = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(pubkey))
.order_by_asc(cl_audits_v2::Column::Seq)
.all(&conn)
.await?;
let database_pool = context.database_pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool);

let context = context.clone();
let metadata_json_download_worker_args = args.metadata_json_download_worker.clone();
let program_transformer_worker_args = args.program_transformer_worker.clone();
let signature_worker_args = args.signature_worker.clone();
let mut query = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(pubkey))
.order_by_asc(cl_audits_v2::Column::Seq);

let metadata_json_download_database_pool = context.database_pool.clone();
let handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> = tokio::spawn(async move {
let metadata_json_download_db_pool = metadata_json_download_database_pool.clone();
let program_transformer_context = context.clone();
let signature_context = context.clone();

let (metadata_json_download_worker, metadata_json_download_sender) =
metadata_json_download_worker_args.start(metadata_json_download_db_pool)?;
if let Some(sequences) = args.only_sequences {
query = query.filter(cl_audits_v2::Column::Seq.is_in(sequences));
}

let (program_transformer_worker, transaction_info_sender) =
program_transformer_worker_args
.start(program_transformer_context, metadata_json_download_sender)?;
let cl_audits = query.all(&conn).await?;

let (signature_worker, signature_sender) =
signature_worker_args.start(signature_context, transaction_info_sender)?;
let metadata_json_download_worker_args = args.metadata_json_download_worker.clone();
let program_transformer_worker_args = args.program_transformer_worker.clone();
let signature_worker_args = args.signature_worker.clone();

for audit in cl_audits {
let signature = Signature::try_from(audit.tx.as_ref())?;
if let Err(e) = signature_sender.send(signature).await {
error!("send signature: {:?}", e);
}
}
let metadata_json_download_db_pool = context.database_pool.clone();
let program_transformer_context = context.clone();
let signature_context = context.clone();

drop(signature_sender);
let (metadata_json_download_worker, metadata_json_download_sender) =
metadata_json_download_worker_args.start(metadata_json_download_db_pool)?;

futures::future::try_join3(
signature_worker,
program_transformer_worker,
metadata_json_download_worker,
)
.await?;
let (program_transformer_worker, transaction_info_sender) = program_transformer_worker_args
.start(program_transformer_context, metadata_json_download_sender)?;

Ok(())
});
let (signature_worker, signature_sender) =
signature_worker_args.start(signature_context, transaction_info_sender)?;

crawl_handles.push(handle);
for audit in cl_audits {
let signature = Signature::try_from(audit.tx.as_ref())?;
if let Err(e) = signature_sender.send(signature).await {
error!("send signature: {:?}", e);
}
}

futures::future::try_join_all(crawl_handles).await?;
drop(signature_sender);

futures::future::try_join3(
signature_worker,
program_transformer_worker,
metadata_json_download_worker,
)
.await?;

Ok(())
}

0 comments on commit 18411a9

Please sign in to comment.