Skip to content

Commit

Permalink
tests tools: add concurrency for check-trees-leafs (#65)
Browse files Browse the repository at this point in the history
* concurrency for trees

* fix concurrency for transactions
  • Loading branch information
fanatid authored Jun 1, 2023
1 parent 454a307 commit 8722ae1
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 46 deletions.
124 changes: 83 additions & 41 deletions tests/tree-status/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use {
cmp,
collections::HashMap,
env,
num::NonZeroUsize,
pin::Pin,
str::FromStr,
sync::{
Expand All @@ -50,7 +49,8 @@ use {
tokio::{
fs::OpenOptions,
io::{stdout, AsyncWrite, AsyncWriteExt},
sync::{mpsc, Mutex},
sync::{mpsc, Mutex, Semaphore},
task::JoinSet,
time::Duration,
},
txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries, save_metrics},
Expand Down Expand Up @@ -127,8 +127,12 @@ struct Args {
rpc: String,

/// Number of concurrent requests for fetching transactions.
#[arg(long, short, default_value_t = 25)]
concurrency: usize,
#[arg(long, default_value_t = 25)]
concurrency_tx: usize,

/// Number of concurrent processed trees.
#[arg(long, default_value_t = 5)]
concurrency_tree: usize,

/// Size of signatures queue
#[arg(long, default_value_t = 25_000)]
Expand Down Expand Up @@ -254,8 +258,13 @@ async fn main() -> anyhow::Result<()> {
Duration::from_millis(args.prom_save_interval),
);

let concurrency = NonZeroUsize::new(args.concurrency)
.ok_or_else(|| anyhow::anyhow!("invalid concurrency: {}", args.concurrency))?;
let concurrency_tx = (
args.concurrency_tx,
Arc::new(Semaphore::new(args.concurrency_tx)),
);
let signatures_history_queue = args.signatures_history_queue;
let max_retries = args.max_retries;
let mut tasks_tree = JoinSet::new();

// Set up RPC interface
let pubkeys_str = match &args.action {
Expand Down Expand Up @@ -291,7 +300,6 @@ async fn main() -> anyhow::Result<()> {
}
}
Action::CheckTreeLeafs { output, .. } | Action::CheckTreesLeafs { output, .. } => {
let conn = args.get_pg_conn().await?;
let mut output: Option<Pin<Box<dyn AsyncWrite>>> = if let Some(output) = output {
Some(if output == "-" {
Box::pin(stdout())
Expand All @@ -308,45 +316,79 @@ async fn main() -> anyhow::Result<()> {
} else {
None
};
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let tx = Arc::new(tx);
while let Some(maybe_pubkey) = pubkeys.next().await {
let pubkey = maybe_pubkey?;
info!("checking tree leafs {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = check_tree_leafs(
pubkey,
&args.rpc,
args.signatures_history_queue,
concurrency,
args.max_retries,
&conn,
output.as_mut(),
)
.await
{
error!("{:?}", error);
if tasks_tree.len() == args.concurrency_tree {
loop {
tokio::select! {
_ = tasks_tree.join_next() => {
break;
}
Some(text) = rx.recv() => {
if let Some(output) = output.as_mut() {
let _ = output.write(text.as_bytes()).await?;
}
}
}
}
}
let rpc = args.rpc.clone();
let concurrency_tx = concurrency_tx.clone();
let conn = args.get_pg_conn().await?;
let tx = Arc::clone(&tx);
tasks_tree.spawn(async move {
info!("checking tree leafs {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = check_tree_leafs(
pubkey,
&rpc,
signatures_history_queue,
concurrency_tx,
max_retries,
&conn,
tx,
)
.await
{
error!("{:?}", error);
}
});
}
drop(tx);
if let Some(mut output) = output {
while let Some(text) = rx.recv().await {
let _ = output.write(text.as_bytes()).await?;
}
output.flush().await?;
}
}
Action::ShowTree { .. } | Action::ShowTrees { .. } => {
while let Some(maybe_pubkey) = pubkeys.next().await {
let pubkey = maybe_pubkey?;
info!("showing tree {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = read_tree(
pubkey,
&args.rpc,
args.signatures_history_queue,
concurrency,
args.max_retries,
)
.await
{
error!("{:?}", error);
if tasks_tree.len() == args.concurrency_tree {
tasks_tree.join_next().await;
}
let rpc = args.rpc.clone();
let concurrency_tx = concurrency_tx.clone();
tasks_tree.spawn(async move {
info!("showing tree {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = read_tree(
pubkey,
&rpc,
signatures_history_queue,
concurrency_tx,
max_retries,
)
.await
{
error!("{:?}", error);
}
});
}
}
}
while tasks_tree.join_next().await.is_some() {}

metrics_jh.await
}
Expand Down Expand Up @@ -483,16 +525,16 @@ async fn check_tree_leafs(
pubkey: Pubkey,
client_url: &str,
signatures_history_queue: usize,
concurrency: NonZeroUsize,
concurrency_tx: (usize, Arc<Semaphore>),
max_retries: u8,
conn: &DatabaseConnection,
mut output: Option<&mut Pin<Box<dyn AsyncWrite>>>,
tx: Arc<mpsc::UnboundedSender<String>>,
) -> anyhow::Result<()> {
let (fetch_fut, mut leafs_rx) = read_tree_start(
pubkey,
client_url,
signatures_history_queue,
concurrency,
concurrency_tx,
max_retries,
);
try_join(fetch_fut, async move {
Expand Down Expand Up @@ -564,9 +606,7 @@ GROUP BY

for (leaf_idx, (signature, seq)) in leafs.into_iter() {
error!("leaf index {leaf_idx}: not found in db, seq {seq} tx={signature:?}");
if let Some(output) = output.as_mut() {
let _ = output.write(format!("{signature}\n").as_bytes()).await?;
}
let _ = tx.send(format!("{signature}\n"));
TREE_STATUS_MISSED_LEAVES
.with_label_values(&[&pubkey.to_string()])
.inc();
Expand All @@ -583,7 +623,7 @@ async fn read_tree(
pubkey: Pubkey,
client_url: &str,
signatures_history_queue: usize,
concurrency: NonZeroUsize,
concurrency_tx: (usize, Arc<Semaphore>),
max_retries: u8,
) -> anyhow::Result<()> {
fn print_seqs(id: usize, sig: Signature, seqs: Vec<(u64, MaybeLeafNode)>) {
Expand All @@ -597,7 +637,7 @@ async fn read_tree(
pubkey,
client_url,
signatures_history_queue,
concurrency,
concurrency_tx,
max_retries,
);
try_join(fetch_fut, async move {
Expand Down Expand Up @@ -630,7 +670,7 @@ fn read_tree_start(
pubkey: Pubkey,
client_url: &str,
signatures_history_queue: usize,
concurrency: NonZeroUsize,
(concurrency_tx_max, concurrency_tx): (usize, Arc<Semaphore>),
max_retries: u8,
) -> (
BoxFuture<'static, anyhow::Result<()>>,
Expand All @@ -646,13 +686,15 @@ fn read_tree_start(
let (tx, rx) = mpsc::unbounded_channel();
let tx = Arc::new(tx);

let fetch_futs = (0..concurrency.get())
let fetch_futs = (0..concurrency_tx_max)
.map(|_| {
let sig_id = Arc::clone(&sig_id);
let rx_sig = Arc::clone(&rx_sig);
let client = RpcClient::new(client_url.to_owned());
let concurrency_tx = Arc::clone(&concurrency_tx);
let tx = Arc::clone(&tx);
async move {
let _permit = concurrency_tx.acquire_owned().await.unwrap();
loop {
let mut lock = rx_sig.lock().await;
let maybe_msg = lock.recv().await;
Expand Down
11 changes: 6 additions & 5 deletions tests/txn_forwarder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use {
pubkey::Pubkey,
signature::{ParseSignatureError, Signature},
},
std::{fmt, io::Result as IoResult, str::FromStr},
std::{fmt, io::Result as IoResult, str::FromStr, sync::Arc},
tokio::{
fs::{self, File},
io::{stdin, AsyncBufReadExt, BufReader},
sync::{mpsc, oneshot},
sync::{mpsc, Notify},
time::{interval, sleep, Duration},
},
tokio_stream::wrappers::LinesStream,
Expand Down Expand Up @@ -147,14 +147,15 @@ pub fn save_metrics(
period: Duration,
) -> BoxFuture<'static, anyhow::Result<()>> {
if let Some(path) = path {
let (tx, mut rx) = oneshot::channel();
let notify_loop = Arc::new(Notify::new());
let notify_shutdown = Arc::clone(&notify_loop);
let jh = tokio::spawn(async move {
let mut interval = interval(period);
let mut alive = true;
while alive {
tokio::select! {
_ = interval.tick() => {},
_ = &mut rx => {
_ = notify_loop.notified() => {
alive = false;
}
};
Expand All @@ -169,7 +170,7 @@ pub fn save_metrics(
Ok::<(), anyhow::Error>(())
});
async move {
let _ = tx.send(());
notify_shutdown.notify_one();
jh.await?
}
.boxed()
Expand Down

0 comments on commit 8722ae1

Please sign in to comment.