From 92bbaead119d8c4d17d739a02a0a679ab9352620 Mon Sep 17 00:00:00 2001 From: Nicolas Pennie Date: Mon, 26 Jun 2023 20:14:24 -0700 Subject: [PATCH] fix(tools): fix the fix-tree tool --- tools/tree-status/README.md | 2 +- tools/tree-status/src/main.rs | 95 +++++++++++++++++++++++------------ 2 files changed, 64 insertions(+), 33 deletions(-) diff --git a/tools/tree-status/README.md b/tools/tree-status/README.md index a5ab8215b..f70ab2eea 100644 --- a/tools/tree-status/README.md +++ b/tools/tree-status/README.md @@ -36,6 +36,6 @@ cargo run -- \ cargo run -- \ --rpc-url $RPC_URL \ --max-retries 10 \ - --concurrency 3 \ + --concurrency 25 \ fix-tree --pg-url $DB_URL --redis-url $REDIS_URL --tree $TREE ``` diff --git a/tools/tree-status/src/main.rs b/tools/tree-status/src/main.rs index 835e4165f..dc4776010 100644 --- a/tools/tree-status/src/main.rs +++ b/tools/tree-status/src/main.rs @@ -232,6 +232,12 @@ enum Action { redis_url: String, #[arg(short, long, help = "Tree pubkey")] tree: String, + #[arg( + short, + long, + help = "Concurrency for fetching signatures for sequence batches" + )] + get_sigs_concurrency: Option, }, } @@ -332,14 +338,28 @@ async fn main() -> anyhow::Result<()> { } } } - Action::FixTree { .. } => { + Action::FixTree { + get_sigs_concurrency, + pg_url: _, + redis_url: _, + tree: _, + } => { let client = RpcClient::new(args.rpc.clone()); let conn = args.get_pg_conn().await?; let messenger_config = args.get_messenger_config().await?; if let Some(maybe_pubkey) = pubkeys.next().await { let pubkey: Pubkey = maybe_pubkey?; info!("fixing tree {pubkey}, hex: {}", hex::encode(pubkey)); - if let Err(error) = fix_tree(pubkey, client, conn, messenger_config).await { + if let Err(error) = fix_tree( + pubkey, + client, + conn, + messenger_config, + Some(args.concurrency), + get_sigs_concurrency.to_owned(), + ) + .await + { error!("{:?}", error); } } @@ -406,6 +426,8 @@ async fn fix_tree( client: RpcClient, conn: DatabaseConnection, messenger_config: MessengerConfig, + get_txn_concurrency: Option, + get_sigs_concurrency: Option, ) -> anyhow::Result<()> { let onchain_seq: i64 = get_onchain_tree_seq(pubkey, &client) .await @@ -425,13 +447,15 @@ async fn fix_tree( indexed_seq.max_seq, indexed_seq.cnt_seq ); let missing_seqs = get_missing_seq(pubkey, onchain_seq, &conn).await?; - warn!("[{pubkey}] missing seq: {:?}", missing_seqs); + trace!("[{pubkey}] missing seq: {:?}", missing_seqs); find_and_forward_txns_for_missing_seqs( pubkey, missing_seqs, client, conn, messenger_config, + get_txn_concurrency, + get_sigs_concurrency, ) .await?; } else { @@ -449,11 +473,12 @@ async fn find_and_forward_txns_for_missing_seqs( client: RpcClient, conn: DatabaseConnection, messenger_config: MessengerConfig, + get_txn_concurrency: Option, + get_sigs_concurrency: Option, ) -> anyhow::Result<()> { // Concurrency config - // TODO: Change to args - let batch_processing_concurrency = 10; - let get_txn_concurrency = 20; + let get_txn_concurrency: usize = get_txn_concurrency.unwrap_or(20); + let get_sigs_concurrency: usize = get_sigs_concurrency.unwrap_or(3); let (r_sender, r_recv) = unbounded(); let (s_sender, s_recv) = unbounded(); @@ -473,14 +498,14 @@ async fn find_and_forward_txns_for_missing_seqs( s.spawn(|_| { let ranges = build_seq_ranges(seqs); - trace!("Processing seq ranges: {:?}", ranges); + info!("Processing seq ranges: {:?}", ranges); for range in ranges { r_sender.send(range).unwrap(); } drop(r_sender); }); - for _ in 0..batch_processing_concurrency { + for _ in 0..get_sigs_concurrency { let (s_sender, r_recv) = (s_sender.clone(), r_recv.clone()); let client = client.clone(); let conn = conn.clone(); @@ -488,6 +513,7 @@ async fn find_and_forward_txns_for_missing_seqs( // Spawn workers in separate threads s.spawn(move |_| { for range in r_recv.iter() { + info!("Processing seq range: {:?}", range); runtime .block_on(find_signatures_for_missing_seq_range( tree, range, &client, &conn, &s_sender, @@ -614,52 +640,57 @@ async fn find_signatures_for_missing_seq_range( let (start, end) = range; trace!("Filling gap for range: [{:?}, {:?}]", start, end); - // Find the next indexed seq outside of the range. - let res = cl_audits::Entity::find() + // Find the next indexed after the end of the range. + let before_txn = cl_audits::Entity::find() .filter(cl_audits::Column::Tree.eq(tree.as_ref())) - .filter(cl_audits::Column::Seq.gt(end)) + .filter(cl_audits::Column::Seq.gte(end)) .order_by_asc(cl_audits::Column::Seq) .limit(1) .all(conn) .await?; - let next_txn = res + let before_txn = before_txn .first() - .ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?; + .ok_or_else(|| anyhow::anyhow!("Failed to get before txn for seq: {:?}", end))?; + + // Find the indexed seq before the start of the range. + let until_txn = cl_audits::Entity::find() + .filter(cl_audits::Column::Tree.eq(tree.as_ref())) + .filter(cl_audits::Column::Seq.lte(start)) + .order_by_desc(cl_audits::Column::Seq) + .limit(1) + .all(conn) + .await?; + let until_txn = until_txn + .first() + .ok_or_else(|| anyhow::anyhow!("Failed to get until txn for seq: {:?}", start))?; + trace!( - "Next txn for missing seq range [{:?}, {:?}]: {:?}", - next_txn.tx, + "Txns for missing seq range [{:?}, {:?}]. Until (start): {:?}. Before (end): {:?}.", start, - end + end, + until_txn.tx, + before_txn.tx, ); - // Extra signatures to send as a safety buffer. - // E.g. if Seq 5 is missing and txn D produced Seq 6, we crawl back N signatures and send them all. - // This is a hacky workaround for failed signatures or non-bubblegum txns. - let hacky_buffer = 3; - - // Fetch signatures for all txns in the range before "next". - // TODO: Scan and parse the transactions using blockbuster and skip failed txns. - let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize; - let mut before = Signature::from_str(&next_txn.tx).ok(); + let mut before = Signature::from_str(&before_txn.tx).ok(); + let until = Signature::from_str(&until_txn.tx).ok(); + let limit: usize = 1000; loop { - let limit = cmp::min(1000, sigs_remaining); let config = GetConfirmedSignaturesForAddress2Config { before: before, + until: until, limit: Some(limit), ..Default::default() }; let sigs = client .get_signatures_for_address_with_config(&tree, config) .await?; - for (i, sig) in sigs.clone().iter().enumerate() { + for sig in sigs.clone() { let o = Signature::from_str(&sig.signature)?; sender.send(o)?; - if i == sigs.len() { - before = Some(o); - } + before = Some(o); } - sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0); - if sigs_remaining == 0 || sigs.len() == 0 { + if sigs.len() == 0 { break; } }