From ea359c280dbf1f5fd7f0eb4011a794ba89670968 Mon Sep 17 00:00:00 2001
From: buffalu <85544055+buffalu@users.noreply.github.com>
Date: Thu, 22 Sep 2022 14:52:42 -0500
Subject: [PATCH] Use BundleAccountLocker when handling tip txs (#147)

---
 anchor                               |   2 +-
 bootstrap                            |  12 +-
 core/src/banking_stage.rs            |  22 +--
 core/src/bundle_account_locker.rs    |  74 +++++---
 core/src/bundle_sanitizer.rs         |   3 +-
 core/src/bundle_stage.rs             | 258 ++++++++++++++-------------
 core/src/proxy/auth.rs               |   6 +-
 core/src/proxy/block_engine_stage.rs |   2 -
 core/src/proxy/relayer_stage.rs      |   4 -
 core/src/tip_manager.rs              |  12 +-
 dev/Dockerfile                       |  10 +-
 f                                    |   5 +
 jito-programs                        |   2 +-
 s                                    |   2 +-
 14 files changed, 222 insertions(+), 192 deletions(-)

diff --git a/anchor b/anchor
index d1ca272839..7532647bb8 160000
--- a/anchor
+++ b/anchor
@@ -1 +1 @@
-Subproject commit d1ca27283920fb470ad78672486f0894fc0c38b6
+Subproject commit 7532647bb86d26fd7497d9cbc7ac99e2b3941e86
diff --git a/bootstrap b/bootstrap
index 48ac18fe08..1060cffeee 100755
--- a/bootstrap
+++ b/bootstrap
@@ -7,16 +7,16 @@ RUST_LOG=INFO \
 NDEBUG=1 ./multinode-demo/bootstrap-validator.sh \
   --wait-for-supermajority 0 \
   --expected-bank-hash $bank_hash \
-  --block-engine-address http://0.0.0.0:13333 \
-  --block-engine-auth-service-address http://0.0.0.0:14444 \
-  --relayer-auth-service-address http://0.0.0.0:11226 \
-  --relayer-address http://0.0.0.0:11226 \
+  --block-engine-address http://127.0.0.1:1003 \
+  --block-engine-auth-service-address http://127.0.0.1:1005 \
+  --relayer-auth-service-address http://127.0.0.1:11226 \
+  --relayer-address http://127.0.0.1:11226 \
   --rpc-pubsub-enable-block-subscription \
   --enable-rpc-transaction-history \
   --tip-payment-program-pubkey DThZmRNNXh7kvTQW9hXeGoWGPKktK8pgVAyoTLjH7UrT \
   --tip-distribution-program-pubkey FjrdANjvo76aCYQ4kf9FM1R8aESUcEE6F8V7qyoVUQcM \
   --commission-bps 0 \
-  --shred-receiver-address 0.0.0.0:13330 \
+  --shred-receiver-address 127.0.0.1:1002 \
   --allow-private-addr \
   --trust-relayer-packets \
-  --trust-block-engine-packets
\ No newline at end of file
+  --trust-block-engine-packets
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 4daa5ae537..47b912df01 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -1530,19 +1530,19 @@ impl BankingStage {
         // Only lock accounts for those transactions are selected for the block;
         // Once accounts are locked, other threads cannot encode transactions that will modify the
         // same account state.
-        // BundleStage prevents locking ALL accounts in ALL transactions in a bundle mid-execution
-        // to ensure that avoid race conditions
         let mut lock_time = Measure::start("lock_time");
-        let read_locks = bundle_account_locker.read_locks();
-        let write_locks = bundle_account_locker.write_locks();
-
-        let batch = bank.prepare_sanitized_batch_with_results(
-            txs,
-            transactions_qos_results.iter(),
-            &read_locks,
-            &write_locks,
-        );
+        let batch = {
+            // BundleStage locks ALL accounts in ALL transactions in a bundle to avoid race
+            // conditions with BankingStage
+            let account_locks = bundle_account_locker.account_locks();
+            bank.prepare_sanitized_batch_with_results(
+                txs,
+                transactions_qos_results.iter(),
+                &account_locks.read_locks(),
+                &account_locks.write_locks(),
+            )
+        };
         lock_time.stop();

         // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
diff --git a/core/src/bundle_account_locker.rs b/core/src/bundle_account_locker.rs
index abaa7dad11..7a47ffaf69 100644
--- a/core/src/bundle_account_locker.rs
+++ b/core/src/bundle_account_locker.rs
@@ -9,12 +9,13 @@
 /// and commit the results before the bundle completes. By the time the bundle commits the new account
 /// state for {A, B, C}, A and B would be incorrect and the entries containing the bundle would be
 /// replayed improperly and that leader would have produced an invalid block.
-use std::sync::{Arc, Mutex};
 use {
+    solana_runtime::bank::Bank,
     solana_sdk::{
         bundle::sanitized::SanitizedBundle, pubkey::Pubkey, transaction::TransactionAccountLocks,
     },
     std::collections::{hash_map::Entry, HashMap, HashSet},
+    std::sync::{Arc, Mutex, MutexGuard},
 };

 #[derive(Debug)]
@@ -27,16 +28,19 @@ pub type BundleAccountLockerResult<T> = Result<T, BundleAccountLockerError>;
 pub struct LockedBundle<'a, 'b> {
     bundle_account_locker: &'a BundleAccountLocker,
     sanitized_bundle: &'b SanitizedBundle,
+    bank: Arc<Bank>,
 }

 impl<'a, 'b> LockedBundle<'a, 'b> {
     pub fn new(
         bundle_account_locker: &'a BundleAccountLocker,
         sanitized_bundle: &'b SanitizedBundle,
+        bank: &Arc<Bank>,
     ) -> Self {
         Self {
             bundle_account_locker,
             sanitized_bundle,
+            bank: bank.clone(),
         }
     }

@@ -50,64 +54,63 @@ impl<'a, 'b> Drop for LockedBundle<'a, 'b> {
     fn drop(&mut self) {
         let _ = self
             .bundle_account_locker
-            .unlock_bundle_accounts(self.sanitized_bundle);
+            .unlock_bundle_accounts(self.sanitized_bundle, &self.bank);
     }
 }

 #[derive(Default, Clone)]
-struct BundleAccountLocks {
-    read_locks: Arc<Mutex<HashMap<Pubkey, u64>>>,
-    write_locks: Arc<Mutex<HashMap<Pubkey, u64>>>,
+pub struct BundleAccountLocks {
+    read_locks: HashMap<Pubkey, u64>,
+    write_locks: HashMap<Pubkey, u64>,
 }

 impl BundleAccountLocks {
     pub fn read_locks(&self) -> HashSet<Pubkey> {
-        self.read_locks.lock().unwrap().keys().cloned().collect()
+        self.read_locks.keys().cloned().collect()
     }

     pub fn write_locks(&self) -> HashSet<Pubkey> {
-        self.write_locks.lock().unwrap().keys().cloned().collect()
+        self.write_locks.keys().cloned().collect()
     }

     pub fn lock_accounts(
-        &self,
+        &mut self,
         read_locks: HashMap<Pubkey, u64>,
         write_locks: HashMap<Pubkey, u64>,
     ) {
-        let mut read_locks_l = self.read_locks.lock().unwrap();
-        let mut write_locks_l = self.write_locks.lock().unwrap();
         for (acc, count) in read_locks {
-            *read_locks_l.entry(acc).or_insert(0) += count;
+            *self.read_locks.entry(acc).or_insert(0) += count;
         }
         for (acc, count) in write_locks {
-            *write_locks_l.entry(acc).or_insert(0) += count;
+            *self.write_locks.entry(acc).or_insert(0) += count;
         }
     }

     pub fn unlock_accounts(
-        &self,
+        &mut self,
         read_locks: HashMap<Pubkey, u64>,
         write_locks: HashMap<Pubkey, u64>,
     ) {
-        let mut read_locks_l = self.read_locks.lock().unwrap();
-        let mut write_locks_l = self.write_locks.lock().unwrap();
-
         for (acc, count) in read_locks {
-            if let Entry::Occupied(mut entry) = read_locks_l.entry(acc) {
+            if let Entry::Occupied(mut entry) = self.read_locks.entry(acc) {
                 let val = entry.get_mut();
                 *val = val.saturating_sub(count);
                 if entry.get() == &0 {
                     let _ = entry.remove();
                 }
+            } else {
+                warn!("error unlocking read-locked account, account: {:?}", acc);
             }
         }
         for (acc, count) in write_locks {
-            if let Entry::Occupied(mut entry) = write_locks_l.entry(acc) {
+            if let Entry::Occupied(mut entry) = self.write_locks.entry(acc) {
                 let val = entry.get_mut();
                 *val = val.saturating_sub(count);
                 if entry.get() == &0 {
                     let _ = entry.remove();
                 }
+            } else {
+                warn!("error unlocking write-locked account, account: {:?}", acc);
             }
         }
     }
@@ -115,20 +118,26 @@ impl BundleAccountLocks {

 #[derive(Clone, Default)]
 pub struct BundleAccountLocker {
-    account_locks: BundleAccountLocks,
+    account_locks: Arc<Mutex<BundleAccountLocks>>,
 }

 impl BundleAccountLocker {
     /// used in BankingStage during TransactionBatch construction to ensure that BankingStage
     /// doesn't lock anything currently locked in the BundleAccountLocker
     pub fn read_locks(&self) -> HashSet<Pubkey> {
-        self.account_locks.read_locks()
+        self.account_locks.lock().unwrap().read_locks()
     }

     /// used in BankingStage during TransactionBatch construction to ensure that BankingStage
     /// doesn't lock anything currently locked in the BundleAccountLocker
     pub fn write_locks(&self) -> HashSet<Pubkey> {
-        self.account_locks.write_locks()
+        self.account_locks.lock().unwrap().write_locks()
+    }
+
+    /// used in BankingStage during TransactionBatch construction to ensure that BankingStage
+    /// doesn't lock anything currently locked in the BundleAccountLocker
+    pub fn account_locks(&self) -> MutexGuard<BundleAccountLocks> {
+        self.account_locks.lock().unwrap()
     }

     /// Prepares a locked bundle and returns a LockedBundle containing locked accounts.
@@ -136,21 +145,29 @@
     pub fn prepare_locked_bundle<'a, 'b>(
         &'a self,
         sanitized_bundle: &'b SanitizedBundle,
+        bank: &Arc<Bank>,
     ) -> BundleAccountLockerResult<LockedBundle<'a, 'b>> {
-        let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle)?;
+        let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?;

-        self.account_locks.lock_accounts(read_locks, write_locks);
-        Ok(LockedBundle::new(self, sanitized_bundle))
+        self.account_locks
+            .lock()
+            .unwrap()
+            .lock_accounts(read_locks, write_locks);
+        Ok(LockedBundle::new(self, sanitized_bundle, bank))
     }

     /// Unlocks bundle accounts. Note that LockedBundle::drop will auto-drop the bundle account locks
     fn unlock_bundle_accounts(
         &self,
         sanitized_bundle: &SanitizedBundle,
+        bank: &Bank,
     ) -> BundleAccountLockerResult<()> {
-        let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle)?;
+        let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?;

-        self.account_locks.unlock_accounts(read_locks, write_locks);
+        self.account_locks
+            .lock()
+            .unwrap()
+            .unlock_accounts(read_locks, write_locks);
         Ok(())
     }

@@ -158,6 +175,7 @@ impl BundleAccountLocker {
     /// Each lock type contains a HashMap which maps Pubkey to number of locks held
     fn get_read_write_locks(
         bundle: &SanitizedBundle,
+        bank: &Bank,
     ) -> BundleAccountLockerResult<(HashMap<Pubkey, u64>, HashMap<Pubkey, u64>)> {
         let transaction_locks: Vec<TransactionAccountLocks> = bundle
             .transactions
@@ -265,7 +283,7 @@ mod tests {
             .expect("sanitize bundle 1");

         let locked_bundle0 = bundle_account_locker
-            .prepare_locked_bundle(&sanitized_bundle0)
+            .prepare_locked_bundle(&sanitized_bundle0, &bank)
             .unwrap();

         assert_eq!(
@@ -278,7 +296,7 @@
         );

         let locked_bundle1 = bundle_account_locker
-            .prepare_locked_bundle(&sanitized_bundle1)
+            .prepare_locked_bundle(&sanitized_bundle1, &bank)
             .unwrap();
         assert_eq!(
             bundle_account_locker.write_locks(),
diff --git a/core/src/bundle_sanitizer.rs b/core/src/bundle_sanitizer.rs
index 577d96f7a4..1cfc5b0f6f 100644
--- a/core/src/bundle_sanitizer.rs
+++ b/core/src/bundle_sanitizer.rs
@@ -122,8 +122,7 @@ pub fn get_sanitized_bundle(
         MAX_PROCESSING_AGE,
         &mut metrics,
     );
-    if let Some(failure) = check_results.iter().find(|r| r.0.is_err()) {
-        warn!("bundle check failure: {:?}", failure);
+    if check_results.iter().any(|r| r.0.is_err()) {
         return Err(BundleSanitizerError::FailedCheckResults(packet_bundle.uuid));
     }

diff --git a/core/src/bundle_stage.rs b/core/src/bundle_stage.rs
index 9865791f66..ad3b33d24d 100644
--- a/core/src/bundle_stage.rs
+++ b/core/src/bundle_stage.rs
@@ -4,7 +4,7 @@
 use {
     crate::{
         banking_stage::{BatchedTransactionDetails, CommitTransactionDetails},
-        bundle_account_locker::BundleAccountLocker,
+        bundle_account_locker::{BundleAccountLocker, BundleAccountLockerResult, LockedBundle},
         bundle_sanitizer::get_sanitized_bundle,
         leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings,
         packet_bundle::PacketBundle,
@@ -63,7 +63,7 @@ use {

 const MAX_BUNDLE_RETRY_DURATION: Duration = Duration::from_millis(10);

-type BundleExecutionResult<T> = Result<T, BundleExecutionError>;
+type BundleStageResult<T> = Result<T, BundleExecutionError>;

 struct AllExecutionResults {
     pub load_and_execute_tx_output: LoadAndExecuteTransactionsOutput,
@@ -225,6 +225,8 @@ impl BundleStage {

     /// Calculates QoS and reserves compute space for the bundle. If the bundle succeeds, commits
     /// the results to the cost tracker. If the bundle fails, rolls back any QoS changes made.
+    /// Ensure that the SanitizedBundle was returned by the BundleAccountLocker to avoid parallelism issues
+    /// with banking stage
     fn update_qos_and_execute_record_commit_bundle(
         sanitized_bundle: &SanitizedBundle,
         recorder: &TransactionRecorder,
@@ -234,7 +236,11 @@
         bank_start: &BankStart,
         execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
         max_bundle_retry_duration: &Duration,
-    ) -> BundleExecutionResult<()> {
+    ) -> BundleStageResult<()> {
+        if sanitized_bundle.transactions.is_empty() {
+            return Ok(());
+        }
+
         let tx_costs = qos_service.compute_transaction_costs(sanitized_bundle.transactions.iter());
         let (transactions_qos_results, num_included) = qos_service.select_transactions_per_cost(
             sanitized_bundle.transactions.iter(),
@@ -310,7 +316,7 @@
         bank_start: &BankStart,
         execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
         max_bundle_retry_duration: &Duration,
-    ) -> BundleExecutionResult<Vec<AllExecutionResults>> {
+    ) -> BundleStageResult<Vec<AllExecutionResults>> {
         let mut account_overrides = AccountOverrides::default();

         let mut execution_results = Vec::new();
@@ -331,6 +337,13 @@
             // ************************************************************************
             // Build a TransactionBatch that ensures transactions in the bundle
             // are executed sequentially.
+            // NOTE: The TransactionBatch is dropped before the results are committed, which
+            // would normally open up race conditions between this stage and BankingStage where
+            // a transaction here could read and execute state on a transaction and BankingStage
+            // could read-execute-store, invalidating the state produced by the bundle.
+            // Assuming the SanitizedBundle was locked with the BundleAccountLocker, that race
+            // condition shall be prevented as it holds an extra set of locks until the entire
+            // bundle is processed.
             // ************************************************************************
             let chunk_end = std::cmp::min(sanitized_bundle.transactions.len(), chunk_start + 128);
             let chunk = &sanitized_bundle.transactions[chunk_start..chunk_end];
@@ -396,9 +409,9 @@
                 .iter()
                 .any(|r| r.was_executed())
             {
-                warn!("none of the transactions were executed");
                 let bundle_execution_elapsed = start_time.elapsed();
                 if bundle_execution_elapsed >= *max_bundle_retry_duration {
+                    warn!("bundle timed out: {:?}", sanitized_bundle);
                     return Err(BundleExecutionError::MaxRetriesExceeded(
                         bundle_execution_elapsed,
                     ));
@@ -461,7 +474,7 @@
         bank_start: &BankStart,
         execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
         max_bundle_retry_duration: &Duration,
-    ) -> BundleExecutionResult<Vec<CommitTransactionDetails>> {
+    ) -> BundleStageResult<Vec<CommitTransactionDetails>> {
         let execution_results = Self::execute_bundle(
             sanitized_bundle,
             transaction_status_sender,
@@ -491,7 +504,7 @@
         execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
         transaction_status_sender: &Option<TransactionStatusSender>,
         gossip_vote_sender: &ReplayVoteSender,
-    ) -> BundleExecutionResult<Vec<CommitTransactionDetails>> {
+    ) -> BundleStageResult<Vec<CommitTransactionDetails>> {
         // *********************************************************************************
         // All transactions are executed in the bundle.
         // Record to PoH and send the saved execution results to the Bank.
@@ -639,23 +652,14 @@ impl BundleStage {
         }
     }

-    /// Initializes the tip config, as well as the tip_receiver iff the epoch has changed, then
-    /// sets the tip_receiver if needed and executes them as a bundle.
-    fn maybe_initialize_and_change_tip_receiver(
-        bank_start: &BankStart,
+    /// When executed the first time, there are some accounts that need to be initialized.
+    /// This is only helpful for local testing; on testnet and mainnet these will never be executed.
+    /// TODO (LB): consider removing this for mainnet/testnet and move to program deployment?
+    fn get_initialize_tip_accounts_transactions(
+        bank: &Bank,
         tip_manager: &TipManager,
-        qos_service: &QosService,
-        recorder: &TransactionRecorder,
-        transaction_status_sender: &Option<TransactionStatusSender>,
-        execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
-        gossip_vote_sender: &ReplayVoteSender,
         cluster_info: &Arc<ClusterInfo>,
-        max_bundle_retry_duration: &Duration,
-    ) -> BundleExecutionResult<()> {
-        let BankStart {
-            working_bank: bank, ..
-        } = bank_start;
-
+    ) -> BundleStageResult<Vec<SanitizedTransaction>> {
         let maybe_init_tip_payment_config_tx =
             if tip_manager.should_initialize_tip_payment_program(bank) {
                 info!("initializing tip-payment program config");
@@ -682,7 +686,6 @@ impl BundleStage {
             .should_init_tip_distribution_account(bank)
         {
             info!("initializing my [TipDistributionAccount]");
-
             Some(tip_manager.init_tip_distribution_account_tx(bank.last_blockhash(), bank.epoch()))
         } else {
             None
@@ -697,91 +700,7 @@ impl BundleStage {
             .flatten()
             .collect::<Vec<SanitizedTransaction>>();

-        if !transactions.is_empty() {
-            info!("executing init txs");
-            Self::update_qos_and_execute_record_commit_bundle(
-                &SanitizedBundle { transactions },
-                recorder,
-                transaction_status_sender,
-                gossip_vote_sender,
-                qos_service,
-                bank_start,
-                execute_and_commit_timings,
-                max_bundle_retry_duration,
-            )?;
-            info!("successfully executed init txs");
-        }
-
-        let configured_tip_receiver = tip_manager.get_configured_tip_receiver(bank)?;
-        let my_tip_distribution_pda = tip_manager.get_my_tip_distribution_pda(bank.epoch());
-
-        if configured_tip_receiver != my_tip_distribution_pda {
-            info!(
-                "changing tip receiver from {:?} to {:?}",
-                configured_tip_receiver, my_tip_distribution_pda
-            );
-            let sanitized_bundle = SanitizedBundle {
-                transactions: vec![tip_manager.change_tip_receiver_tx(
-                    &my_tip_distribution_pda,
-                    bank,
-                    &cluster_info.keypair(),
-                )?],
-            };
-
-            Self::update_qos_and_execute_record_commit_bundle(
-                &sanitized_bundle,
-                recorder,
-                transaction_status_sender,
-                gossip_vote_sender,
-                qos_service,
-                bank_start,
-                execute_and_commit_timings,
-                max_bundle_retry_duration,
-            )?;
-        }
-        Ok(())
-    }
-
-    /// Handles tip account management and executing, recording, and committing bundles
-    #[allow(clippy::too_many_arguments)]
-    fn handle_tip_and_execute_record_commit_bundle(
-        sanitized_bundle: &SanitizedBundle,
-        tip_manager: &TipManager,
-        bank_start: &BankStart,
-        qos_service: &QosService,
-        recorder: &TransactionRecorder,
-        transaction_status_sender: &Option<TransactionStatusSender>,
-        gossip_vote_sender: &ReplayVoteSender,
-        cluster_info: &Arc<ClusterInfo>,
-        max_bundle_retry_duration: &Duration,
-        execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
-    ) -> BundleExecutionResult<()> {
-        let _lock = tip_manager.lock();
-        let tip_pdas = tip_manager.get_tip_accounts();
-        if Self::bundle_touches_tip_pdas(&sanitized_bundle.transactions, &tip_pdas) {
-            Self::maybe_initialize_and_change_tip_receiver(
-                bank_start,
-                tip_manager,
-                qos_service,
-                recorder,
-                transaction_status_sender,
-                execute_and_commit_timings,
-                gossip_vote_sender,
-                cluster_info,
-                max_bundle_retry_duration,
-            )?;
-            info!("successfully changed tip receiver");
-        }
-        Self::update_qos_and_execute_record_commit_bundle(
-            sanitized_bundle,
-            recorder,
-            transaction_status_sender,
-            gossip_vote_sender,
-            qos_service,
-            bank_start,
-            execute_and_commit_timings,
-            max_bundle_retry_duration,
-        )
+        Ok(transactions)
     }

     /// Execute all unprocessed bundles until no more left or POH max tick height is reached.
@@ -802,7 +721,8 @@
         tip_manager: &TipManager,
         max_bundle_retry_duration: &Duration,
         execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
-    ) -> BundleExecutionResult<()> {
+        last_tip_update_slot: &mut Slot,
+    ) -> BundleStageResult<()> {
         // Drain all unprocessed bundles, turn to sanitized_bundles, lock them all, then process
         // until max proof-of-history tick
         let sanitized_bundles: VecDeque<(PacketBundle, SanitizedBundle)> = unprocessed_bundles
@@ -817,7 +737,7 @@ impl BundleStage {
             ) {
                 Ok(sanitized_bundle) => Some((packet_bundle, sanitized_bundle)),
                 Err(e) => {
-                    warn!(
+                    debug!(
                         "failed to sanitize bundle uuid: {:?} error: {:?}",
                         packet_bundle.uuid, e
                     );
@@ -827,11 +747,21 @@ impl BundleStage {
             })
             .collect();

+        let tip_pdas = tip_manager.get_tip_accounts();
+
         // Prepare locked bundles, which will RW lock accounts in sanitized_bundles so
-        // BankingStage can't lock them
-        let locked_bundles = sanitized_bundles.iter().map(|(_, sanitized_bundle)| {
-            bundle_account_locker.prepare_locked_bundle(sanitized_bundle)
-        });
+        // BankingStage can't lock them. This adds a layer of protection since a transaction in a bundle
+        // will not hold the AccountLocks through the TransactionBatch across the load-execute-commit cycle.
+        // We collect here to ensure that all of the bundles are locked ahead of time for priority over
+        // BankingStage
+        #[allow(clippy::needless_collect)]
+        let locked_bundles: Vec<BundleAccountLockerResult<LockedBundle>> = sanitized_bundles
+            .iter()
+            .map(|(_, sanitized_bundle)| {
+                bundle_account_locker
+                    .prepare_locked_bundle(sanitized_bundle, &bank_start.working_bank)
+            })
+            .collect();

         let execution_results = locked_bundles.into_iter().map(|maybe_locked_bundle| {
             let locked_bundle = maybe_locked_bundle.map_err(|_| BundleExecutionError::LockError)?;
@@ -841,17 +771,90 @@
             ) {
                 Err(BundleExecutionError::PohMaxHeightError)
             } else {
-                Self::handle_tip_and_execute_record_commit_bundle(
-                    locked_bundle.sanitized_bundle(),
-                    tip_manager,
-                    bank_start,
-                    qos_service,
+                let sanitized_bundle = locked_bundle.sanitized_bundle();
+
+                // if bundle touches tip account, need to make sure the tip-related accounts are initialized
+                // and the tip receiver is set correctly so tips in any bundles executed go to this leader
+                // instead of the last.
+                if Self::bundle_touches_tip_pdas(&sanitized_bundle.transactions, &tip_pdas)
+                    && bank_start.working_bank.slot() != *last_tip_update_slot
+                {
+                    let initialize_tip_accounts_bundle = SanitizedBundle {
+                        transactions: Self::get_initialize_tip_accounts_transactions(
+                            &bank_start.working_bank,
+                            tip_manager,
+                            cluster_info,
+                        )?,
+                    };
+                    {
+                        let locked_init_tip_bundle = bundle_account_locker
+                            .prepare_locked_bundle(
+                                &initialize_tip_accounts_bundle,
+                                &bank_start.working_bank,
+                            )
+                            .map_err(|_| BundleExecutionError::LockError)?;
+                        Self::update_qos_and_execute_record_commit_bundle(
+                            locked_init_tip_bundle.sanitized_bundle(),
+                            recorder,
+                            transaction_status_sender,
+                            gossip_vote_sender,
+                            qos_service,
+                            bank_start,
+                            execute_and_commit_timings,
+                            max_bundle_retry_duration,
+                        )?;
+                    }
+
+                    // change tip receiver, draining tips to the previous tip_receiver in the process
+                    // note that this needs to happen after the above tip-related bundle initializes
+                    // config accounts because get_configured_tip_receiver relies on an account
+                    // existing in the bank
+                    let configured_tip_receiver =
+                        tip_manager.get_configured_tip_receiver(&bank_start.working_bank)?;
+                    let my_tip_distribution_pda =
+                        tip_manager.get_my_tip_distribution_pda(bank_start.working_bank.epoch());
+                    if configured_tip_receiver != my_tip_distribution_pda {
+                        info!(
+                            "changing tip receiver from {} to {}",
+                            configured_tip_receiver, my_tip_distribution_pda
+                        );
+
+                        let change_tip_receiver_bundle = SanitizedBundle {
+                            transactions: vec![tip_manager.change_tip_receiver_tx(
+                                &my_tip_distribution_pda,
+                                &bank_start.working_bank,
+                                &cluster_info.keypair(),
+                            )?],
+                        };
+                        let locked_change_tip_receiver_bundle = bundle_account_locker
+                            .prepare_locked_bundle(
+                                &change_tip_receiver_bundle,
+                                &bank_start.working_bank,
+                            )
+                            .map_err(|_| BundleExecutionError::LockError)?;
+                        Self::update_qos_and_execute_record_commit_bundle(
+                            locked_change_tip_receiver_bundle.sanitized_bundle(),
+                            recorder,
+                            transaction_status_sender,
+                            gossip_vote_sender,
+                            qos_service,
+                            bank_start,
+                            execute_and_commit_timings,
+                            max_bundle_retry_duration,
+                        )?;
+                    }
+
+                    *last_tip_update_slot = bank_start.working_bank.slot();
+                }
+                Self::update_qos_and_execute_record_commit_bundle(
+                    sanitized_bundle,
                     recorder,
                     transaction_status_sender,
                     gossip_vote_sender,
-                    cluster_info,
-                    max_bundle_retry_duration,
+                    qos_service,
+                    bank_start,
                     execute_and_commit_timings,
+                    max_bundle_retry_duration,
                 )
             }
         });
@@ -908,6 +911,7 @@ impl BundleStage {
         // Bundles can't mention any accounts related to consensus
         let mut consensus_accounts_cache: HashSet<Pubkey> = HashSet::new();
         let mut last_consensus_update = Epoch::default();
+        let mut last_tip_update_slot = Slot::default();

         let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();

@@ -929,13 +933,14 @@ impl BundleStage {
             match (working_bank_start, would_be_leader_soon) {
                 // leader now, insert new read bundles + as many as can read then return bank
                 (Some(bank_start), _) => {
-                    unprocessed_bundles.extend(bundles);
-                    unprocessed_bundles.extend(bundle_receiver.try_iter().flatten());
                     Self::maybe_update_consensus_cache(
                         &bank_start.working_bank,
                         &mut consensus_accounts_cache,
                         &mut last_consensus_update,
                     );
+
+                    unprocessed_bundles.extend(bundles);
+                    unprocessed_bundles.extend(bundle_receiver.try_iter().flatten());
                     match Self::execute_bundles_until_empty_or_end_of_slot(
                         &mut bundle_account_locker,
                         &mut unprocessed_bundles,
@@ -950,6 +955,7 @@ impl BundleStage {
                         &tip_manager,
                         &max_bundle_retry_duration,
                         &mut execute_and_commit_timings,
+                        &mut last_tip_update_slot,
                     ) {
                         Ok(_) => {
                             // keep going
@@ -975,8 +981,8 @@
                             unprocessed_bundles.iter().map(|p| p.uuid).collect();
                         unprocessed_bundles.clear();

-                        info!("dropping new bundles: {:?}", new_dropped_bundles);
-                        info!("dropping old bundles: {:?}", old_dropped_bundles);
+                        debug!("dropping new bundles: {:?}", new_dropped_bundles);
+                        debug!("dropping old bundles: {:?}", old_dropped_bundles);
                     }
                 }
             }
@@ -1048,7 +1054,7 @@ impl BundleStage {
         recorder: &TransactionRecorder,
         bank_slot: Slot,
         mixins_txs: Vec<(Hash, Vec<VersionedTransaction>)>,
-    ) -> BundleExecutionResult<Option<usize>> {
+    ) -> BundleStageResult<Option<usize>> {
         match recorder.record(bank_slot, mixins_txs) {
             Ok(maybe_tx_index) => Ok(maybe_tx_index),
             Err(PohRecorderError::MaxHeightReached) => Err(BundleExecutionError::PohMaxHeightError),
diff --git a/core/src/proxy/auth.rs b/core/src/proxy/auth.rs
index 23a7f76149..f8d1c196d4 100644
--- a/core/src/proxy/auth.rs
+++ b/core/src/proxy/auth.rs
@@ -71,7 +71,11 @@ pub(crate) mod token_manager {
                 }
             }
             Err(e) => {
-                error!("error connecting to auth service: {}", e);
+                error!(
+                    "error connecting to auth service url: {} error: {}",
+                    auth_service_endpoint.uri(),
+                    e
+                );
                 sleep(retry_interval).await;
             }
         }
diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs
index e098ff2c9c..860c7417af 100644
--- a/core/src/proxy/block_engine_stage.rs
+++ b/core/src/proxy/block_engine_stage.rs
@@ -253,7 +253,6 @@ impl BlockEngineStage {
         bundle_sender: &Sender<Vec<PacketBundle>>,
     ) -> crate::proxy::Result<()> {
         let bundles_response = maybe_bundles_response?.ok_or(ProxyError::GrpcStreamDisconnected)?;
-        info!("received block-engine bundles");
         let bundles: Vec<PacketBundle> = bundles_response
             .bundles
             .into_iter()
@@ -283,7 +282,6 @@ impl BlockEngineStage {
         trust_packets: bool,
     ) -> crate::proxy::Result<()> {
         if let Some(batch) = resp.batch {
-            info!("received block-engine packets");
             let packet_batch = PacketBatch::new(
                 batch
                     .packets
diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs
index 96aa116cef..370049a5c6 100644
--- a/core/src/proxy/relayer_stage.rs
+++ b/core/src/proxy/relayer_stage.rs
@@ -306,8 +306,6 @@ impl RelayerStage {
                 warn!("Received an empty subscribe_packets msg.");
             }
             Some(relayer::subscribe_packets_response::Msg::Batch(proto_batch)) => {
-                info!("packet batch received");
-
                 let packet_batch = PacketBatch::new(
                     proto_batch
                         .packets
@@ -327,8 +325,6 @@ impl RelayerStage {
                 }
             }
             Some(relayer::subscribe_packets_response::Msg::Heartbeat(_)) => {
-                debug!("heartbeat received");
-
                 *last_heartbeat_ts = Instant::now();
                 heartbeat_tx
                     .send(heartbeat_event)
diff --git a/core/src/tip_manager.rs b/core/src/tip_manager.rs
index b7eee21750..9477abd0de 100644
--- a/core/src/tip_manager.rs
+++ b/core/src/tip_manager.rs
@@ -181,7 +181,7 @@ impl TipManager {
     }

     /// Given a bank, returns the current `tip_receiver` configured with the tip-payment program.
-    pub fn get_configured_tip_receiver(&self, bank: &Arc<Bank>) -> Result<Pubkey> {
+    pub fn get_configured_tip_receiver(&self, bank: &Bank) -> Result<Pubkey> {
         Ok(self.get_tip_payment_config_account(bank)?.tip_receiver)
     }

@@ -198,7 +198,7 @@ impl TipManager {
         ])
     }

-    pub fn get_tip_payment_config_account(&self, bank: &Arc<Bank>) -> Result<Config> {
+    pub fn get_tip_payment_config_account(&self, bank: &Bank) -> Result<Config> {
         let config_data = bank
             .get_account(&self.tip_payment_program_info.config_pda_bump.0)
             .ok_or(TipPaymentError::AccountMissing(
@@ -269,7 +269,7 @@ impl TipManager {
     }

     /// Returns whether or not the tip-payment program should be initialized.
-    pub fn should_initialize_tip_payment_program(&self, bank: &Arc<Bank>) -> bool {
+    pub fn should_initialize_tip_payment_program(&self, bank: &Bank) -> bool {
         match bank.get_account(&self.tip_payment_config_pubkey()) {
             None => true,
             Some(account) => account.owner() != &self.tip_payment_program_info.program_id,
@@ -277,7 +277,7 @@ impl TipManager {
     }

     /// Returns whether or not the tip-distribution program's [Config] PDA should be initialized.
-    pub fn should_initialize_tip_distribution_config(&self, bank: &Arc<Bank>) -> bool {
+    pub fn should_initialize_tip_distribution_config(&self, bank: &Bank) -> bool {
         match bank.get_account(&self.tip_distribution_config_pubkey()) {
             None => true,
             Some(account) => account.owner() != &self.tip_distribution_program_info.program_id,
@@ -285,7 +285,7 @@ impl TipManager {
     }

     /// Returns whether or not the current [TipDistributionAccount] PDA should be initialized for this epoch.
-    pub fn should_init_tip_distribution_account(&self, bank: &Arc<Bank>) -> bool {
+    pub fn should_init_tip_distribution_account(&self, bank: &Bank) -> bool {
         let pda = derive_tip_distribution_account_address(
             &self.tip_distribution_program_info.program_id,
             &self.tip_distribution_account_config.vote_account,
@@ -375,7 +375,7 @@ impl TipManager {
     pub fn change_tip_receiver_tx(
         &self,
         new_tip_receiver: &Pubkey,
-        bank: &Arc<Bank>,
+        bank: &Bank,
         keypair: &Keypair,
     ) -> Result<SanitizedTransaction> {
         let config = self.get_tip_payment_config_account(bank)?;
diff --git a/dev/Dockerfile b/dev/Dockerfile
index 452a723403..ffb6a85f0f 100644
--- a/dev/Dockerfile
+++ b/dev/Dockerfile
@@ -26,12 +26,16 @@ RUN set -x \
  && rm -f $PROTOC_ZIP

-# Uses docker buildkit to cache the image.
-# /usr/local/cargo/git needed for crossbeam patch
 WORKDIR /solana
 COPY . .
 RUN mkdir -p docker-output
+
+ARG ci_commit
+ENV CI_COMMIT=$ci_commit
+
+# Uses docker buildkit to cache the image.
+# /usr/local/cargo/git needed for crossbeam patch
 RUN --mount=type=cache,mode=0777,target=/solana/target \
     --mount=type=cache,mode=0777,target=/usr/local/cargo/registry \
     --mount=type=cache,mode=0777,target=/usr/local/cargo/git \
-    cargo build --verbose --release && cp target/release/solana* ./docker-output
+    cargo build --release && cp target/release/solana* ./docker-output
diff --git a/f b/f
index 62262d33af..9f276a138f 100755
--- a/f
+++ b/f
@@ -5,7 +5,12 @@ set -eux

 SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"

+GIT_SHA="$(git describe --always --dirty)"
+
+echo $GIT_SHA
+
 DOCKER_BUILDKIT=1 docker build \
+  --build-arg ci_commit=$GIT_SHA \
   -t jitolabs/build-solana \
   -f dev/Dockerfile . \
   --progress=plain
diff --git a/jito-programs b/jito-programs
index 8446fdda50..2cb044acb4 160000
--- a/jito-programs
+++ b/jito-programs
@@ -1 +1 @@
-Subproject commit 8446fdda50e0dde88f43a07e423c1da25b25a24f
+Subproject commit 2cb044acb45c1f787b2cd77aedbdc8e619408f1c
diff --git a/s b/s
index 643be5732d..bb2c216805 100755
--- a/s
+++ b/s
@@ -12,5 +12,5 @@ fi
 echo "Syncing to host: $HOST"

 # sync + build
-rsync -avh --delete --exclude .git --exclude target "$SCRIPT_DIR" "$HOST":~/
+rsync -avh --delete --exclude target "$SCRIPT_DIR" "$HOST":~/
 #ssh $HOST "source ~/.profile && cd mev-solana-priv && cargo b --release"
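
For readers skimming the diff above: the sketch below is a minimal, self-contained Rust
illustration of the locking scheme this patch converges on, not the validator's actual API.
Locker, Guard, and the byte-array Pubkey are hypothetical stand-ins for BundleAccountLocker,
LockedBundle, and solana_sdk::pubkey::Pubkey; reference counts model how overlapping bundles
can hold the same account, so releasing one hold must not release the other.

use std::{
    collections::{hash_map::Entry, HashMap, HashSet},
    sync::{Arc, Mutex},
};

// Stand-in for solana_sdk::pubkey::Pubkey.
type Pubkey = [u8; 32];

// Account -> number of outstanding holds, shared behind one Arc<Mutex<..>>
// the way the patch folds read/write maps into a single BundleAccountLocks.
#[derive(Default)]
struct Locks {
    write_locks: HashMap<Pubkey, u64>,
}

#[derive(Clone, Default)]
struct Locker {
    locks: Arc<Mutex<Locks>>,
}

impl Locker {
    // Take a hold on every account; batch-building code treats held accounts
    // as off-limits until the returned guard drops.
    fn lock(&self, accounts: &[Pubkey]) -> Guard {
        let mut l = self.locks.lock().unwrap();
        for acc in accounts {
            *l.write_locks.entry(*acc).or_insert(0) += 1;
        }
        Guard {
            locker: self.clone(),
            accounts: accounts.to_vec(),
        }
    }

    // What BankingStage would consult when preparing a TransactionBatch.
    fn held(&self) -> HashSet<Pubkey> {
        self.locks.lock().unwrap().write_locks.keys().copied().collect()
    }

    fn unlock(&self, accounts: &[Pubkey]) {
        let mut l = self.locks.lock().unwrap();
        for acc in accounts {
            if let Entry::Occupied(mut e) = l.write_locks.entry(*acc) {
                let val = e.get_mut();
                *val = val.saturating_sub(1);
                if e.get() == &0 {
                    e.remove(); // count reached zero: account released
                }
            }
        }
    }
}

// RAII guard: dropping it releases the holds, like LockedBundle::drop.
struct Guard {
    locker: Locker,
    accounts: Vec<Pubkey>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.locker.unlock(&self.accounts);
    }
}

fn main() {
    let locker = Locker::default();
    let a: Pubkey = [1u8; 32];

    let g0 = locker.lock(&[a]);
    let g1 = locker.lock(&[a]); // overlapping bundle: count goes to 2
    drop(g0);
    assert!(locker.held().contains(&a)); // still held by g1
    drop(g1);
    assert!(locker.held().is_empty()); // last hold dropped -> released
}

The RAII guard is why the patch can drop the TransactionBatch between execute and commit:
the guard outlives the batch, so BankingStage never sees the accounts as free mid-bundle.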
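
And a minimal sketch, under the same caveat, of the tip-handling order that
execute_bundles_until_empty_or_end_of_slot now enforces at most once per slot.
Everything here (TipConfig, ensure_tips_flow_to_me, the u64 stand-in types) is
hypothetical, and the bank/QoS/PoH plumbing is elided to comments.

// Hypothetical stand-ins; the real code works with TipManager, SanitizedBundle, Bank, etc.
type Slot = u64;
type Pubkey = u64;

struct TipConfig {
    tip_receiver: Pubkey,
}

/// Order of operations the patch enforces when a bundle touches tip accounts:
/// 1) run the init transactions (config accounts may not exist yet on a local
///    cluster) as their own account-locked bundle;
/// 2) read the configured tip receiver (only valid after step 1);
/// 3) if it is not this leader's TipDistributionAccount, execute a
///    change-tip-receiver bundle, draining accrued tips to the previous receiver;
/// 4) remember the slot so later bundles in it skip all of this.
fn ensure_tips_flow_to_me(
    slot: Slot,
    last_tip_update_slot: &mut Slot,
    touches_tip_accounts: bool,
    config: &mut TipConfig,
    my_tip_distribution_pda: Pubkey,
) {
    if !touches_tip_accounts || slot == *last_tip_update_slot {
        return; // nothing to do, or already handled this slot
    }
    // (1) initialize tip-related accounts ... elided
    // (2) + (3)
    if config.tip_receiver != my_tip_distribution_pda {
        config.tip_receiver = my_tip_distribution_pda; // models change_tip_receiver_tx
    }
    // (4)
    *last_tip_update_slot = slot;
}

fn main() {
    let (mut last, mut config) = (Slot::default(), TipConfig { tip_receiver: 7 });
    ensure_tips_flow_to_me(42, &mut last, true, &mut config, 99);
    assert_eq!((config.tip_receiver, last), (99, 42));
    // a second tip-touching bundle in slot 42 returns early
    ensure_tips_flow_to_me(42, &mut last, true, &mut config, 99);
}

The last_tip_update_slot guard mirrors the new `&mut Slot` parameter threaded through the
patch: it keeps the (relatively expensive) init and receiver-change bundles from re-running
for every tip-touching bundle within the same slot.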