Commit

add tpu live packets (eg, not buffered packets) states to qos metrics reporting
tao-stones committed Dec 8, 2021
1 parent 64cfe50 commit ac658f0
Showing 2 changed files with 132 additions and 156 deletions.
132 changes: 14 additions & 118 deletions core/src/banking_stage.rs
@@ -11,7 +11,7 @@ use {
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
solana_metrics::inc_new_counter_info,
solana_perf::{
cuda_runtime::PinnedVec,
data_budget::DataBudget,
@@ -433,21 +433,13 @@ impl BankingStage {
original_unprocessed_indexes,
new_unprocessed_indexes,
)
// TODO TAO - does ^^^ call provide # of old txs that are dropped? Prolly needs
// this number
//
} else {
let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some(BankStart {
working_bank,
bank_creation_time,
}) = bank_start
{
// TODO TAO - msgs.packets.len() is number of packets in msgs.
// qos_service.accumulate_try_to_process_buffered_txs(msgs.packets.len(), id, working_bank);
// this counts the buffered txs (or packets) during the bank-life-time. If the
// recording bank is expired, the packets are to be re-queued.
//
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions(
&working_bank,
@@ -460,11 +452,6 @@
banking_stage_stats,
qos_service,
);
// TODO TAO - here vvvv again, "processed < verified_txs_len" is used to
// indicate end-of-slot, which is exactly what process_packets() does - it stops
// processing when PoHMaxHeight is reached or the Bank halts; in that case, processed
// will be < txs_len, but all unprocessed (retryable and those truly unprocessed)
// will be pushed into unprocessed_indexes[] for queueing
if processed < verified_txs_len
|| !Bank::should_bank_still_be_processing_txs(
&bank_creation_time,
@@ -478,14 +465,6 @@
}
new_tx_count += processed;

// TODO TAO - qos_service.accumulated_processed_buffered_txs(processed, id, bank.slot())
//
// TODO TAO - qos_service.accumulated_rebuffered_txs(unprocessed_indexes.len(), id,
// bank.slot());
// this includes block by cost_limit, account_in_use etc; maybe want to
// report the causes as well.
// TODO TAO - can added refined drop count, and newly buffered count

// Out of the buffered packets just retried, collect any still unprocessed
// transactions in this batch for forwarding
rebuffered_packets_len += new_unprocessed_indexes.len();
@@ -968,9 +947,6 @@ impl BankingStage {
let batch =
bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.into_iter());
lock_time.stop();
// TODO TAO - above ^^^ call does not provide metrics about how many accounts were locked
// successfully, which is an important number to indicate if there is account contention.
// Should add it.

// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit and
// WouldExceedMaxAccountCostLimit
@@ -1337,7 +1313,6 @@ impl BankingStage {
count,
id,
);
inc_new_counter_debug!("banking_stage-transactions_received", count);
let mut proc_start = Measure::start("process_packets_transactions_process");
let mut new_tx_count = 0;

@@ -1346,13 +1321,11 @@ impl BankingStage {
let mut dropped_packet_batches_count = 0;
let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() {
// TODO TAO - received `mms` has many `msgs`; a `msgs` is a Packets holding many `packet`s;
// a `packet` is one `tx` atm.
//
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank();
let working_bank_start = poh_recorder_bank.working_bank_start();
if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() {
qos_service.accumulate_tpu_buffered_packets_count(msgs.packets.len() as u64);
Self::push_unprocessed(
buffered_packets,
msgs,
@@ -1364,11 +1337,6 @@ impl BankingStage {
duplicates,
banking_stage_stats,
);
// TODO TAO - count the received txs that being pushed due to working_bank expired
// those txs will be picked up either by next working bank, or being forwarded
// qos_service.accumulate_queued_txs_due_to_working_bank_expire(msgs.packets.len(),
// id, None_bank);
// TODO TAO - can added refined drop count, and newly buffered count
continue;
}

@@ -1378,11 +1346,7 @@ impl BankingStage {
bank_creation_time,
} = &*working_bank_start.unwrap();

// TODO TAO - msgs.packets.len() is number of packets in msgs.
// qos_service.accumulate_ingress_txs(msgs.packets.len(), id, working_bank);
// this counts the received txs (or packets) during the bank-life-time. If the
// recording bank is expired, the packets are to be queued.
//
qos_service.accumulate_tpu_ingested_packets_count(msgs.packets.len() as u64);
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
working_bank,
@@ -1397,15 +1361,9 @@ impl BankingStage {
);

new_tx_count += processed;

// TODO TAO - so here are the numbers:
// 1. msgs.packets.len() -- # txs received
// 2. verified_txs_len -- # txs verified, diff from above are txs dropped due
// to in vote-only mode, or unsanitized txs
// 3. processed -- # txs being processed
// 4. unprocessed_indexes.len() -- # txs to be buffered,
// So: 3 + 4 == 2, 2 <= 1
// TODO TAO - qos_service.accumulated_processed_txs(processed, id, bank.slot())
qos_service.accumulated_verified_txs_count(verified_txs_len as u64);
qos_service.accumulated_processed_txs_count(processed as u64);
qos_service.accumulated_retryable_txs_count(unprocessed_indexes.len() as u64);

// Collect any unprocessed transactions in this batch for forwarding
Self::push_unprocessed(
@@ -1419,43 +1377,9 @@ impl BankingStage {
duplicates,
banking_stage_stats,
);
// TODO TAO - numbers on push_unprocessed, keep in mind this function called on both
// live stream and buffered queue, hence there are re-counts
// 1. unprocessed_indexes.len() - is unprocessed from above
// 2. (hidden inside function) dropped_duplicated_packets_count - # txs
// silently dropped as they already exist in buffer when this function
// is called again (reason?)
// 3. dropped_packets_count - # of packets pushed out of FIFO buffer to make
// room for newly buffered packets
// 4. newly_buffered_packets_count - # of packets being added to buffer in
// this call.
// since push_unprocessed() is called at multiple spots, at metrics level it is hard
// to be exact 1 = 2 + 4, but should be close.
// If we really want to see how live stream part works, might be better to aggregate
// 1, 2, 4 in same bucket as before-call stats (case of dup these data in qos_stats)
//

// TODO TAO - qos_service.accumulated_unprocessed_txs(unprocessed_indexes.len(), id,
// bank.slot());
// this includes block by cost_limit, account_in_use etc; maybe want to
// report the causes as well.
// TODO TAO - can added refined drop count, and newly buffered count

// If there were retryable transactions, add the unexpired ones to the buffered queue
if processed < verified_txs_len {
// TODO TAO - this ^^^ logic here, is it assuming that normally calling
// process_packets_transactions() will process ALL txs passed in, therefore processed
// == verified_txs_len, meaning unprocessed_indexes.len() == 0. If so, it means the
// only reason there are unprocessed messages is due to slot advancing, therefore
// following code will push all not-too-old txs in the remaining recved queue into
// buffer.
// If that is the case, then could following happen?
// - received many packets (eg mms has many msgs)
// - packets in first msgs were not fully processed by process_packets_transactions()
// - all packets in the 2nd and subsequent msgs will be pushed into buffer
// - this call exits to upper-loop, where consume_buffer() is called to process buffer
// this isn't necessarily bad, prolly for a good reason, but it'd badly mess up the
// reported metrics, perhaps giving more reason to add qos_stats
let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets");
let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
@@ -1481,10 +1405,6 @@ impl BankingStage {
duplicates,
banking_stage_stats,
);
// TODO TAO - not sure if we want to collect this:
// qos_service.accumulate_buffered_retryable_txs_due_to_bank_expires(unprocessed_indexes.len(),
// id, bank.slot())
// TODO TAO - same as above, can log the drop and newly buffered details.
}
handle_retryable_packets_time.stop();
banking_stage_stats
@@ -2319,10 +2239,7 @@ mod tests {
0,
None,
&gossip_vote_sender,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
)
.0
.unwrap();
@@ -2364,10 +2281,7 @@ mod tests {
0,
None,
&gossip_vote_sender,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
)
.0,
Err(PohRecorderError::MaxHeightReached)
@@ -2455,10 +2369,7 @@ mod tests {
0,
None,
&gossip_vote_sender,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);

poh_recorder
@@ -2567,10 +2478,7 @@ mod tests {
&recorder,
None,
&gossip_vote_sender,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);

assert_eq!(processed_transactions_count, 0,);
@@ -2665,10 +2573,7 @@ mod tests {
enable_cpi_and_log_storage: false,
}),
&gossip_vote_sender,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);

transaction_status_service.join().unwrap();
@@ -2796,10 +2701,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
@@ -2816,10 +2718,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
@@ -2885,10 +2784,7 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&QosService::new(
Arc::new(RwLock::new(CostModel::default())),
1,
),
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);

// Check everything is correct. All indexes after `interrupted_iteration`
[The diff for the second changed file, presumably core/src/qos_service.rs where the new accumulator methods and their metrics reporting live, did not load.]
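Since that hunk is not shown, the following is a minimal, hypothetical sketch of the counter side of QosService implied by the calls added above in banking_stage.rs (accumulate_tpu_ingested_packets_count, accumulate_tpu_buffered_packets_count, accumulated_verified_txs_count, accumulated_processed_txs_count, accumulated_retryable_txs_count). Only those five method names come from the diff; the struct name, field names, drain helper, and reporting strategy are assumptions rather than the commit's actual implementation.

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical sketch: lock-free counters that BankingStage threads bump on the hot path
// and a periodic reporter drains. Everything except the five accumulate/accumulated
// method names is illustrative, not taken from the real qos_service.rs.
#[derive(Default)]
pub struct QosServiceCounters {
    tpu_ingested_packets_count: AtomicU64,
    tpu_buffered_packets_count: AtomicU64,
    verified_txs_count: AtomicU64,
    processed_txs_count: AtomicU64,
    retryable_txs_count: AtomicU64,
}

impl QosServiceCounters {
    // Live TPU packets seen while a working bank is available.
    pub fn accumulate_tpu_ingested_packets_count(&self, count: u64) {
        self.tpu_ingested_packets_count.fetch_add(count, Ordering::Relaxed);
    }

    // Live TPU packets pushed to the buffer because the working bank was expired.
    pub fn accumulate_tpu_buffered_packets_count(&self, count: u64) {
        self.tpu_buffered_packets_count.fetch_add(count, Ordering::Relaxed);
    }

    pub fn accumulated_verified_txs_count(&self, count: u64) {
        self.verified_txs_count.fetch_add(count, Ordering::Relaxed);
    }

    pub fn accumulated_processed_txs_count(&self, count: u64) {
        self.processed_txs_count.fetch_add(count, Ordering::Relaxed);
    }

    pub fn accumulated_retryable_txs_count(&self, count: u64) {
        self.retryable_txs_count.fetch_add(count, Ordering::Relaxed);
    }

    // Swap-and-reset so a reporting thread can publish totals without pausing banking threads.
    pub fn drain(&self) -> (u64, u64, u64, u64, u64) {
        (
            self.tpu_ingested_packets_count.swap(0, Ordering::Relaxed),
            self.tpu_buffered_packets_count.swap(0, Ordering::Relaxed),
            self.verified_txs_count.swap(0, Ordering::Relaxed),
            self.processed_txs_count.swap(0, Ordering::Relaxed),
            self.retryable_txs_count.swap(0, Ordering::Relaxed),
        )
    }
}

Per the removed TODO comments in the diff, the intended relationship between these counts for each batch is processed + retryable == verified, and verified <= packets received. A drain like the one sketched keeps the hot path lock-free while a periodic reporter (presumably solana_metrics datapoints, as used elsewhere in banking_stage) publishes and resets the totals.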
