Skip to content

Commit

Permalink
tree-stautus: improve concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
niks3089 committed Jun 13, 2023
1 parent 40dc86f commit 3becda4
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 133 deletions.
258 changes: 125 additions & 133 deletions tools/tree-status/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::thread;

use crossbeam::channel::{bounded, unbounded, Sender};
use digital_asset_types::dao::cl_audits;
use log::{trace, warn};
Expand Down Expand Up @@ -64,7 +66,7 @@ use {
io::{stdout, AsyncWrite, AsyncWriteExt},
sync::{mpsc, Mutex},
},
txn_forwarder::{find_signatures, read_lines, rpc_tx_with_retries},
txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries, rpc_tx_with_retries},
};

const RPC_GET_TXN_RETRIES: u8 = 5;
Expand Down Expand Up @@ -339,7 +341,9 @@ async fn main() -> anyhow::Result<()> {
if let Some(maybe_pubkey) = pubkeys.next().await {
let pubkey: Pubkey = maybe_pubkey?;
info!("fixing tree {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = fix_tree(pubkey, client, conn, messenger_config).await {
if let Err(error) =
fix_tree(pubkey, client, conn, messenger_config, concurrency.into()).await
{
error!("{:?}", error);
}
}
Expand Down Expand Up @@ -406,6 +410,7 @@ async fn fix_tree(
client: RpcClient,
conn: DatabaseConnection,
messenger_config: MessengerConfig,
concurrency: usize,
) -> anyhow::Result<()> {
let onchain_seq: i64 = get_onchain_tree_seq(pubkey, &client)
.await
Expand All @@ -432,6 +437,7 @@ async fn fix_tree(
client,
conn,
messenger_config,
concurrency,
)
.await?;
} else {
Expand All @@ -449,73 +455,66 @@ async fn find_and_forward_txns_for_missing_seqs(
client: RpcClient,
conn: DatabaseConnection,
messenger_config: MessengerConfig,
concurrency: usize,
) -> anyhow::Result<()> {
// Concurrency config
// TODO: Change to args
let batch_processing_concurrency = 10;
let get_txn_concurrency = 20;

let (r_sender, r_recv) = unbounded();
let (s_sender, s_recv) = unbounded();
let (s_sender, s_receiver) = mpsc::channel(2_000);
let s_receiver = Arc::new(Mutex::new(s_receiver));

let client = Arc::new(client);
let conn = Arc::new(conn);
let messenger = init_redis_messenger(messenger_config).await?;

crossbeam::scope(|s| {
let runtime = Arc::new(
Builder::new_multi_thread()
.enable_all()
.worker_threads(4)
.build()
.unwrap(),
);
let ranges = build_seq_ranges(seqs);
for range in ranges {
s_sender.send(range).await.unwrap();
}

s.spawn(|_| {
let ranges = build_seq_ranges(seqs);
trace!("Processing seq ranges: {:?}", ranges);
for range in ranges {
r_sender.send(range).unwrap();
}
drop(r_sender);
});

for _ in 0..batch_processing_concurrency {
let (s_sender, r_recv) = (s_sender.clone(), r_recv.clone());
let client = client.clone();
let conn = conn.clone();
let runtime = runtime.clone();
// Spawn workers in separate threads
s.spawn(move |_| {
for range in r_recv.iter() {
runtime
.block_on(find_signatures_for_missing_seq_range(
tree, range, &client, &conn, &s_sender,
))
.unwrap();
}
});
}
drop(s_sender);

for _ in 0..get_txn_concurrency {
let s_recv = s_recv.clone();
let client = client.clone();
let messenger = messenger.clone();
let runtime = runtime.clone();
s.spawn(move |_| {
for sig in s_recv.iter() {
trace!("Attempting to send signature to redis: {:?}", sig);
runtime
.block_on(send_txn(sig, &client, &messenger))
.unwrap();
drop(s_sender);
try_join_all((0..concurrency).map(|_| {
let s_receiver = Arc::clone(&s_receiver);
let client = Arc::clone(&client);
let conn = Arc::clone(&conn);
let messenger = Arc::clone(&messenger);
async move {
loop {
let maybe_range = {
let mut s_receiver = s_receiver.lock().await; // Acquire the lock
s_receiver.recv().await
};
match maybe_range {
Some(range) => {
drop(s_receiver);

let mut receiver = find_signatures_for_missing_seq_range(
tree,
range,
Arc::clone(&client),
Arc::clone(&conn),
);

while let Some(result) = receiver.recv().await {
match result {
Ok(signature) => {
send_txn(signature, &client, Arc::clone(&messenger)).await?;
}
Err(e) => {
// Handle the error...
}
}
}

break;
}
None => break,
}
});
}

Ok::<(), anyhow::Error>(())
}
})
.unwrap();
}))
.await?;

anyhow::Ok(())
Ok(())
}

async fn init_redis_messenger(
Expand All @@ -532,34 +531,18 @@ async fn init_redis_messenger(
async fn send_txn(
signature: Signature,
client: &RpcClient,
messenger: &Mutex<Box<dyn plerkle_messenger::Messenger>>,
messenger: Arc<Mutex<Box<dyn plerkle_messenger::Messenger>>>,
) -> anyhow::Result<()> {
let txn: EncodedConfirmedTransactionWithStatusMeta = rpc_tx_with_retries(
rpc_send_with_retries(
&client,
RpcRequest::GetTransaction,
serde_json::json!([signature.to_string(), RPC_TXN_CONFIG,]),
RPC_GET_TXN_RETRIES,
Arc::clone(&messenger),
signature,
)
.await?;

// Ignore if tx failed or meta is missed
let meta = txn.transaction.meta.as_ref();
if meta.map(|meta| meta.status.is_err()).unwrap_or(true) {
info!("Dropping failed transaction: {:?}", signature);
return Ok(());
}

let fbb = flatbuffers::FlatBufferBuilder::new();
let fbb = seralize_encoded_transaction_with_status(fbb, txn)
.with_context(|| format!("failed to serialize transaction with {}", signature))?;
let bytes = fbb.finished_data();

let mut locked = messenger.lock().await;
locked.send(TRANSACTION_STREAM, bytes).await?;
drop(locked);
info!("Successfully pushed transaction to redis: {:?}", signature);
Ok(())
.await
.map_err(|e| anyhow::anyhow!(e))
}

fn build_seq_ranges(seqs: Vec<i64>) -> Vec<(i64, i64)> {
Expand Down Expand Up @@ -604,67 +587,76 @@ fn build_seq_ranges(seqs: Vec<i64>) -> Vec<(i64, i64)> {
// Add the following:
// 1 – Keep searching until finding a successful transaction.
// 2 – Parse txns and extract seq, keep searching until the seq is found (can use Helius for this).
async fn find_signatures_for_missing_seq_range(
fn find_signatures_for_missing_seq_range(
tree: Pubkey,
range: (i64, i64),
client: &RpcClient,
conn: &DatabaseConnection,
sender: &Sender<Signature>,
) -> anyhow::Result<()> {
client: Arc<RpcClient>,
conn: Arc<DatabaseConnection>,
) -> mpsc::Receiver<anyhow::Result<Signature>> {
let (start, end) = range;
trace!("Filling gap for range: [{:?}, {:?}]", start, end);
let (sender, receiver) = mpsc::channel(2_000);

tokio::spawn(async move {
info!("Filling gap for range: [{:?}, {:?}]", start, end);

// Find the next indexed seq outside of the range.
let res = cl_audits::Entity::find()
.filter(cl_audits::Column::Tree.eq(tree.as_ref()))
.filter(cl_audits::Column::Seq.gt(end))
.order_by_asc(cl_audits::Column::Seq)
.limit(1)
.all(&*conn)
.await
.map_err(|e| anyhow::anyhow!(e))?;
let next_txn = res
.first()
.ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?;
trace!(
"Next txn for missing seq range [{:?}, {:?}]: {:?}",
start,
end,
next_txn.tx,
);

// Find the next indexed seq outside of the range.
let res = cl_audits::Entity::find()
.filter(cl_audits::Column::Tree.eq(tree.as_ref()))
.filter(cl_audits::Column::Seq.gt(end))
.order_by_asc(cl_audits::Column::Seq)
.limit(1)
.all(conn)
.await?;
let next_txn = res
.first()
.ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?;
trace!(
"Next txn for missing seq range [{:?}, {:?}]: {:?}",
next_txn.tx,
start,
end
);
// Extra signatures to send as a safety buffer.
let hacky_buffer = 3;

// Fetch signatures for all txns in the range before "next".
let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize;
let mut before = Signature::from_str(&next_txn.tx).ok();
loop {
let limit = cmp::min(1000, sigs_remaining);
let config = GetConfirmedSignaturesForAddress2Config {
before: before,
limit: Some(limit),
..Default::default()
};
let sigs = client
.get_signatures_for_address_with_config(&tree, config)
.await
.map_err(|e| anyhow::anyhow!(e))?;

for (i, sig) in sigs.iter().enumerate() {
let o = Signature::from_str(&sig.signature).map_err(|e| anyhow::anyhow!(e))?;
sender
.send(Ok(o))
.await
.map_err(|_| anyhow::anyhow!("Failed to send signature"))?;
if i == sigs.len() {
before = Some(o);
}
}

// Extra signatures to send as a safety buffer.
// E.g. if Seq 5 is missing and txn D produced Seq 6, we crawl back N signatures and send them all.
// This is a hacky workaround for failed signatures or non-bubblegum txns.
let hacky_buffer = 3;

// Fetch signatures for all txns in the range before "next".
// TODO: Scan and parse the transactions using blockbuster and skip failed txns.
let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize;
let mut before = Signature::from_str(&next_txn.tx).ok();
loop {
let limit = cmp::min(1000, sigs_remaining);
let config = GetConfirmedSignaturesForAddress2Config {
before: before,
limit: Some(limit),
..Default::default()
};
let sigs = client
.get_signatures_for_address_with_config(&tree, config)
.await?;
for (i, sig) in sigs.clone().iter().enumerate() {
let o = Signature::from_str(&sig.signature)?;
sender.send(o)?;
if i == sigs.len() {
before = Some(o);
sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0);
if sigs_remaining == 0 || sigs.len() == 0 {
break;
}
}
sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0);
if sigs_remaining == 0 || sigs.len() == 0 {
break;
}
}

return anyhow::Ok(());
Ok::<(), anyhow::Error>(())
});

receiver
}

async fn get_onchain_tree_seq(address: Pubkey, client: &RpcClient) -> anyhow::Result<u64> {
Expand Down
1 change: 1 addition & 0 deletions tools/txn_forwarder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ async fn send(
// Ignore if tx failed or meta is missed
let meta = tx.transaction.meta.as_ref();
if meta.map(|meta| meta.status.is_err()).unwrap_or(true) {
info!("Dropping failed transaction: {:?}", signature);
return Ok(());
}

Expand Down

0 comments on commit 3becda4

Please sign in to comment.