Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tao-stones committed Oct 12, 2021
1 parent 45bd9ae commit 66f7ac6
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
45 changes: 40 additions & 5 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use std::{
net::{SocketAddr, UdpSocket},
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, RwLockReadGuard},
sync::{Arc, Mutex, RwLock, RwLockReadGuard},
thread::{self, Builder, JoinHandle},
time::Duration,
time::Instant,
Expand Down Expand Up @@ -283,7 +283,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
cost_model: Arc<RwLock<CostModel>>,
) -> Self {
Self::new_num_threads(
cluster_info,
Expand All @@ -294,6 +294,7 @@ impl BankingStage {
Self::num_threads(),
transaction_status_sender,
gossip_vote_sender,
cost_model,
)
}

Expand All @@ -306,6 +307,7 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -340,6 +342,7 @@ impl BankingStage {
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
let data_budget = data_budget.clone();
let cost_model = cost_model.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
Expand All @@ -355,6 +358,7 @@ impl BankingStage {
gossip_vote_sender,
&duplicates,
&data_budget,
cost_model,
);
})
.unwrap()
Expand Down Expand Up @@ -856,7 +860,6 @@ impl BankingStage {
};

let mut execute_timings = ExecuteTimings::default();

let (
mut loaded_accounts,
results,
Expand Down Expand Up @@ -952,6 +955,17 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {

// TODO TAO - before locking account for txs, do
// 1. calculate TXs costs: cost_model.calculate_costs(txs) -> [transaction_cost]
// 2, combine txs and [transaction_costs] into txs_costs, do packing logic:
// for (tx, cost) { bank.cost_tracker_mutable().add(tx, cost)->result;
// for OK: tx into candidates_txs list;
// for err: tx into retryable_txs;
// 3. continue with candidate_txs to lock accounts and process_load_record.
// *passing [transaction_costs] with transactions, it will be used within _locked() to
// compare account balance, and drop txs if necessary*

let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
Expand Down Expand Up @@ -1091,6 +1105,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
demote_program_write_locks: bool,
votes_only: bool,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> (Vec<SanitizedTransaction>, Vec<usize>, Vec<usize>) {
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];
Expand Down Expand Up @@ -1130,7 +1145,13 @@ impl BankingStage {
&& read_cost_tracker
.would_transaction_fit(
&tx,
demote_program_write_locks,
// TODO TAO - refactor transaction_cost to avoid vec<key>, use index[]
// instead; so it can be readGuard
// TODO TAO - refactor cost_tracker to not haing cost_model
cost_model
.read()
.unwrap()
.calculate_cost(&tx, demote_program_write_locks),
cost_tracker_stats,
)
.is_err()
Expand Down Expand Up @@ -1207,6 +1228,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker_stats: &mut CostTrackerStats,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
Expand All @@ -1219,6 +1241,7 @@ impl BankingStage {
banking_stage_stats,
bank.demote_program_write_locks(),
bank.vote_only_bank(),
cost_model,
cost_tracker_stats,
);
packet_conversion_time.stop();
Expand Down Expand Up @@ -1258,7 +1281,19 @@ impl BankingStage {
if unprocessed_tx_indexes.iter().all(|&i| i != index) {
bank.write_cost_tracker().unwrap().add_transaction_cost(
tx,
bank.demote_program_write_locks(),
// TODO TAO - refactor transaction_cost to avoid vec<key>, use index[]
// instead; so it can be readGuard
// TODO TAO - refactor cost_tracker to not haing cost_model
// TODO TAO - refactor to avoid second cost_model calculation. Should cache
// first calc results and re-use, also need to resue later for bankless when to
// convert to fee to check account balance.
cost_model
.read()
.unwrap()
.calculate_cost(
tx,
bank.demote_program_write_locks()
),
cost_tracker_stats,
);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Tpu {
verified_gossip_vote_packets_receiver,
transaction_status_sender,
replay_vote_sender,
cost_model,
cost_model.clone(),
);

let broadcast_stage = broadcast_type.new_broadcast_stage(
Expand Down
1 change: 1 addition & 0 deletions runtime/src/cost_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl CostTracker {
stats.block_cost += cost;
}

// TODO TAO - refactor, reset is no longer called, gonna to report stats in diff way.
pub fn reset_if_new_bank(&mut self, slot: Slot, stats: &mut CostTrackerStats) -> bool {
// report stats when slot changes
if slot != stats.bank_slot {
Expand Down

0 comments on commit 66f7ac6

Please sign in to comment.