Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tools): fix the fix-tree tool #67

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tools/tree-status/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ cargo run -- \
cargo run -- \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 3 \
--concurrency 25 \
fix-tree --pg-url $DB_URL --redis-url $REDIS_URL --tree $TREE
```
95 changes: 63 additions & 32 deletions tools/tree-status/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ enum Action {
redis_url: String,
#[arg(short, long, help = "Tree pubkey")]
tree: String,
#[arg(
short,
long,
help = "Concurrency for fetching signatures for sequence batches"
)]
get_sigs_concurrency: Option<usize>,
},
}

Expand Down Expand Up @@ -332,14 +338,28 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Action::FixTree { .. } => {
Action::FixTree {
get_sigs_concurrency,
pg_url: _,
redis_url: _,
tree: _,
} => {
let client = RpcClient::new(args.rpc.clone());
let conn = args.get_pg_conn().await?;
let messenger_config = args.get_messenger_config().await?;
if let Some(maybe_pubkey) = pubkeys.next().await {
let pubkey: Pubkey = maybe_pubkey?;
info!("fixing tree {pubkey}, hex: {}", hex::encode(pubkey));
if let Err(error) = fix_tree(pubkey, client, conn, messenger_config).await {
if let Err(error) = fix_tree(
pubkey,
client,
conn,
messenger_config,
Some(args.concurrency),
get_sigs_concurrency.to_owned(),
)
.await
{
error!("{:?}", error);
}
}
Expand Down Expand Up @@ -406,6 +426,8 @@ async fn fix_tree(
client: RpcClient,
conn: DatabaseConnection,
messenger_config: MessengerConfig,
get_txn_concurrency: Option<usize>,
get_sigs_concurrency: Option<usize>,
) -> anyhow::Result<()> {
let onchain_seq: i64 = get_onchain_tree_seq(pubkey, &client)
.await
Expand All @@ -425,13 +447,15 @@ async fn fix_tree(
indexed_seq.max_seq, indexed_seq.cnt_seq
);
let missing_seqs = get_missing_seq(pubkey, onchain_seq, &conn).await?;
warn!("[{pubkey}] missing seq: {:?}", missing_seqs);
trace!("[{pubkey}] missing seq: {:?}", missing_seqs);
find_and_forward_txns_for_missing_seqs(
pubkey,
missing_seqs,
client,
conn,
messenger_config,
get_txn_concurrency,
get_sigs_concurrency,
)
.await?;
} else {
Expand All @@ -449,11 +473,12 @@ async fn find_and_forward_txns_for_missing_seqs(
client: RpcClient,
conn: DatabaseConnection,
messenger_config: MessengerConfig,
get_txn_concurrency: Option<usize>,
get_sigs_concurrency: Option<usize>,
) -> anyhow::Result<()> {
// Concurrency config
// TODO: Change to args
let batch_processing_concurrency = 10;
let get_txn_concurrency = 20;
let get_txn_concurrency: usize = get_txn_concurrency.unwrap_or(20);
let get_sigs_concurrency: usize = get_sigs_concurrency.unwrap_or(3);

let (r_sender, r_recv) = unbounded();
let (s_sender, s_recv) = unbounded();
Expand All @@ -473,21 +498,22 @@ async fn find_and_forward_txns_for_missing_seqs(

s.spawn(|_| {
let ranges = build_seq_ranges(seqs);
trace!("Processing seq ranges: {:?}", ranges);
info!("Processing seq ranges: {:?}", ranges);
for range in ranges {
r_sender.send(range).unwrap();
}
drop(r_sender);
});

for _ in 0..batch_processing_concurrency {
for _ in 0..get_sigs_concurrency {
let (s_sender, r_recv) = (s_sender.clone(), r_recv.clone());
let client = client.clone();
let conn = conn.clone();
let runtime = runtime.clone();
// Spawn workers in separate threads
s.spawn(move |_| {
for range in r_recv.iter() {
info!("Processing seq range: {:?}", range);
runtime
.block_on(find_signatures_for_missing_seq_range(
tree, range, &client, &conn, &s_sender,
Expand Down Expand Up @@ -614,52 +640,57 @@ async fn find_signatures_for_missing_seq_range(
let (start, end) = range;
trace!("Filling gap for range: [{:?}, {:?}]", start, end);

// Find the next indexed seq outside of the range.
let res = cl_audits::Entity::find()
// Find the next indexed after the end of the range.
let before_txn = cl_audits::Entity::find()
.filter(cl_audits::Column::Tree.eq(tree.as_ref()))
.filter(cl_audits::Column::Seq.gt(end))
.filter(cl_audits::Column::Seq.gte(end))
.order_by_asc(cl_audits::Column::Seq)
.limit(1)
.all(conn)
.await?;
let next_txn = res
let before_txn = before_txn
.first()
.ok_or_else(|| anyhow::anyhow!("Failed to get next txn for seq: {:?}", end))?;
.ok_or_else(|| anyhow::anyhow!("Failed to get before txn for seq: {:?}", end))?;

// Find the indexed seq before the start of the range.
let until_txn = cl_audits::Entity::find()
.filter(cl_audits::Column::Tree.eq(tree.as_ref()))
.filter(cl_audits::Column::Seq.lte(start))
.order_by_desc(cl_audits::Column::Seq)
.limit(1)
.all(conn)
.await?;
let until_txn = until_txn
.first()
.ok_or_else(|| anyhow::anyhow!("Failed to get until txn for seq: {:?}", start))?;

trace!(
"Next txn for missing seq range [{:?}, {:?}]: {:?}",
next_txn.tx,
"Txns for missing seq range [{:?}, {:?}]. Until (start): {:?}. Before (end): {:?}.",
start,
end
end,
until_txn.tx,
before_txn.tx,
);

// Extra signatures to send as a safety buffer.
// E.g. if Seq 5 is missing and txn D produced Seq 6, we crawl back N signatures and send them all.
// This is a hacky workaround for failed signatures or non-bubblegum txns.
let hacky_buffer = 3;

// Fetch signatures for all txns in the range before "next".
// TODO: Scan and parse the transactions using blockbuster and skip failed txns.
let mut sigs_remaining = (end - start + hacky_buffer + 1) as usize;
let mut before = Signature::from_str(&next_txn.tx).ok();
let mut before = Signature::from_str(&before_txn.tx).ok();
let until = Signature::from_str(&until_txn.tx).ok();
let limit: usize = 1000;
loop {
let limit = cmp::min(1000, sigs_remaining);
let config = GetConfirmedSignaturesForAddress2Config {
before: before,
until: until,
limit: Some(limit),
..Default::default()
};
let sigs = client
.get_signatures_for_address_with_config(&tree, config)
.await?;
for (i, sig) in sigs.clone().iter().enumerate() {
for sig in sigs.clone() {
let o = Signature::from_str(&sig.signature)?;
sender.send(o)?;
if i == sigs.len() {
before = Some(o);
}
before = Some(o);
}
sigs_remaining = cmp::max(sigs_remaining - sigs.len(), 0);
if sigs_remaining == 0 || sigs.len() == 0 {
if sigs.len() == 0 {
break;
}
}
Expand Down