rebase builds
Implement a thread-safe (MT-safe) bank and batch programs into single atomic transactions.

fix entry processing

fmt

once!

cleanup and remove clone

recv PoH until response arrives

handle RecvErrors

fix benches

get rid of answer, use 10 threads because math is simpler

coalesce the extra thread switch

optimizing

cleanup

register faster

cleanup

fixed test

sdk serialize

fixed benches
aeyakovenko committed Sep 25, 2018
1 parent 680072e commit 19a1a9e
Showing 11 changed files with 1,028 additions and 762 deletions.
2 changes: 1 addition & 1 deletion benches/bank.rs
@@ -42,7 +42,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.
bank.clear_signatures();
let results = bank.process_transactions(transactions.clone());
let results = bank.process_transactions(&transactions);
assert!(results.iter().all(Result::is_ok));
})
}
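Editorial note: the headline change in this hunk is that Bank::process_transactions now borrows the transaction batch as a slice instead of consuming a cloned Vec. A minimal sketch of the new call shape, using only constructors and calls that appear elsewhere in this diff; the amounts and counts are invented for illustration and are not part of the commit:

```rust
// Editorial sketch, not part of the commit. Assumes only APIs visible in
// this diff: Mint, Bank, Keypair/KeypairUtil, Transaction::new,
// Bank::process_transactions, and per-transaction Result checking.
extern crate solana;

use solana::bank::Bank;
use solana::mint::Mint;
use solana::signature::{Keypair, KeypairUtil};
use solana::transaction::Transaction;

fn main() {
    let mint = Mint::new(1_000);
    let bank = Bank::new(&mint);
    let to = Keypair::new().pubkey();

    // Distinct amounts keep the signatures unique for this illustration.
    let transactions: Vec<_> = (0..4)
        .map(|i| Transaction::new(&mint.keypair(), to, 1 + i as i64, mint.last_id()))
        .collect();

    // The bank now borrows the batch; no clone of the Vec is required.
    let results = bank.process_transactions(&transactions);
    assert!(results.iter().all(Result::is_ok));
}
```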
298 changes: 121 additions & 177 deletions benches/banking_stage.rs
@@ -1,86 +1,25 @@
#![feature(test)]
extern crate bincode;
extern crate rand;
extern crate rayon;
extern crate solana;
extern crate test;

use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana::bank::Bank;
use solana::banking_stage::BankingStage;
use solana::banking_stage::{BankingStage, NUM_THREADS};
use solana::entry::Entry;
use solana::mint::Mint;
use solana::packet::{to_packets_chunked, PacketRecycler};
use solana::poh_service::PohService;
use solana::signature::{Keypair, KeypairUtil};
use solana::signature::{KeypairUtil, Pubkey, Signature};
use solana::transaction::Transaction;
use std::iter;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::time::Duration;
use test::Bencher;

// use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use hash::hash;
// use mint::Mint;
// use rayon::prelude::*;
// use signature::{Keypair, KeypairUtil};
// use std::collections::HashSet;
// use std::time::Instant;
// use transaction::Transaction;
//
// fn bench_process_transactions(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
// let transactions: Vec<_> = (0..txs)
// .into_par_iter()
// .map(|i| {
// // Seed the 'to' account and a cell for its signature.
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
// {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = Keypair::new();
// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
// bank.process_transaction(&tx).unwrap();
//
// let rando1 = Keypair::new();
// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
// bank.process_transaction(&tx).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
// })
// .collect();
//
// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(banking_stage.process_transactions(transactions).is_ok());
// let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec;
//
// // Ensure that all transactions were successfully logged.
// drop(banking_stage.historian_input);
// let entries: Vec<Entry> = banking_stage.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].transactions.len(), txs as usize);
//
// println!("{} tps", tps);
// }

fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
let mut total = 0;
loop {
@@ -101,132 +40,137 @@ fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {

#[bench]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let tx = 10_000_usize;
let txes = 1000 * NUM_THREADS;
let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total);
let num_dst_accounts = 8 * 1024;
let num_src_accounts = 8 * 1024;

let srckeys: Vec<_> = (0..num_src_accounts).map(|_| Keypair::new()).collect();
let dstkeys: Vec<_> = (0..num_dst_accounts)
.map(|_| Keypair::new().pubkey())
.collect();

let transactions: Vec<_> = (0..tx)
.map(|i| {
Transaction::new(
&srckeys[i % num_src_accounts],
dstkeys[i % num_dst_accounts],
i as i64,
mint.last_id(),
)
}).collect();

let (verified_sender, verified_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let packet_recycler = PacketRecycler::default();

let setup_transactions: Vec<_> = (0..num_src_accounts)
.map(|i| {
Transaction::new(
&mint.keypair(),
srckeys[i].pubkey(),
mint_total / num_src_accounts as i64,
mint.last_id(),
)
let bank = Arc::new(Bank::new(&mint));
let dummy = Transaction::new(&mint.keypair(), mint.keypair().pubkey(), 1, mint.last_id());
let transactions: Vec<_> = (0..txes)
.into_par_iter()
.map(|_| {
let mut new = dummy.clone();
let from: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let to: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
new.keys[0] = Pubkey::new(&from[0..32]);
new.keys[1] = Pubkey::new(&to[0..32]);
new.signature = Signature::new(&sig[0..64]);
new
}).collect();

transactions.iter().for_each(|tx| {
let fund = Transaction::new(
&mint.keypair(),
tx.keys[0],
mint_total / txes as i64,
mint.last_id(),
);
assert!(bank.process_transaction(&fund).is_ok());
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = x.read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
let (_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver, None);
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));

let (hash_sender, hash_receiver) = channel();
let (_poh_service, poh_receiver) = PohService::new(bank.last_id(), hash_receiver, None);

let verified_setup: Vec<_> =
to_packets_chunked(&packet_recycler, &setup_transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (x).read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();

verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
).unwrap();

check_txs(&entry_receiver, num_src_accounts);

let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = (x).read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();

verified_sender.send(verified).unwrap();
BankingStage::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
).unwrap();

check_txs(&entry_receiver, tx);
for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes);
bank.clear_signatures();
bank.register_entry_id(&mint.last_id());
});
}
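Editorial note: the bench above feeds verified packet batches to a BankingStage and waits for the resulting entries; the send loop splits the batches into roughly NUM_THREADS chunks so each banking thread has work. The body of check_txs is collapsed in the hunk further up, so here is a plausible sketch, inferred only from how the bench calls it, of what a helper with that signature does: drain the entry receiver and keep a running transaction count until the expected total has been seen. This is an assumption, not the commit's actual code.

```rust
// Assumed behavior only; the real check_txs body is hidden by the collapsed hunk.
use solana::entry::Entry;
use std::sync::mpsc::Receiver;
use std::time::Duration;

fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
    let mut total = 0;
    loop {
        // Block briefly for the next batch of entries from the banking stage.
        let entries = receiver
            .recv_timeout(Duration::new(1, 0))
            .expect("expected entries from the banking stage");
        total += entries
            .iter()
            .map(|entry| entry.transactions.len())
            .sum::<usize>();
        if total >= ref_tx_count {
            break;
        }
    }
    assert_eq!(total, ref_tx_count);
}
```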

#[bench]
fn bench_banking_stage_single_from(bencher: &mut Bencher) {
let tx = 10_000_usize;
let mint = Mint::new(1_000_000_000_000);
let mut pubkeys = Vec::new();
let num_keys = 8;
for _ in 0..num_keys {
pubkeys.push(Keypair::new().pubkey());
}

let transactions: Vec<_> = (0..tx)
.into_par_iter()
.map(|i| {
Transaction::new(
&mint.keypair(),
pubkeys[i % num_keys],
i as i64,
mint.last_id(),
)
}).collect();
fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
//use solana::logger;
//logger::setup();
let progs = 5;
let txes = 1000 * NUM_THREADS;
let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total);

let (verified_sender, verified_receiver) = channel();
let (entry_sender, entry_receiver) = channel();
let packet_recycler = PacketRecycler::default();

let bank = Arc::new(Bank::new(&mint));
let dummy = Transaction::new(&mint.keypair(), mint.keypair().pubkey(), 1, mint.last_id());
let transactions: Vec<_> = (0..txes)
.into_par_iter()
.map(|_| {
let mut new = dummy.clone();
let from: Vec<u8> = (0..32).map(|_| thread_rng().gen()).collect();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
let to: Vec<u8> = (0..32).map(|_| thread_rng().gen()).collect();
new.keys[0] = Pubkey::new(&from[0..32]);
new.keys[1] = Pubkey::new(&to[0..32]);
let prog = new.programs[0].clone();
for i in 1..progs {
//generate programs that spend to random keys
let to: Vec<u8> = (0..32).map(|_| thread_rng().gen()).collect();
let to_key = Pubkey::new(&to[0..32]);
new.keys.push(to_key);
assert_eq!(new.keys.len(), i + 2);
new.programs.push(prog.clone());
assert_eq!(new.programs.len(), i + 1);
new.programs[i].accounts[1] = 1 + i as u8;
assert_eq!(new.keys[new.programs[i].accounts[1] as usize], to_key);
}
assert_eq!(new.programs.len(), progs);
new.signature = Signature::new(&sig[0..64]);
new
}).collect();
transactions.iter().for_each(|tx| {
let fund = Transaction::new(
&mint.keypair(),
tx.keys[0],
mint_total / txes as i64,
mint.last_id(),
);
assert!(bank.process_transaction(&fund).is_ok());
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 96)
.into_iter()
.map(|x| {
let len = x.read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
let (_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver, None);
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));

let (hash_sender, hash_receiver) = channel();
let (_poh_service, poh_receiver) = PohService::new(bank.last_id(), hash_receiver, None);

let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (x).read().packets.len();
(x, iter::repeat(1).take(len).collect())
}).collect();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
&bank,
&hash_sender,
&poh_receiver,
&verified_receiver,
&entry_sender,
).unwrap();

check_txs(&entry_receiver, tx);
for v in verified.chunks(verified.len() / NUM_THREADS) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes);
bank.clear_signatures();
// make sure the transactions are still valid
bank.register_entry_id(&mint.last_id());
});
}
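Editorial note: the loop in the bench above is what the commit message means by batching programs into a single atomic transaction. Each extra program call is appended to programs, its destination key is appended to keys, and programs[i].accounts[1] stores the index of that key in the shared key list. A simplified, hypothetical layout that reproduces the same indexing the bench asserts; these are plain local structs for illustration, not the real solana Transaction type:

```rust
// Hypothetical structs for illustration only; the field names mirror the
// bench (keys, programs, accounts) but this is not the actual Transaction.
struct ProgramCall {
    accounts: Vec<u8>, // indices into `keys`; accounts[1] is the destination
}

struct BatchedTx {
    keys: Vec<[u8; 32]>, // keys[0] = source, keys[1..] = destinations
    programs: Vec<ProgramCall>,
}

fn main() {
    let progs: usize = 5;
    let mut tx = BatchedTx {
        keys: vec![[0u8; 32], [1u8; 32]],
        programs: vec![ProgramCall { accounts: vec![0, 1] }],
    };
    // Mirror the bench: program i spends from keys[0] to a freshly pushed key.
    for i in 1..progs {
        tx.keys.push([(i + 1) as u8; 32]);
        tx.programs.push(ProgramCall {
            accounts: vec![0, (1 + i) as u8],
        });
        assert_eq!(tx.keys.len(), i + 2);
        assert_eq!(tx.programs[i].accounts[1] as usize, tx.keys.len() - 1);
    }
    assert_eq!(tx.programs.len(), progs);
    println!("one transaction batching {} program calls", tx.programs.len());
}
```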