diff --git a/tools/tree-status/src/main.rs b/tools/tree-status/src/main.rs index 8e4ad622e..76c41807b 100644 --- a/tools/tree-status/src/main.rs +++ b/tools/tree-status/src/main.rs @@ -1,3 +1,5 @@ +use std::thread; + use crossbeam::channel::{bounded, unbounded, Sender}; use digital_asset_types::dao::cl_audits; use log::{trace, warn}; @@ -64,7 +66,7 @@ use { io::{stdout, AsyncWrite, AsyncWriteExt}, sync::{mpsc, Mutex}, }, - txn_forwarder::{find_signatures, read_lines, rpc_tx_with_retries}, + txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries, rpc_tx_with_retries}, }; const RPC_GET_TXN_RETRIES: u8 = 5; @@ -339,7 +341,9 @@ async fn main() -> anyhow::Result<()> { if let Some(maybe_pubkey) = pubkeys.next().await { let pubkey: Pubkey = maybe_pubkey?; info!("fixing tree {pubkey}, hex: {}", hex::encode(pubkey)); - if let Err(error) = fix_tree(pubkey, client, conn, messenger_config).await { + if let Err(error) = + fix_tree(pubkey, client, conn, messenger_config, concurrency.into()).await + { error!("{:?}", error); } } @@ -406,6 +410,7 @@ async fn fix_tree( client: RpcClient, conn: DatabaseConnection, messenger_config: MessengerConfig, + concurrency: usize, ) -> anyhow::Result<()> { let onchain_seq: i64 = get_onchain_tree_seq(pubkey, &client) .await @@ -432,6 +437,7 @@ async fn fix_tree( client, conn, messenger_config, + concurrency, ) .await?; } else { @@ -449,73 +455,66 @@ async fn find_and_forward_txns_for_missing_seqs( client: RpcClient, conn: DatabaseConnection, messenger_config: MessengerConfig, + concurrency: usize, ) -> anyhow::Result<()> { - // Concurrency config - // TODO: Change to args - let batch_processing_concurrency = 10; - let get_txn_concurrency = 20; - - let (r_sender, r_recv) = unbounded(); - let (s_sender, s_recv) = unbounded(); + let (s_sender, s_receiver) = mpsc::channel(2_000); + let s_receiver = Arc::new(Mutex::new(s_receiver)); let client = Arc::new(client); let conn = Arc::new(conn); let messenger = init_redis_messenger(messenger_config).await?; - crossbeam::scope(|s| { - let runtime = Arc::new( - Builder::new_multi_thread() - .enable_all() - .worker_threads(4) - .build() - .unwrap(), - ); + let ranges = build_seq_ranges(seqs); + for range in ranges { + s_sender.send(range).await.unwrap(); + } - s.spawn(|_| { - let ranges = build_seq_ranges(seqs); - trace!("Processing seq ranges: {:?}", ranges); - for range in ranges { - r_sender.send(range).unwrap(); - } - drop(r_sender); - }); - - for _ in 0..batch_processing_concurrency { - let (s_sender, r_recv) = (s_sender.clone(), r_recv.clone()); - let client = client.clone(); - let conn = conn.clone(); - let runtime = runtime.clone(); - // Spawn workers in separate threads - s.spawn(move |_| { - for range in r_recv.iter() { - runtime - .block_on(find_signatures_for_missing_seq_range( - tree, range, &client, &conn, &s_sender, - )) - .unwrap(); - } - }); - } - drop(s_sender); - - for _ in 0..get_txn_concurrency { - let s_recv = s_recv.clone(); - let client = client.clone(); - let messenger = messenger.clone(); - let runtime = runtime.clone(); - s.spawn(move |_| { - for sig in s_recv.iter() { - trace!("Attempting to send signature to redis: {:?}", sig); - runtime - .block_on(send_txn(sig, &client, &messenger)) - .unwrap(); + drop(s_sender); + try_join_all((0..concurrency).map(|_| { + let s_receiver = Arc::clone(&s_receiver); + let client = Arc::clone(&client); + let conn = Arc::clone(&conn); + let messenger = Arc::clone(&messenger); + async move { + loop { + let maybe_range = { + let mut s_receiver = s_receiver.lock().await; // Acquire the lock + s_receiver.recv().await + }; + match maybe_range { + Some(range) => { + drop(s_receiver); + + let mut receiver = find_signatures_for_missing_seq_range( + tree, + range, + Arc::clone(&client), + Arc::clone(&conn), + ); + + while let Some(result) = receiver.recv().await { + match result { + Ok(signature) => { + send_txn(signature, &client, Arc::clone(&messenger)).await?; + } + Err(e) => { + // Handle the error... + } + } + } + + break; + } + None => break, } - }); + } + + Ok::<(), anyhow::Error>(()) } - }) - .unwrap(); + })) + .await?; - anyhow::Ok(()) + Ok(()) } async fn init_redis_messenger( @@ -532,34 +531,18 @@ async fn init_redis_messenger( async fn send_txn( signature: Signature, client: &RpcClient, - messenger: &Mutex>, + messenger: Arc>>, ) -> anyhow::Result<()> { - let txn: EncodedConfirmedTransactionWithStatusMeta = rpc_tx_with_retries( + rpc_send_with_retries( &client, RpcRequest::GetTransaction, serde_json::json!([signature.to_string(), RPC_TXN_CONFIG,]), RPC_GET_TXN_RETRIES, + Arc::clone(&messenger), signature, ) - .await?; - - // Ignore if tx failed or meta is missed - let meta = txn.transaction.meta.as_ref(); - if meta.map(|meta| meta.status.is_err()).unwrap_or(true) { - info!("Dropping failed transaction: {:?}", signature); - return Ok(()); - } - - let fbb = flatbuffers::FlatBufferBuilder::new(); - let fbb = seralize_encoded_transaction_with_status(fbb, txn) - .with_context(|| format!("failed to serialize transaction with {}", signature))?; - let bytes = fbb.finished_data(); - - let mut locked = messenger.lock().await; - locked.send(TRANSACTION_STREAM, bytes).await?; - drop(locked); - info!("Successfully pushed transaction to redis: {:?}", signature); - Ok(()) + .await + .map_err(|e| anyhow::anyhow!(e)) } fn build_seq_ranges(seqs: Vec) -> Vec<(i64, i64)> { @@ -604,67 +587,76 @@ fn build_seq_ranges(seqs: Vec) -> Vec<(i64, i64)> { // Add the following: // 1 – Keep searching until finding a successful transaction. // 2 – Parse txns and extract seq, keep searching until the seq is found (can use Helius for this). -async fn find_signatures_for_missing_seq_range( +fn find_signatures_for_missing_seq_range( tree: Pubkey, range: (i64, i64), - client: &RpcClient, - conn: &DatabaseConnection, - sender: &Sender, -) -> anyhow::Result<()> { + client: Arc, + conn: Arc, +) -> mpsc::Receiver> { let (start, end) = range; - trace!("Filling gap for range: [{:?}, {:?}]", start, end); + let (sender, receiver) = mpsc::channel(2_000); + + tokio::spawn(async move { + info!("Filling gap for range: [{:?}, {:?}]", start, end); + + // Find the next indexed seq outside of the range. + let res = cl_audits::Entity::find() + .filter(cl_audits::Column::Tree.eq(tree.as_ref())) + .filter(cl_audits::Column::Seq.gt(end)) + .order_by_asc(cl_audits::Column::Seq) + .limit(1) + .all(&*conn) + .await + .map_err(|e| anyhow::anyhow!(e))?; + let next_txn = res + .first() + .ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?; + trace!( + "Next txn for missing seq range [{:?}, {:?}]: {:?}", + start, + end, + next_txn.tx, + ); - // Find the next indexed seq outside of the range. - let res = cl_audits::Entity::find() - .filter(cl_audits::Column::Tree.eq(tree.as_ref())) - .filter(cl_audits::Column::Seq.gt(end)) - .order_by_asc(cl_audits::Column::Seq) - .limit(1) - .all(conn) - .await?; - let next_txn = res - .first() - .ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?; - trace!( - "Next txn for missing seq range [{:?}, {:?}]: {:?}", - next_txn.tx, - start, - end - ); + // Extra signatures to send as a safety buffer. + let hacky_buffer = 3; + + // Fetch signatures for all txns in the range before "next". + let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize; + let mut before = Signature::from_str(&next_txn.tx).ok(); + loop { + let limit = cmp::min(1000, sigs_remaining); + let config = GetConfirmedSignaturesForAddress2Config { + before: before, + limit: Some(limit), + ..Default::default() + }; + let sigs = client + .get_signatures_for_address_with_config(&tree, config) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + for (i, sig) in sigs.iter().enumerate() { + let o = Signature::from_str(&sig.signature).map_err(|e| anyhow::anyhow!(e))?; + sender + .send(Ok(o)) + .await + .map_err(|_| anyhow::anyhow!("Failed to send signature"))?; + if i == sigs.len() { + before = Some(o); + } + } - // Extra signatures to send as a safety buffer. - // E.g. if Seq 5 is missing and txn D produced Seq 6, we crawl back N signatures and send them all. - // This is a hacky workaround for failed signatures or non-bubblegum txns. - let hacky_buffer = 3; - - // Fetch signatures for all txns in the range before "next". - // TODO: Scan and parse the transactions using blockbuster and skip failed txns. - let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize; - let mut before = Signature::from_str(&next_txn.tx).ok(); - loop { - let limit = cmp::min(1000, sigs_remaining); - let config = GetConfirmedSignaturesForAddress2Config { - before: before, - limit: Some(limit), - ..Default::default() - }; - let sigs = client - .get_signatures_for_address_with_config(&tree, config) - .await?; - for (i, sig) in sigs.clone().iter().enumerate() { - let o = Signature::from_str(&sig.signature)?; - sender.send(o)?; - if i == sigs.len() { - before = Some(o); + sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0); + if sigs_remaining == 0 || sigs.len() == 0 { + break; } } - sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0); - if sigs_remaining == 0 || sigs.len() == 0 { - break; - } - } - return anyhow::Ok(()); + Ok::<(), anyhow::Error>(()) + }); + + receiver } async fn get_onchain_tree_seq(address: Pubkey, client: &RpcClient) -> anyhow::Result { diff --git a/tools/txn_forwarder/src/lib.rs b/tools/txn_forwarder/src/lib.rs index 68b2fe183..0774b345d 100644 --- a/tools/txn_forwarder/src/lib.rs +++ b/tools/txn_forwarder/src/lib.rs @@ -204,6 +204,7 @@ async fn send( // Ignore if tx failed or meta is missed let meta = tx.transaction.meta.as_ref(); if meta.map(|meta| meta.status.is_err()).unwrap_or(true) { + info!("Dropping failed transaction: {:?}", signature); return Ok(()); }