Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Avoid restarting services on leader -> validator transition #1599

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use solana::transaction::Transaction;
use solana_program_interface::pubkey::Pubkey;
use std::iter;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use test::Bencher;

Expand Down Expand Up @@ -49,7 +49,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let mint = Mint::new(mint_total);

let (verified_sender, verified_receiver) = channel();
let bank = Arc::new(Bank::new(&mint));
let bank = Arc::new(RwLock::new(Bank::new(&mint)));
let dummy = Transaction::system_move(
&mint.keypair(),
mint.keypair().pubkey(),
Expand Down Expand Up @@ -78,20 +78,20 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
mint.last_id(),
0,
);
assert!(bank.process_transaction(&fund).is_ok());
assert!(bank.read().unwrap().process_transaction(&fund).is_ok());
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
let res = bank.read().unwrap().process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
bank.read().unwrap().clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
let res = bank.read().unwrap().process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
bank.read().unwrap().clear_signatures();
let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 192)
.into_iter()
.map(|x| {
Expand All @@ -110,19 +110,25 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let mut id = mint.last_id();
for _ in 0..MAX_ENTRY_IDS {
id = hash(&id.as_ref());
bank.register_entry_id(&id);
bank.read().unwrap().register_entry_id(&id);
}

bencher.iter(move || {
// make sure the tx last id is still registered
if bank.count_valid_ids(&[mint.last_id()]).len() == 0 {
bank.register_entry_id(&mint.last_id());
if bank
.read()
.unwrap()
.count_valid_ids(&[mint.last_id()])
.len()
== 0
{
bank.read().unwrap().register_entry_id(&mint.last_id());
}
for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes);
bank.clear_signatures();
bank.read().unwrap().clear_signatures();
});
}

Expand All @@ -134,7 +140,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let mint = Mint::new(mint_total);

let (verified_sender, verified_receiver) = channel();
let bank = Arc::new(Bank::new(&mint));
let bank = Arc::new(RwLock::new(Bank::new(&mint)));
let dummy = Transaction::system_move(
&mint.keypair(),
mint.keypair().pubkey(),
Expand Down Expand Up @@ -179,20 +185,20 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
mint.last_id(),
0,
);
assert!(bank.process_transaction(&fund).is_ok());
assert!(bank.read().unwrap().process_transaction(&fund).is_ok());
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
let res = bank.read().unwrap().process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
bank.read().unwrap().clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
let res = bank.read().unwrap().process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
bank.read().unwrap().clear_signatures();
let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 96)
.into_iter()
.map(|x| {
Expand All @@ -211,18 +217,24 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let mut id = mint.last_id();
for _ in 0..MAX_ENTRY_IDS {
id = hash(&id.as_ref());
bank.register_entry_id(&id);
bank.read().unwrap().register_entry_id(&id);
}

bencher.iter(move || {
// make sure the transactions are still valid
if bank.count_valid_ids(&[mint.last_id()]).len() == 0 {
bank.register_entry_id(&mint.last_id());
if bank
.read()
.unwrap()
.count_valid_ids(&[mint.last_id()])
.len()
== 0
{
bank.read().unwrap().register_entry_id(&mint.last_id());
}
for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes);
bank.clear_signatures();
bank.read().unwrap().clear_signatures();
});
}
8 changes: 5 additions & 3 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,11 @@ impl Bank {
fn unlock_account(tx: &Transaction, result: &Result<()>, account_locks: &mut HashSet<Pubkey>) {
match result {
Err(BankError::AccountInUse) => (),
_ => for k in &tx.account_keys {
account_locks.remove(k);
},
_ => {
for k in &tx.account_keys {
account_locks.remove(k);
}
}
}
}

Expand Down
59 changes: 33 additions & 26 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sigverify_stage::VerifiedPackets;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
Expand Down Expand Up @@ -58,7 +58,7 @@ impl Default for Config {
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
pub fn new(
bank: &Arc<Bank>,
bank: &Arc<RwLock<Bank>>,
verified_receiver: Receiver<VerifiedPackets>,
config: Config,
last_entry_id: &Hash,
Expand Down Expand Up @@ -210,7 +210,7 @@ impl BankingStage {
}

fn process_transactions(
bank: &Arc<Bank>,
bank: &Arc<RwLock<Bank>>,
transactions: &[Transaction],
poh: &PohRecorder,
) -> Result<()> {
Expand All @@ -219,7 +219,9 @@ impl BankingStage {
while chunk_start != transactions.len() {
let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]);

bank.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh)?;
bank.read()
.unwrap()
.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh)?;

chunk_start = chunk_end;
}
Expand All @@ -230,7 +232,7 @@ impl BankingStage {
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
/// Discard packets via `packet_recycler`.
pub fn process_packets(
bank: &Arc<Bank>,
bank: &Arc<RwLock<Bank>>,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh: &PohRecorder,
) -> Result<()> {
Expand Down Expand Up @@ -262,11 +264,13 @@ impl BankingStage {
.zip(vers)
.filter_map(|(tx, ver)| match tx {
None => None,
Some((tx, _addr)) => if tx.verify_refs() && ver != 0 {
Some(tx)
} else {
None
},
Some((tx, _addr)) => {
if tx.verify_refs() && ver != 0 {
Some(tx)
} else {
None
}
}
}).collect();
debug!("verified transactions {}", transactions.len());
Self::process_transactions(bank, &transactions, poh)?;
Expand Down Expand Up @@ -330,13 +334,13 @@ mod tests {

#[test]
fn test_banking_stage_shutdown1() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
let bank = Arc::new(RwLock::new(Bank::new(&Mint::new(2))));
let (verified_sender, verified_receiver) = channel();
let (banking_stage, _entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Default::default(),
&bank.last_id(),
&bank.read().unwrap().last_id(),
0,
None,
);
Expand All @@ -349,13 +353,13 @@ mod tests {

#[test]
fn test_banking_stage_shutdown2() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
let bank = Arc::new(RwLock::new(Bank::new(&Mint::new(2))));
let (_verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Default::default(),
&bank.last_id(),
&bank.read().unwrap().last_id(),
0,
None,
);
Expand All @@ -368,14 +372,14 @@ mod tests {

#[test]
fn test_banking_stage_tick() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
let start_hash = bank.last_id();
let bank = Arc::new(RwLock::new(Bank::new(&Mint::new(2))));
let start_hash = bank.read().unwrap().last_id();
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Config::Sleep(Duration::from_millis(1)),
&bank.last_id(),
&bank.read().unwrap().last_id(),
0,
None,
);
Expand All @@ -385,7 +389,10 @@ mod tests {
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
assert!(entries.len() != 0);
assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].id, bank.last_id());
assert_eq!(
entries[entries.len() - 1].id,
bank.read().unwrap().last_id()
);
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::ChannelDisconnected)
Expand All @@ -395,14 +402,14 @@ mod tests {
#[test]
fn test_banking_stage_entries_only() {
let mint = Mint::new(2);
let bank = Arc::new(Bank::new(&mint));
let start_hash = bank.last_id();
let bank = Arc::new(RwLock::new(Bank::new(&mint)));
let start_hash = bank.read().unwrap().last_id();
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Default::default(),
&bank.last_id(),
&bank.read().unwrap().last_id(),
0,
None,
);
Expand All @@ -423,7 +430,7 @@ mod tests {

// glad they all fit
assert_eq!(packets.len(), 1);
verified_sender // tx, no_ver, anf
verified_sender // tx, no_ver, anf
.send(vec![(packets[0].clone(), vec![1u8, 0u8, 1u8])])
.unwrap();

Expand Down Expand Up @@ -451,13 +458,13 @@ mod tests {
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let bank = Arc::new(Bank::new(&mint));
let bank = Arc::new(RwLock::new(Bank::new(&mint)));
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Default::default(),
&bank.last_id(),
&bank.read().unwrap().last_id(),
0,
None,
);
Expand Down Expand Up @@ -504,14 +511,14 @@ mod tests {
// with reason BankingStageReturnType::LeaderRotation
#[test]
fn test_max_tick_height_shutdown() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
let bank = Arc::new(RwLock::new(Bank::new(&Mint::new(2))));
let (_verified_sender_, verified_receiver) = channel();
let max_tick_height = 10;
let (banking_stage, _entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Default::default(),
&bank.last_id(),
&bank.read().unwrap().last_id(),
0,
Some(max_tick_height),
);
Expand Down
Loading