Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

replay stage feed back program cost (backport #17731) #18694

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::banking_stage::BankingStage;
use solana_core::{banking_stage::BankingStage, cost_model::CostModel};
use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node};
use solana_ledger::{
blockstore::Blockstore,
Expand All @@ -26,7 +26,7 @@ use solana_sdk::{
transaction::Transaction,
};
use std::{
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex},
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
thread::sleep,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -224,6 +224,7 @@ fn main() {
vote_receiver,
None,
replay_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
13 changes: 13 additions & 0 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
<<<<<<< HEAD
use std::sync::Arc;
=======
use std::sync::{Arc, Mutex, RwLock};
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
use std::time::{Duration, Instant};
use test::Bencher;

Expand Down Expand Up @@ -91,6 +95,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
<<<<<<< HEAD
=======
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
);
});

Expand Down Expand Up @@ -211,6 +220,10 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
vote_receiver,
None,
s,
<<<<<<< HEAD
=======
&Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))),
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
147 changes: 146 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
<<<<<<< HEAD
use crate::packet_hasher::PacketHasher;
=======
use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher};
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -51,7 +55,7 @@ use std::{
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex},
sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
time::Duration,
time::Instant,
Expand Down Expand Up @@ -222,7 +226,36 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
<<<<<<< HEAD
=======
Self::new_with_cost_limit(
cluster_info,
poh_recorder,
verified_receiver,
verified_vote_receiver,
transaction_status_sender,
gossip_vote_sender,
cost_model,
)
}

pub fn new_with_cost_limit(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
// shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits.
let cost_tracker = Arc::new(Mutex::new(CostTracker::new(
cost_model.read().unwrap().get_account_cost_limit(),
cost_model.read().unwrap().get_block_cost_limit(),
)));
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
Self::new_num_threads(
cluster_info,
poh_recorder,
Expand All @@ -242,6 +275,11 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -340,6 +378,11 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
Expand Down Expand Up @@ -477,6 +520,11 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) -> BufferedPacketsDecision {
let bank_start;
let (
Expand Down Expand Up @@ -587,6 +635,11 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand Down Expand Up @@ -977,15 +1030,44 @@ impl BankingStage {
msgs: &Packets,
transaction_indexes: &[usize],
secp256k1_program_enabled: bool,
<<<<<<< HEAD
) -> (Vec<HashedTransaction<'static>>, Vec<usize>) {
transaction_indexes
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
// Making a snapshot of shared cost_tracker by clone(), drop lock immediately.
// Local copy `cost_tracker` is used to filter transactions by cost.
// Shared cost_tracker is updated later by processed transactions confirmed by bank.
let mut cost_tracker = cost_tracker.lock().unwrap().clone();

let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];
let (filtered_transactions, filter_transaction_packet_indexes) = transaction_indexes
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
.iter()
.filter_map(|tx_index| {
let p = &msgs.packets[*tx_index];
let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
if secp256k1_program_enabled {
tx.verify_precompiles().ok()?;
}
<<<<<<< HEAD
=======

// Get transaction cost via cost_model; try to add cost to
// local copy of cost_tracker, if suceeded, local copy is updated
// and transaction added to valid list; otherwise, transaction is
// added to retry list. No locking here.
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
let result = cost_tracker.try_add(tx_cost);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
retryable_transaction_packet_indexes.push(*tx_index);
return None;
}

>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
let message_bytes = Self::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
Some((
Expand Down Expand Up @@ -1044,6 +1126,11 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
Expand Down Expand Up @@ -1072,7 +1159,19 @@ impl BankingStage {
);
process_tx_time.stop();

<<<<<<< HEAD
let unprocessed_tx_count = unprocessed_tx_indexes.len();
=======
// applying cost of processed transactions to shared cost_tracker
transactions.iter().enumerate().for_each(|(index, tx)| {
if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx.transaction());
let mut guard = cost_tracker.lock().unwrap();
let _result = guard.try_add(tx_cost);
drop(guard);
}
});
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))

let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
Expand Down Expand Up @@ -1107,6 +1206,11 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets.
Expand Down Expand Up @@ -1172,6 +1276,11 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
<<<<<<< HEAD
=======
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
Expand Down Expand Up @@ -1387,6 +1496,7 @@ fn next_leader_tpu_forwards(
#[cfg(test)]
mod tests {
use super::*;
use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_gossip::cluster_info::Node;
Expand Down Expand Up @@ -1447,6 +1557,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
drop(verified_sender);
drop(vote_sender);
Expand Down Expand Up @@ -1492,6 +1603,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
trace!("sending bank");
drop(verified_sender);
Expand Down Expand Up @@ -1561,6 +1673,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1709,6 +1822,14 @@ mod tests {
2,
None,
gossip_vote_sender,
<<<<<<< HEAD
=======
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
);

// wait for banking_stage to eat the packets
Expand Down Expand Up @@ -2529,6 +2650,14 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
<<<<<<< HEAD
=======
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
Expand All @@ -2545,6 +2674,14 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
<<<<<<< HEAD
=======
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
Expand Down Expand Up @@ -2610,6 +2747,14 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
<<<<<<< HEAD
=======
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
>>>>>>> ae27fcbcd (replay stage feed back program cost (#17731))
);

// Check everything is correct. All indexes after `interrupted_iteration`
Expand Down
Loading