Skip to content

Commit

Permalink
fix(tools): fix the fix-tree tool (metaplex-foundation#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasPennie authored Jun 27, 2023
1 parent 763126a commit bebefec
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 33 deletions.
2 changes: 1 addition & 1 deletion tools/tree-status/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ cargo run -- \
cargo run -- \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 3 \
--concurrency 25 \
fix-tree --pg-url $DB_URL --redis-url $REDIS_URL --tree $TREE
```
95 changes: 63 additions & 32 deletions tools/tree-status/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ enum Action {
redis_url: String,
#[arg(short, long, help = "Tree pubkey")]
tree: String,
#[arg(
short,
long,
help = "Concurrency for fetching signatures for sequence batches"
)]
get_sigs_concurrency: Option<usize>,
},
}

Expand Down Expand Up @@ -332,14 +338,28 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Action::FixTree { .. } => {
Action::FixTree {
get_sigs_concurrency,
pg_url: _,
redis_url: _,
tree: _,
} => {
let client = RpcClient::new(args.rpc.clone());
let conn = args.get_pg_conn().await?;
let messenger_config = args.get_messenger_config().await?;
if let Some(maybe_pubkey) = pubkeys.next().await {
let pubkey: Pubkey = maybe_pubkey?;
info!("fixing tree {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = fix_tree(pubkey, client, conn, messenger_config).await {
if let Err(error) = fix_tree(
pubkey,
client,
conn,
messenger_config,
Some(args.concurrency),
get_sigs_concurrency.to_owned(),
)
.await
{
error!("{:?}", error);
}
}
Expand Down Expand Up @@ -406,6 +426,8 @@ async fn fix_tree(
client: RpcClient,
conn: DatabaseConnection,
messenger_config: MessengerConfig,
get_txn_concurrency: Option<usize>,
get_sigs_concurrency: Option<usize>,
) -> anyhow::Result<()> {
let onchain_seq: i64 = get_onchain_tree_seq(pubkey, &client)
.await
Expand All @@ -425,13 +447,15 @@ async fn fix_tree(
indexed_seq.max_seq, indexed_seq.cnt_seq
);
let missing_seqs = get_missing_seq(pubkey, onchain_seq, &conn).await?;
warn!("[{pubkey}] missing seq: {:?}", missing_seqs);
trace!("[{pubkey}] missing seq: {:?}", missing_seqs);
find_and_forward_txns_for_missing_seqs(
pubkey,
missing_seqs,
client,
conn,
messenger_config,
get_txn_concurrency,
get_sigs_concurrency,
)
.await?;
} else {
Expand All @@ -449,11 +473,12 @@ async fn find_and_forward_txns_for_missing_seqs(
client: RpcClient,
conn: DatabaseConnection,
messenger_config: MessengerConfig,
get_txn_concurrency: Option<usize>,
get_sigs_concurrency: Option<usize>,
) -> anyhow::Result<()> {
// Concurrency config
// TODO: Change to args
let batch_processing_concurrency = 10;
let get_txn_concurrency = 20;
let get_txn_concurrency: usize = get_txn_concurrency.unwrap_or(20);
let get_sigs_concurrency: usize = get_sigs_concurrency.unwrap_or(3);

let (r_sender, r_recv) = unbounded();
let (s_sender, s_recv) = unbounded();
Expand All @@ -473,21 +498,22 @@ async fn find_and_forward_txns_for_missing_seqs(

s.spawn(|_| {
let ranges = build_seq_ranges(seqs);
trace!("Processing seq ranges: {:?}", ranges);
info!("Processing seq ranges: {:?}", ranges);
for range in ranges {
r_sender.send(range).unwrap();
}
drop(r_sender);
});

for _ in 0..batch_processing_concurrency {
for _ in 0..get_sigs_concurrency {
let (s_sender, r_recv) = (s_sender.clone(), r_recv.clone());
let client = client.clone();
let conn = conn.clone();
let runtime = runtime.clone();
// Spawn workers in separate threads
s.spawn(move |_| {
for range in r_recv.iter() {
info!("Processing seq range: {:?}", range);
runtime
.block_on(find_signatures_for_missing_seq_range(
tree, range, &client, &conn, &s_sender,
Expand Down Expand Up @@ -614,52 +640,57 @@ async fn find_signatures_for_missing_seq_range(
let (start, end) = range;
trace!("Filling gap for range: [{:?}, {:?}]", start, end);

// Find the next indexed seq outside of the range.
let res = cl_audits::Entity::find()
// Find the next indexed after the end of the range.
let before_txn = cl_audits::Entity::find()
.filter(cl_audits::Column::Tree.eq(tree.as_ref()))
.filter(cl_audits::Column::Seq.gt(end))
.filter(cl_audits::Column::Seq.gte(end))
.order_by_asc(cl_audits::Column::Seq)
.limit(1)
.all(conn)
.await?;
let next_txn = res
let before_txn = before_txn
.first()
.ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?;
.ok_or_else(|| anyhow::anyhow!("Failed to get before txn for seq: {:?}", end))?;

// Find the indexed seq before the start of the range.
let until_txn = cl_audits::Entity::find()
.filter(cl_audits::Column::Tree.eq(tree.as_ref()))
.filter(cl_audits::Column::Seq.lte(start))
.order_by_desc(cl_audits::Column::Seq)
.limit(1)
.all(conn)
.await?;
let until_txn = until_txn
.first()
.ok_or_else(|| anyhow::anyhow!("Failed to get until txn for seq: {:?}", start))?;

trace!(
"Next txn for missing seq range [{:?}, {:?}]: {:?}",
next_txn.tx,
"Txns for missing seq range [{:?}, {:?}]. Until (start): {:?}. Before (end): {:?}.",
start,
end
end,
until_txn.tx,
before_txn.tx,
);

// Extra signatures to send as a safety buffer.
// E.g. if Seq 5 is missing and txn D produced Seq 6, we crawl back N signatures and send them all.
// This is a hacky workaround for failed signatures or non-bubblegum txns.
let hacky_buffer = 3;

// Fetch signatures for all txns in the range before "next".
// TODO: Scan and parse the transactions using blockbuster and skip failed txns.
let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize;
let mut before = Signature::from_str(&next_txn.tx).ok();
let mut before = Signature::from_str(&before_txn.tx).ok();
let until = Signature::from_str(&until_txn.tx).ok();
let limit: usize = 1000;
loop {
let limit = cmp::min(1000, sigs_remaining);
let config = GetConfirmedSignaturesForAddress2Config {
before: before,
until: until,
limit: Some(limit),
..Default::default()
};
let sigs = client
.get_signatures_for_address_with_config(&tree, config)
.await?;
for (i, sig) in sigs.clone().iter().enumerate() {
for sig in sigs.clone() {
let o = Signature::from_str(&sig.signature)?;
sender.send(o)?;
if i == sigs.len() {
before = Some(o);
}
before = Some(o);
}
sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0);
if sigs_remaining == 0 || sigs.len() == 0 {
if sigs.len() == 0 {
break;
}
}
Expand Down

0 comments on commit bebefec

Please sign in to comment.