Skip to content

Commit

Permalink
replay stage feed back program cost (solana-labs#17731)
Browse files Browse the repository at this point in the history
* replay stage feeds back realtime per-program execution cost to cost model;

* program execution cost table is initialized as an empty table, and is no longer populated with hardcoded numbers;

* changed the cost unit to microseconds, using values collected from mainnet;

* add ExecuteCostTable with a fixed capacity for security reasons; when its limit is reached, programs that are both older AND occur less often are pushed out to make room for new programs.
  • Loading branch information
tao-stones committed Jul 16, 2021
1 parent 4f9abf9 commit 62fae49
Show file tree
Hide file tree
Showing 11 changed files with 542 additions and 176 deletions.
5 changes: 3 additions & 2 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::banking_stage::BankingStage;
use solana_core::{banking_stage::BankingStage, cost_model::CostModel};
use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node};
use solana_ledger::{
blockstore::Blockstore,
Expand All @@ -26,7 +26,7 @@ use solana_sdk::{
transaction::Transaction,
};
use std::{
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex},
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
thread::sleep,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -224,6 +224,7 @@ fn main() {
vote_receiver,
None,
replay_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
9 changes: 4 additions & 5 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use test::Bencher;

Expand Down Expand Up @@ -94,8 +94,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(Mutex::new(CostTracker::new(std::u32::MAX, std::u32::MAX))),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
);
});

Expand Down Expand Up @@ -216,8 +216,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
vote_receiver,
None,
s,
std::u32::MAX,
std::u32::MAX,
&Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
57 changes: 26 additions & 31 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
cost_tracker::CostTracker,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
};
use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -57,7 +51,7 @@ use std::{
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex},
sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
time::Duration,
time::Instant,
Expand Down Expand Up @@ -228,6 +222,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
Self::new_with_cost_limit(
cluster_info,
Expand All @@ -236,8 +231,7 @@ impl BankingStage {
verified_vote_receiver,
transaction_status_sender,
gossip_vote_sender,
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
cost_model,
)
}

Expand All @@ -248,15 +242,12 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
account_cost_limit: u32,
block_cost_limit: u32,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
// shared immutable 'cost_model' that calculates transaction costs
// shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits.
let cost_model = Arc::new(CostModel::new(account_cost_limit, block_cost_limit));
let cost_tracker = Arc::new(Mutex::new(CostTracker::new(
cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
cost_model.read().unwrap().get_account_cost_limit(),
cost_model.read().unwrap().get_block_cost_limit(),
)));
Self::new_num_threads(
cluster_info,
Expand All @@ -279,7 +270,7 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
Expand Down Expand Up @@ -388,7 +379,7 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) {
let mut rebuffered_packets_len = 0;
Expand Down Expand Up @@ -532,7 +523,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> BufferedPacketsDecision {
let bank_start;
Expand Down Expand Up @@ -649,7 +640,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
Expand Down Expand Up @@ -1046,7 +1037,7 @@ impl BankingStage {
msgs: &Packets,
transaction_indexes: &[usize],
secp256k1_program_enabled: bool,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
// Making a snapshot of shared cost_tracker by clone(), drop lock immediately.
Expand All @@ -1064,11 +1055,11 @@ impl BankingStage {
tx.verify_precompiles().ok()?;
}

// Get transaction cost via immutable cost_model; try to add cost to
// Get transaction cost via cost_model; try to add cost to
// local copy of cost_tracker, if succeeded, local copy is updated
// and transaction added to valid list; otherwise, transaction is
// added to retry list. No locking here.
let tx_cost = cost_model.calculate_cost(&tx);
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
let result = cost_tracker.try_add(tx_cost);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
Expand Down Expand Up @@ -1141,7 +1132,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
Expand Down Expand Up @@ -1179,7 +1170,7 @@ impl BankingStage {
// applying cost of processed transactions to shared cost_tracker
transactions.iter().enumerate().for_each(|(index, tx)| {
if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
let tx_cost = cost_model.calculate_cost(&tx.transaction());
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx.transaction());
let mut guard = cost_tracker.lock().unwrap();
let _result = guard.try_add(tx_cost);
drop(guard);
Expand Down Expand Up @@ -1223,7 +1214,7 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
Expand Down Expand Up @@ -1295,7 +1286,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
Expand Down Expand Up @@ -1517,6 +1508,7 @@ fn next_leader_tpu_forwards(
#[cfg(test)]
mod tests {
use super::*;
use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_gossip::cluster_info::Node;
Expand Down Expand Up @@ -1577,6 +1569,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
drop(verified_sender);
drop(vote_sender);
Expand Down Expand Up @@ -1622,6 +1615,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
trace!("sending bank");
drop(verified_sender);
Expand Down Expand Up @@ -1691,6 +1685,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1839,7 +1834,7 @@ mod tests {
2,
None,
gossip_vote_sender,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand Down Expand Up @@ -2664,7 +2659,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand All @@ -2685,7 +2680,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand Down Expand Up @@ -2755,7 +2750,7 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand Down
Loading

0 comments on commit 62fae49

Please sign in to comment.