Skip to content

Commit

Permalink
Use BundleAccountLocker when handling tip txs (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu committed Sep 22, 2022
1 parent b8d4e35 commit ea359c2
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 192 deletions.
2 changes: 1 addition & 1 deletion anchor
Submodule anchor updated from d1ca27 to 753264
12 changes: 6 additions & 6 deletions bootstrap
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ RUST_LOG=INFO \
NDEBUG=1 ./multinode-demo/bootstrap-validator.sh \
--wait-for-supermajority 0 \
--expected-bank-hash $bank_hash \
--block-engine-address http://0.0.0.0:13333 \
--block-engine-auth-service-address http://0.0.0.0:14444 \
--relayer-auth-service-address http://0.0.0.0:11226 \
--relayer-address http://0.0.0.0:11226 \
--block-engine-address http://127.0.0.1:1003 \
--block-engine-auth-service-address http://127.0.0.1:1005 \
--relayer-auth-service-address http://127.0.0.1:11226 \
--relayer-address http://127.0.0.1:11226 \
--rpc-pubsub-enable-block-subscription \
--enable-rpc-transaction-history \
--tip-payment-program-pubkey DThZmRNNXh7kvTQW9hXeGoWGPKktK8pgVAyoTLjH7UrT \
--tip-distribution-program-pubkey FjrdANjvo76aCYQ4kf9FM1R8aESUcEE6F8V7qyoVUQcM \
--commission-bps 0 \
--shred-receiver-address 0.0.0.0:13330 \
--shred-receiver-address 127.0.0.1:1002 \
--allow-private-addr \
--trust-relayer-packets \
--trust-block-engine-packets
--trust-block-engine-packets
22 changes: 11 additions & 11 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1530,19 +1530,19 @@ impl BankingStage {
// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state.
// BundleStage prevents locking ALL accounts in ALL transactions in a bundle mid-execution
// to ensure that avoid race conditions
let mut lock_time = Measure::start("lock_time");

let read_locks = bundle_account_locker.read_locks();
let write_locks = bundle_account_locker.write_locks();

let batch = bank.prepare_sanitized_batch_with_results(
txs,
transactions_qos_results.iter(),
&read_locks,
&write_locks,
);
let batch = {
// BundleStage locks ALL accounts in ALL transactions in a bundle to avoid race
// conditions with BankingStage
let account_locks = bundle_account_locker.account_locks();
bank.prepare_sanitized_batch_with_results(
txs,
transactions_qos_results.iter(),
&account_locks.read_locks(),
&account_locks.write_locks(),
)
};
lock_time.stop();

// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
Expand Down
74 changes: 46 additions & 28 deletions core/src/bundle_account_locker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
/// and commit the results before the bundle completes. By the time the bundle commits the new account
/// state for {A, B, C}, A and B would be incorrect and the entries containing the bundle would be
/// replayed improperly and that leader would have produced an invalid block.
use std::sync::{Arc, Mutex};
use {
solana_runtime::bank::Bank,
solana_sdk::{
bundle::sanitized::SanitizedBundle, pubkey::Pubkey, transaction::TransactionAccountLocks,
},
std::collections::{hash_map::Entry, HashMap, HashSet},
std::sync::{Arc, Mutex, MutexGuard},
};

#[derive(Debug)]
Expand All @@ -27,16 +28,19 @@ pub type BundleAccountLockerResult<T> = Result<T, BundleAccountLockerError>;
pub struct LockedBundle<'a, 'b> {
bundle_account_locker: &'a BundleAccountLocker,
sanitized_bundle: &'b SanitizedBundle,
bank: Arc<Bank>,
}

impl<'a, 'b> LockedBundle<'a, 'b> {
pub fn new(
bundle_account_locker: &'a BundleAccountLocker,
sanitized_bundle: &'b SanitizedBundle,
bank: &Arc<Bank>,
) -> Self {
Self {
bundle_account_locker,
sanitized_bundle,
bank: bank.clone(),
}
}

Expand All @@ -50,114 +54,128 @@ impl<'a, 'b> Drop for LockedBundle<'a, 'b> {
fn drop(&mut self) {
let _ = self
.bundle_account_locker
.unlock_bundle_accounts(self.sanitized_bundle);
.unlock_bundle_accounts(self.sanitized_bundle, &self.bank);
}
}

#[derive(Default, Clone)]
struct BundleAccountLocks {
read_locks: Arc<Mutex<HashMap<Pubkey, u64>>>,
write_locks: Arc<Mutex<HashMap<Pubkey, u64>>>,
pub struct BundleAccountLocks {
read_locks: HashMap<Pubkey, u64>,
write_locks: HashMap<Pubkey, u64>,
}

impl BundleAccountLocks {
pub fn read_locks(&self) -> HashSet<Pubkey> {
self.read_locks.lock().unwrap().keys().cloned().collect()
self.read_locks.keys().cloned().collect()
}

pub fn write_locks(&self) -> HashSet<Pubkey> {
self.write_locks.lock().unwrap().keys().cloned().collect()
self.write_locks.keys().cloned().collect()
}

pub fn lock_accounts(
&self,
&mut self,
read_locks: HashMap<Pubkey, u64>,
write_locks: HashMap<Pubkey, u64>,
) {
let mut read_locks_l = self.read_locks.lock().unwrap();
let mut write_locks_l = self.write_locks.lock().unwrap();
for (acc, count) in read_locks {
*read_locks_l.entry(acc).or_insert(0) += count;
*self.read_locks.entry(acc).or_insert(0) += count;
}
for (acc, count) in write_locks {
*write_locks_l.entry(acc).or_insert(0) += count;
*self.write_locks.entry(acc).or_insert(0) += count;
}
}

pub fn unlock_accounts(
&self,
&mut self,
read_locks: HashMap<Pubkey, u64>,
write_locks: HashMap<Pubkey, u64>,
) {
let mut read_locks_l = self.read_locks.lock().unwrap();
let mut write_locks_l = self.write_locks.lock().unwrap();

for (acc, count) in read_locks {
if let Entry::Occupied(mut entry) = read_locks_l.entry(acc) {
if let Entry::Occupied(mut entry) = self.read_locks.entry(acc) {
let val = entry.get_mut();
*val = val.saturating_sub(count);
if entry.get() == &0 {
let _ = entry.remove();
}
} else {
warn!("error unlocking read-locked account, account: {:?}", acc);
}
}
for (acc, count) in write_locks {
if let Entry::Occupied(mut entry) = write_locks_l.entry(acc) {
if let Entry::Occupied(mut entry) = self.write_locks.entry(acc) {
let val = entry.get_mut();
*val = val.saturating_sub(count);
if entry.get() == &0 {
let _ = entry.remove();
}
} else {
warn!("error unlocking write-locked account, account: {:?}", acc);
}
}
}
}

#[derive(Clone, Default)]
pub struct BundleAccountLocker {
account_locks: BundleAccountLocks,
account_locks: Arc<Mutex<BundleAccountLocks>>,
}

impl BundleAccountLocker {
/// used in BankingStage during TransactionBatch construction to ensure that BankingStage
/// doesn't lock anything currently locked in the BundleAccountLocker
pub fn read_locks(&self) -> HashSet<Pubkey> {
self.account_locks.read_locks()
self.account_locks.lock().unwrap().read_locks()
}

/// used in BankingStage during TransactionBatch construction to ensure that BankingStage
/// doesn't lock anything currently locked in the BundleAccountLocker
pub fn write_locks(&self) -> HashSet<Pubkey> {
self.account_locks.write_locks()
self.account_locks.lock().unwrap().write_locks()
}

/// used in BankingStage during TransactionBatch construction to ensure that BankingStage
/// doesn't lock anything currently locked in the BundleAccountLocker
pub fn account_locks(&self) -> MutexGuard<BundleAccountLocks> {
self.account_locks.lock().unwrap()
}

/// Prepares a locked bundle and returns a LockedBundle containing locked accounts.
/// When a LockedBundle is dropped, the accounts are automatically unlocked
pub fn prepare_locked_bundle<'a, 'b>(
&'a self,
sanitized_bundle: &'b SanitizedBundle,
bank: &Arc<Bank>,
) -> BundleAccountLockerResult<LockedBundle<'a, 'b>> {
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle)?;
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?;

self.account_locks.lock_accounts(read_locks, write_locks);
Ok(LockedBundle::new(self, sanitized_bundle))
self.account_locks
.lock()
.unwrap()
.lock_accounts(read_locks, write_locks);
Ok(LockedBundle::new(self, sanitized_bundle, bank))
}

/// Unlocks bundle accounts. Note that LockedBundle::drop will auto-drop the bundle account locks
fn unlock_bundle_accounts(
&self,
sanitized_bundle: &SanitizedBundle,
bank: &Bank,
) -> BundleAccountLockerResult<()> {
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle)?;
let (read_locks, write_locks) = Self::get_read_write_locks(sanitized_bundle, bank)?;

self.account_locks.unlock_accounts(read_locks, write_locks);
self.account_locks
.lock()
.unwrap()
.unlock_accounts(read_locks, write_locks);
Ok(())
}

/// Returns the read and write locks for this bundle
/// Each lock type contains a HashMap which maps Pubkey to number of locks held
fn get_read_write_locks(
bundle: &SanitizedBundle,
bank: &Bank,
) -> BundleAccountLockerResult<(HashMap<Pubkey, u64>, HashMap<Pubkey, u64>)> {
let transaction_locks: Vec<TransactionAccountLocks> = bundle
.transactions
Expand Down Expand Up @@ -265,7 +283,7 @@ mod tests {
.expect("sanitize bundle 1");

let locked_bundle0 = bundle_account_locker
.prepare_locked_bundle(&sanitized_bundle0)
.prepare_locked_bundle(&sanitized_bundle0, &bank)
.unwrap();

assert_eq!(
Expand All @@ -278,7 +296,7 @@ mod tests {
);

let locked_bundle1 = bundle_account_locker
.prepare_locked_bundle(&sanitized_bundle1)
.prepare_locked_bundle(&sanitized_bundle1, &bank)
.unwrap();
assert_eq!(
bundle_account_locker.write_locks(),
Expand Down
3 changes: 1 addition & 2 deletions core/src/bundle_sanitizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ pub fn get_sanitized_bundle(
MAX_PROCESSING_AGE,
&mut metrics,
);
if let Some(failure) = check_results.iter().find(|r| r.0.is_err()) {
warn!("bundle check failure: {:?}", failure);
if check_results.iter().any(|r| r.0.is_err()) {
return Err(BundleSanitizerError::FailedCheckResults(packet_bundle.uuid));
}

Expand Down
Loading

0 comments on commit ea359c2

Please sign in to comment.