From 4fd97586ba0d0e4add70e5079792031a257cc8b3 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Tue, 17 Sep 2024 11:12:29 +0200 Subject: [PATCH] refactor: bubblegum backfill orders off of slot (#155) --- backfill/src/worker/program_transformer.rs | 8 ++++---- backfill/src/worker/transaction.rs | 11 ++++------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/backfill/src/worker/program_transformer.rs b/backfill/src/worker/program_transformer.rs index b6416948f..d0e365854 100644 --- a/backfill/src/worker/program_transformer.rs +++ b/backfill/src/worker/program_transformer.rs @@ -19,9 +19,9 @@ impl ProgramTransformerWorkerArgs { &self, context: BubblegumBackfillContext, forwarder: UnboundedSender, - ) -> Result<(JoinHandle<()>, Sender<(Option, TransactionInfo)>)> { + ) -> Result<(JoinHandle<()>, Sender)> { let (sender, mut receiver) = - channel::<(Option, TransactionInfo)>(self.program_transformer_channel_size); + channel::(self.program_transformer_channel_size); let handle = tokio::spawn(async move { let mut transactions = Vec::new(); @@ -35,9 +35,9 @@ impl ProgramTransformerWorkerArgs { transactions.push(gap); } - transactions.sort_by(|(a, _), (b, _)| a.cmp(b)); + transactions.sort_by(|a, b| a.slot.cmp(&b.slot)); - for (_, transaction) in transactions { + for transaction in transactions { if let Err(e) = program_transformer.handle_transaction(&transaction).await { error!("handle transaction: {:?}", e) }; diff --git a/backfill/src/worker/transaction.rs b/backfill/src/worker/transaction.rs index 1b49c121f..6a25f6f1b 100644 --- a/backfill/src/worker/transaction.rs +++ b/backfill/src/worker/transaction.rs @@ -147,7 +147,7 @@ impl SignatureWorkerArgs { pub fn start( &self, context: crate::BubblegumBackfillContext, - forwarder: Sender<(Option, TransactionInfo)>, + forwarder: Sender, ) -> Result<(JoinHandle<()>, Sender)> { let (sig_sender, mut sig_receiver) = channel::(self.signature_channel_size); let worker_count = self.signature_worker_count; @@ -177,16 +177,13 @@ impl SignatureWorkerArgs { async fn queue_transaction<'a>( client: Rpc, - sender: Sender<(Option, TransactionInfo)>, + sender: Sender, signature: Signature, ) -> Result<(), ErrorKind> { let transaction = client.get_transaction(&signature).await?; sender - .send(( - transaction.block_time, - FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?, - )) + .send(FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?) .await .map_err(|e| ErrorKind::Generic(e.to_string()))?; @@ -195,7 +192,7 @@ async fn queue_transaction<'a>( fn spawn_transaction_worker( client: Rpc, - sender: Sender<(Option, TransactionInfo)>, + sender: Sender, signature: Signature, ) -> JoinHandle<()> { tokio::spawn(async move {