From 8722ae1e0325c3f1449cbc53fa3ef301386ec4e8 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 1 Jun 2023 02:34:46 -0400 Subject: [PATCH] tests tools: add concurrency for check-trees-leafs (#65) * concurrency for trees * fix concurrency for transactions --- tests/tree-status/src/main.rs | 124 ++++++++++++++++++++++----------- tests/txn_forwarder/src/lib.rs | 11 +-- 2 files changed, 89 insertions(+), 46 deletions(-) diff --git a/tests/tree-status/src/main.rs b/tests/tree-status/src/main.rs index 359aeeced..22b54b62a 100644 --- a/tests/tree-status/src/main.rs +++ b/tests/tree-status/src/main.rs @@ -39,7 +39,6 @@ use { cmp, collections::HashMap, env, - num::NonZeroUsize, pin::Pin, str::FromStr, sync::{ @@ -50,7 +49,8 @@ use { tokio::{ fs::OpenOptions, io::{stdout, AsyncWrite, AsyncWriteExt}, - sync::{mpsc, Mutex}, + sync::{mpsc, Mutex, Semaphore}, + task::JoinSet, time::Duration, }, txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries, save_metrics}, @@ -127,8 +127,12 @@ struct Args { rpc: String, /// Number of concurrent requests for fetching transactions. - #[arg(long, short, default_value_t = 25)] - concurrency: usize, + #[arg(long, default_value_t = 25)] + concurrency_tx: usize, + + /// Number of concurrent processed trees. + #[arg(long, default_value_t = 5)] + concurrency_tree: usize, /// Size of signatures queue #[arg(long, default_value_t = 25_000)] @@ -254,8 +258,13 @@ async fn main() -> anyhow::Result<()> { Duration::from_millis(args.prom_save_interval), ); - let concurrency = NonZeroUsize::new(args.concurrency) - .ok_or_else(|| anyhow::anyhow!("invalid concurrency: {}", args.concurrency))?; + let concurrency_tx = ( + args.concurrency_tx, + Arc::new(Semaphore::new(args.concurrency_tx)), + ); + let signatures_history_queue = args.signatures_history_queue; + let max_retries = args.max_retries; + let mut tasks_tree = JoinSet::new(); // Set up RPC interface let pubkeys_str = match &args.action { @@ -291,7 +300,6 @@ async fn main() -> anyhow::Result<()> { } } Action::CheckTreeLeafs { output, .. } | Action::CheckTreesLeafs { output, .. } => { - let conn = args.get_pg_conn().await?; let mut output: Option>> = if let Some(output) = output { Some(if output == "-" { Box::pin(stdout()) @@ -308,45 +316,79 @@ async fn main() -> anyhow::Result<()> { } else { None }; + let (tx, mut rx) = mpsc::unbounded_channel::(); + let tx = Arc::new(tx); while let Some(maybe_pubkey) = pubkeys.next().await { let pubkey = maybe_pubkey?; - info!("checking tree leafs {pubkey}, hex: {}", hex::encode(pubkey)); - if let Err(error) = check_tree_leafs( - pubkey, - &args.rpc, - args.signatures_history_queue, - concurrency, - args.max_retries, - &conn, - output.as_mut(), - ) - .await - { - error!("{:?}", error); + if tasks_tree.len() == args.concurrency_tree { + loop { + tokio::select! { + _ = tasks_tree.join_next() => { + break; + } + Some(text) = rx.recv() => { + if let Some(output) = output.as_mut() { + let _ = output.write(text.as_bytes()).await?; + } + } + } + } } + let rpc = args.rpc.clone(); + let concurrency_tx = concurrency_tx.clone(); + let conn = args.get_pg_conn().await?; + let tx = Arc::clone(&tx); + tasks_tree.spawn(async move { + info!("checking tree leafs {pubkey}, hex: {}", hex::encode(pubkey)); + if let Err(error) = check_tree_leafs( + pubkey, + &rpc, + signatures_history_queue, + concurrency_tx, + max_retries, + &conn, + tx, + ) + .await + { + error!("{:?}", error); + } + }); } + drop(tx); if let Some(mut output) = output { + while let Some(text) = rx.recv().await { + let _ = output.write(text.as_bytes()).await?; + } output.flush().await?; } } Action::ShowTree { .. } | Action::ShowTrees { .. } => { while let Some(maybe_pubkey) = pubkeys.next().await { let pubkey = maybe_pubkey?; - info!("showing tree {pubkey}, hex: {}", hex::encode(pubkey)); - if let Err(error) = read_tree( - pubkey, - &args.rpc, - args.signatures_history_queue, - concurrency, - args.max_retries, - ) - .await - { - error!("{:?}", error); + if tasks_tree.len() == args.concurrency_tree { + tasks_tree.join_next().await; } + let rpc = args.rpc.clone(); + let concurrency_tx = concurrency_tx.clone(); + tasks_tree.spawn(async move { + info!("showing tree {pubkey}, hex: {}", hex::encode(pubkey)); + if let Err(error) = read_tree( + pubkey, + &rpc, + signatures_history_queue, + concurrency_tx, + max_retries, + ) + .await + { + error!("{:?}", error); + } + }); } } } + while tasks_tree.join_next().await.is_some() {} metrics_jh.await } @@ -483,16 +525,16 @@ async fn check_tree_leafs( pubkey: Pubkey, client_url: &str, signatures_history_queue: usize, - concurrency: NonZeroUsize, + concurrency_tx: (usize, Arc), max_retries: u8, conn: &DatabaseConnection, - mut output: Option<&mut Pin>>, + tx: Arc>, ) -> anyhow::Result<()> { let (fetch_fut, mut leafs_rx) = read_tree_start( pubkey, client_url, signatures_history_queue, - concurrency, + concurrency_tx, max_retries, ); try_join(fetch_fut, async move { @@ -564,9 +606,7 @@ GROUP BY for (leaf_idx, (signature, seq)) in leafs.into_iter() { error!("leaf index {leaf_idx}: not found in db, seq {seq} tx={signature:?}"); - if let Some(output) = output.as_mut() { - let _ = output.write(format!("{signature}\n").as_bytes()).await?; - } + let _ = tx.send(format!("{signature}\n")); TREE_STATUS_MISSED_LEAVES .with_label_values(&[&pubkey.to_string()]) .inc(); @@ -583,7 +623,7 @@ async fn read_tree( pubkey: Pubkey, client_url: &str, signatures_history_queue: usize, - concurrency: NonZeroUsize, + concurrency_tx: (usize, Arc), max_retries: u8, ) -> anyhow::Result<()> { fn print_seqs(id: usize, sig: Signature, seqs: Vec<(u64, MaybeLeafNode)>) { @@ -597,7 +637,7 @@ async fn read_tree( pubkey, client_url, signatures_history_queue, - concurrency, + concurrency_tx, max_retries, ); try_join(fetch_fut, async move { @@ -630,7 +670,7 @@ fn read_tree_start( pubkey: Pubkey, client_url: &str, signatures_history_queue: usize, - concurrency: NonZeroUsize, + (concurrency_tx_max, concurrency_tx): (usize, Arc), max_retries: u8, ) -> ( BoxFuture<'static, anyhow::Result<()>>, @@ -646,13 +686,15 @@ fn read_tree_start( let (tx, rx) = mpsc::unbounded_channel(); let tx = Arc::new(tx); - let fetch_futs = (0..concurrency.get()) + let fetch_futs = (0..concurrency_tx_max) .map(|_| { let sig_id = Arc::clone(&sig_id); let rx_sig = Arc::clone(&rx_sig); let client = RpcClient::new(client_url.to_owned()); + let concurrency_tx = Arc::clone(&concurrency_tx); let tx = Arc::clone(&tx); async move { + let _permit = concurrency_tx.acquire_owned().await.unwrap(); loop { let mut lock = rx_sig.lock().await; let maybe_msg = lock.recv().await; diff --git a/tests/txn_forwarder/src/lib.rs b/tests/txn_forwarder/src/lib.rs index 8d330ca33..c2bcfcee5 100644 --- a/tests/txn_forwarder/src/lib.rs +++ b/tests/txn_forwarder/src/lib.rs @@ -16,11 +16,11 @@ use { pubkey::Pubkey, signature::{ParseSignatureError, Signature}, }, - std::{fmt, io::Result as IoResult, str::FromStr}, + std::{fmt, io::Result as IoResult, str::FromStr, sync::Arc}, tokio::{ fs::{self, File}, io::{stdin, AsyncBufReadExt, BufReader}, - sync::{mpsc, oneshot}, + sync::{mpsc, Notify}, time::{interval, sleep, Duration}, }, tokio_stream::wrappers::LinesStream, @@ -147,14 +147,15 @@ pub fn save_metrics( period: Duration, ) -> BoxFuture<'static, anyhow::Result<()>> { if let Some(path) = path { - let (tx, mut rx) = oneshot::channel(); + let notify_loop = Arc::new(Notify::new()); + let notify_shutdown = Arc::clone(¬ify_loop); let jh = tokio::spawn(async move { let mut interval = interval(period); let mut alive = true; while alive { tokio::select! { _ = interval.tick() => {}, - _ = &mut rx => { + _ = notify_loop.notified() => { alive = false; } }; @@ -169,7 +170,7 @@ pub fn save_metrics( Ok::<(), anyhow::Error>(()) }); async move { - let _ = tx.send(()); + notify_shutdown.notify_one(); jh.await? } .boxed()