Skip to content

Commit

Permalink
Remove enforcing seq in bubblegum backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Oct 7, 2024
1 parent f4744c0 commit 1afee45
Showing 1 changed file with 30 additions and 125 deletions.
155 changes: 30 additions & 125 deletions backfill/src/worker/program_transformer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use anyhow::Result;
use blockbuster::{
instruction::InstructionBundle, program_handler::ProgramParser,
programs::bubblegum::BubblegumParser, programs::ProgramParseResult,
};
use clap::Parser;
use das_core::{create_download_metadata_notifier, DownloadMetadataInfo};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::error;
use program_transformers::{
bubblegum::handle_bubblegum_instruction, ProgramTransformer, TransactionInfo,
};
use sea_orm::SqlxPostgresConnector;
use program_transformers::{ProgramTransformer, TransactionInfo};
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Sender, UnboundedSender};
use tokio::task::JoinHandle;

Expand All @@ -19,6 +15,8 @@ use crate::BubblegumBackfillContext;
pub struct ProgramTransformerWorkerArgs {
#[arg(long, env, default_value = "100000")]
pub program_transformer_channel_size: usize,
#[arg(long, env, default_value = "50")]
pub program_transformer_worker_count: usize,
}

impl ProgramTransformerWorkerArgs {
Expand All @@ -32,132 +30,39 @@ impl ProgramTransformerWorkerArgs {

let worker_forwarder = forwarder.clone();
let worker_pool = context.database_pool.clone();
let worker_count = self.program_transformer_worker_count;
let handle = tokio::spawn(async move {
let mut transactions = Vec::new();

let download_metadata_notifier =
create_download_metadata_notifier(worker_forwarder.clone()).await;
let program_transformer =
ProgramTransformer::new(worker_pool.clone(), download_metadata_notifier);

while let Some(transaction) = receiver.recv().await {
transactions.push(transaction);
}

let mut instructions = transactions
.iter()
.flat_map(|tx_info| {
let ordered_instructions = program_transformer.break_transaction(tx_info);
ordered_instructions.into_iter().map(|(ix_pair, inner_ix)| {
(
tx_info.signature.to_string(),
ix_pair.0,
ix_pair.1,
inner_ix,
ix_pair
.1
.accounts
.iter()
.map(|&i| tx_info.account_keys[i as usize])
.collect::<Vec<_>>(),
tx_info.slot,
)
})
})
.collect::<Vec<_>>();
instructions.sort_by(|a, b| {
let a_tree_update_seq = if let Some(program_parser) =
program_transformer.match_program(&a.1)
{
if let Ok(result) = program_parser.handle_instruction(&InstructionBundle {
txn_id: &a.0,
program: a.1,
instruction: Some(a.2),
inner_ix: a.3.as_deref(),
keys: a.4.as_slice(),
slot: a.5,
}) {
if let ProgramParseResult::Bubblegum(parsing_result) = result.result_type()
{
parsing_result
.tree_update
.as_ref()
.map_or(u64::MAX, |event| event.seq)
} else {
u64::MAX
}
} else {
u64::MAX
}
} else {
u64::MAX
};

let b_tree_update_seq = if let Some(program_parser) =
program_transformer.match_program(&b.1)
{
if let Ok(result) = program_parser.handle_instruction(&InstructionBundle {
txn_id: &b.0,
program: b.1,
instruction: Some(b.2),
inner_ix: b.3.as_deref(),
keys: b.4.as_slice(),
slot: b.5,
}) {
if let ProgramParseResult::Bubblegum(parsing_result) = result.result_type()
{
parsing_result
.tree_update
.as_ref()
.map_or(u64::MAX, |event| event.seq)
} else {
u64::MAX
}
} else {
u64::MAX
}
} else {
u64::MAX
};
let program_transformer = Arc::new(ProgramTransformer::new(
worker_pool.clone(),
download_metadata_notifier,
));

a_tree_update_seq.cmp(&b_tree_update_seq)
});
let mut handlers = FuturesUnordered::new();

let parser = BubblegumParser {};

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(worker_pool);

for i in instructions {
let bundle = &InstructionBundle {
txn_id: &i.0,
program: i.1,
instruction: Some(i.2),
inner_ix: i.3.as_deref(),
keys: i.4.as_slice(),
slot: i.5,
};
if let Ok(result) = parser.handle_instruction(bundle) {
if let ProgramParseResult::Bubblegum(parsing_result) = result.result_type() {
let download_metadata_notifier =
create_download_metadata_notifier(worker_forwarder.clone()).await;
while let Some(transaction) = receiver.recv().await {
if handlers.len() >= worker_count {
handlers.next().await;
}

if let Err(err) = handle_bubblegum_instruction(
parsing_result,
bundle,
&conn,
&download_metadata_notifier,
)
let program_transformer_clone = Arc::clone(&program_transformer);
let handle = tokio::spawn(async move {
if let Err(err) = program_transformer_clone
.handle_transaction(&transaction)
.await
{
error!(
"Failed to handle bubblegum instruction for txn {:?}: {:?}",
bundle.txn_id, err
);
break;
}
{
error!(
"Failed to handle bubblegum instruction for txn {:?}: {:?}",
transaction.signature, err
);
}
}
});

handlers.push(handle);
}

futures::future::join_all(handlers).await;
});

Ok((handle, sender))
Expand Down

0 comments on commit 1afee45

Please sign in to comment.