Skip to content

Commit

Permalink
Add transaction index in slot to geyser plugin TransactionInfo (solan…
Browse files Browse the repository at this point in the history
…a-labs#25688)

* Define shuffle to prep using same shuffle for multiple slices

* Determine transaction indexes and plumb to execute_batch

* Pair transaction_index with transaction in TransactionStatusService

* Add new ReplicaTransactionInfoVersion

* Plumb transaction_indexes through BankingStage

* Prepare BankingStage to receive transaction indexes from PohRecorder

* Determine transaction indexes in PohRecorder; add field to WorkingBank

* Add PohRecorder::record unit test

* Only pass starting_transaction_index around PohRecorder

* Add helper structs to simplify test DashMap

* Pass entry and starting-index into process_entries_with_callback together

* Add tx-index checks to test_rebatch_transactions

* Revert shuffle definition and use zip/unzip

* Only zip/unzip if randomize

* Add confirm_slot_entries test

* Review nits

* Add type alias to make sender docs more clear
  • Loading branch information
Tyera Eulberg authored and gregcusack committed Jun 23, 2022
1 parent 8f938c8 commit 7017142
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 69 deletions.
4 changes: 2 additions & 2 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)),
);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
Expand Down Expand Up @@ -439,7 +439,7 @@ fn main() {
std::u64::MAX,
);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
assert!(poh_recorder.lock().unwrap().bank().is_some());
if bank.slot() > 32 {
leader_schedule_cache.set_root(&bank);
Expand Down
2 changes: 1 addition & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);

let chunk_len = verified.len() / CHUNKS;
let mut start = 0;
Expand Down
45 changes: 34 additions & 11 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ struct RecordTransactionsSummary {
record_transactions_timings: RecordTransactionsTimings,
// Result of trying to record the transactions into the PoH stream
result: Result<(), PohRecorderError>,
// Index in the slot of the first transaction recorded
starting_transaction_index: Option<usize>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -1203,6 +1205,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
) -> RecordTransactionsSummary {
let mut record_transactions_timings = RecordTransactionsTimings::default();
let mut starting_transaction_index = None;

if !transactions.is_empty() {
let num_to_record = transactions.len();
Expand All @@ -1217,7 +1220,9 @@ impl BankingStage {
record_transactions_timings.poh_record_us = poh_record_time.as_us();

match res {
Ok(()) => (),
Ok(starting_index) => {
starting_transaction_index = starting_index;
}
Err(PohRecorderError::MaxHeightReached) => {
inc_new_counter_info!("banking_stage-max_height_reached", 1);
inc_new_counter_info!(
Expand All @@ -1227,6 +1232,7 @@ impl BankingStage {
return RecordTransactionsSummary {
record_transactions_timings,
result: Err(PohRecorderError::MaxHeightReached),
starting_transaction_index: None,
};
}
Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e),
Expand All @@ -1236,6 +1242,7 @@ impl BankingStage {
RecordTransactionsSummary {
record_transactions_timings,
result: Ok(()),
starting_transaction_index,
}
}

Expand Down Expand Up @@ -1328,6 +1335,7 @@ impl BankingStage {
let RecordTransactionsSummary {
result: record_transactions_result,
record_transactions_timings,
starting_transaction_index,
} = record_transactions_summary;
execute_and_commit_timings.record_transactions_timings = RecordTransactionsTimings {
execution_results_to_transactions_us: execution_results_to_transactions_time.as_us(),
Expand Down Expand Up @@ -1406,6 +1414,20 @@ impl BankingStage {
let post_balances = bank.collect_balances(batch);
let post_token_balances =
collect_token_balances(bank, batch, &mut mint_decimals);
let mut transaction_index = starting_transaction_index.unwrap_or_default();
let batch_transaction_indexes: Vec<_> = tx_results
.execution_results
.iter()
.map(|result| {
if result.was_executed() {
let this_transaction_index = transaction_index;
saturating_add_assign!(transaction_index, 1);
this_transaction_index
} else {
0
}
})
.collect();
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
Expand All @@ -1416,6 +1438,7 @@ impl BankingStage {
post_token_balances,
),
tx_results.rent_debits,
batch_transaction_indexes,
);
}
},
Expand Down Expand Up @@ -2692,7 +2715,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
Expand Down Expand Up @@ -2914,7 +2937,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let process_transactions_batch_output = BankingStage::process_and_record_transactions(
Expand Down Expand Up @@ -3049,7 +3072,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let process_transactions_batch_output = BankingStage::process_and_record_transactions(
Expand Down Expand Up @@ -3122,7 +3145,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
Expand Down Expand Up @@ -3272,7 +3295,7 @@ mod tests {
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

Expand Down Expand Up @@ -3472,7 +3495,7 @@ mod tests {
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

Expand Down Expand Up @@ -3679,7 +3702,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);

let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
Expand Down Expand Up @@ -3840,7 +3863,7 @@ mod tests {

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);

let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
Expand Down Expand Up @@ -3997,7 +4020,7 @@ mod tests {
// Processes one packet per iteration of the loop
let num_packets_to_process_per_iteration = num_conflicting_transactions;
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
Expand Down Expand Up @@ -4046,7 +4069,7 @@ mod tests {
// each iteration of this loop will process one element of the batch per iteration of the
// loop.
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.lock().unwrap().recorder();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
Expand Down
7 changes: 6 additions & 1 deletion core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ impl ReplayStage {
&retransmit_slots_sender,
&mut skipped_slots_info,
has_new_vote_been_rooted,
transaction_status_sender.is_some(),
);

let poh_bank = poh_recorder.lock().unwrap().bank();
Expand Down Expand Up @@ -1544,6 +1545,7 @@ impl ReplayStage {
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
) {
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention
Expand Down Expand Up @@ -1659,7 +1661,10 @@ impl ReplayStage {
);

let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
poh_recorder
.lock()
.unwrap()
.set_bank(&tpu_bank, track_transaction_indexes);
} else {
error!("{} No next leader found", my_pubkey);
}
Expand Down
20 changes: 20 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,32 @@ pub struct ReplicaTransactionInfo<'a> {
pub transaction_status_meta: &'a TransactionStatusMeta,
}

/// Information about a transaction, including index in block
#[derive(Clone, Debug)]
pub struct ReplicaTransactionInfoV2<'a> {
/// The first signature of the transaction, used for identifying the transaction.
pub signature: &'a Signature,

/// Indicates if the transaction is a simple vote transaction.
pub is_vote: bool,

/// The sanitized transaction.
pub transaction: &'a SanitizedTransaction,

/// Metadata of the transaction status.
pub transaction_status_meta: &'a TransactionStatusMeta,

/// The transaction's index in the block
pub index: usize,
}

/// A wrapper to future-proof ReplicaTransactionInfo handling.
/// If there were a change to the structure of ReplicaTransactionInfo,
/// there would be new enum entry for the newer version, forcing
/// plugin implementations to handle the change.
pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaTransactionInfo<'a>),
V0_0_2(&'a ReplicaTransactionInfoV2<'a>),
}

#[derive(Clone, Debug)]
Expand Down
19 changes: 13 additions & 6 deletions geyser-plugin-manager/src/transaction_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
crate::geyser_plugin_manager::GeyserPluginManager,
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaTransactionInfo, ReplicaTransactionInfoVersions,
ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
},
solana_measure::measure::Measure,
solana_metrics::*,
Expand All @@ -25,13 +25,18 @@ impl TransactionNotifier for TransactionNotifierImpl {
fn notify_transaction(
&self,
slot: Slot,
index: usize,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,
) {
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_transaction_info");
let transaction_log_info =
Self::build_replica_transaction_info(signature, transaction_status_meta, transaction);
let transaction_log_info = Self::build_replica_transaction_info(
index,
signature,
transaction_status_meta,
transaction,
);

let mut plugin_manager = self.plugin_manager.write().unwrap();

Expand All @@ -44,7 +49,7 @@ impl TransactionNotifier for TransactionNotifierImpl {
continue;
}
match plugin.notify_transaction(
ReplicaTransactionInfoVersions::V0_0_1(&transaction_log_info),
ReplicaTransactionInfoVersions::V0_0_2(&transaction_log_info),
slot,
) {
Err(err) => {
Expand Down Expand Up @@ -78,11 +83,13 @@ impl TransactionNotifierImpl {
}

fn build_replica_transaction_info<'a>(
index: usize,
signature: &'a Signature,
transaction_status_meta: &'a TransactionStatusMeta,
transaction: &'a SanitizedTransaction,
) -> ReplicaTransactionInfo<'a> {
ReplicaTransactionInfo {
) -> ReplicaTransactionInfoV2<'a> {
ReplicaTransactionInfoV2 {
index,
signature,
is_vote: transaction.is_simple_vote_transaction(),
transaction,
Expand Down
Loading

0 comments on commit 7017142

Please sign in to comment.