diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index c884c20aa142e0..c723f3af9a0da4 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -1,2 +1,8 @@ #[allow(dead_code)] mod thread_aware_account_locks; + +mod transaction_priority_id; +#[allow(dead_code)] +mod transaction_state; +#[allow(dead_code)] +mod transaction_state_container; diff --git a/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs new file mode 100644 index 00000000000000..178a9cdf582d5f --- /dev/null +++ b/core/src/banking_stage/transaction_scheduler/transaction_priority_id.rs @@ -0,0 +1,27 @@ +use crate::banking_stage::scheduler_messages::TransactionId; + +/// A unique identifier tied with priority ordering for a transaction/packet: +/// - `id` has no effect on ordering +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(crate) struct TransactionPriorityId { + pub(crate) priority: u64, + pub(crate) id: TransactionId, +} + +impl TransactionPriorityId { + pub(crate) fn new(priority: u64, id: TransactionId) -> Self { + Self { priority, id } + } +} + +impl Ord for TransactionPriorityId { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.priority.cmp(&other.priority) + } +} + +impl PartialOrd for TransactionPriorityId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs new file mode 100644 index 00000000000000..da3916cd20ec1c --- /dev/null +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -0,0 +1,323 @@ +use { + solana_runtime::transaction_priority_details::TransactionPriorityDetails, + solana_sdk::{slot_history::Slot, transaction::SanitizedTransaction}, +}; + +/// Simple wrapper type to tie a sanitized transaction to max age slot. +pub(crate) struct SanitizedTransactionTTL { + pub(crate) transaction: SanitizedTransaction, + pub(crate) max_age_slot: Slot, +} + +/// TransactionState is used to track the state of a transaction in the transaction scheduler +/// and banking stage as a whole. +/// +/// There are two states a transaction can be in: +/// 1. `Unprocessed` - The transaction is available for scheduling. +/// 2. `Pending` - The transaction is currently scheduled or being processed. +/// +/// Newly received transactions are initially in the `Unprocessed` state. +/// When a transaction is scheduled, it is transitioned to the `Pending` state, +/// using the `transition_to_pending` method. +/// When a transaction finishes processing it may be retryable. If it is retryable, +/// the transaction is transitioned back to the `Unprocessed` state using the +/// `transition_to_unprocessed` method. If it is not retryable, the state should +/// be dropped. +/// +/// For performance, when a transaction is transitioned to the `Pending` state, the +/// internal `SanitizedTransaction` is moved out of the `TransactionState` and sent +/// to the appropriate thread for processing. This is done to avoid cloning the +/// `SanitizedTransaction`. +#[allow(clippy::large_enum_variant)] +pub(crate) enum TransactionState { + /// The transaction is available for scheduling. + Unprocessed { + transaction_ttl: SanitizedTransactionTTL, + transaction_priority_details: TransactionPriorityDetails, + forwarded: bool, + }, + /// The transaction is currently scheduled or being processed. + Pending { + transaction_priority_details: TransactionPriorityDetails, + forwarded: bool, + }, +} + +impl TransactionState { + /// Creates a new `TransactionState` in the `Unprocessed` state. + pub(crate) fn new( + transaction_ttl: SanitizedTransactionTTL, + transaction_priority_details: TransactionPriorityDetails, + ) -> Self { + Self::Unprocessed { + transaction_ttl, + transaction_priority_details, + forwarded: false, + } + } + + /// Returns a reference to the priority details of the transaction. + pub(crate) fn transaction_priority_details(&self) -> &TransactionPriorityDetails { + match self { + Self::Unprocessed { + transaction_priority_details, + .. + } => transaction_priority_details, + Self::Pending { + transaction_priority_details, + .. + } => transaction_priority_details, + } + } + + /// Returns the priority of the transaction. + pub(crate) fn priority(&self) -> u64 { + self.transaction_priority_details().priority + } + + /// Returns whether or not the transaction has already been forwarded. + pub(crate) fn forwarded(&self) -> bool { + match self { + Self::Unprocessed { forwarded, .. } => *forwarded, + Self::Pending { forwarded, .. } => *forwarded, + } + } + + /// Sets the transaction as forwarded. + pub(crate) fn set_forwarded(&self) { + match self { + Self::Unprocessed { forwarded, .. } => *forwarded = true, + Self::Pending { forwarded, .. } => *forwarded = true, + } + } + + /// Intended to be called when a transaction is scheduled. This method will + /// transition the transaction from `Unprocessed` to `Pending` and return the + /// `SanitizedTransactionTTL` for processing. + /// + /// # Panics + /// This method will panic if the transaction is already in the `Pending` state, + /// as this is an invalid state transition. + pub(crate) fn transition_to_pending(&mut self) -> SanitizedTransactionTTL { + match self.take() { + TransactionState::Unprocessed { + transaction_ttl, + transaction_priority_details, + forwarded, + } => { + *self = TransactionState::Pending { + transaction_priority_details, + forwarded, + }; + transaction_ttl + } + TransactionState::Pending { .. } => { + panic!("transaction already pending"); + } + } + } + + /// Intended to be called when a transaction is retried. This method will + /// transition the transaction from `Pending` to `Unprocessed`. + /// + /// # Panics + /// This method will panic if the transaction is already in the `Unprocessed` + /// state, as this is an invalid state transition. + pub(crate) fn transition_to_unprocessed(&mut self, transaction_ttl: SanitizedTransactionTTL) { + match self.take() { + TransactionState::Unprocessed { .. } => panic!("already unprocessed"), + TransactionState::Pending { + transaction_priority_details, + forwarded, + } => { + *self = Self::Unprocessed { + transaction_ttl, + transaction_priority_details, + forwarded, + } + } + } + } + + /// Get a reference to the `SanitizedTransactionTTL` for the transaction. + /// + /// # Panics + /// This method will panic if the transaction is in the `Pending` state. + pub(crate) fn transaction_ttl(&self) -> &SanitizedTransactionTTL { + match self { + Self::Unprocessed { + transaction_ttl, .. + } => transaction_ttl, + Self::Pending { .. } => panic!("transaction is pending"), + } + } + + /// Internal helper to transitioning between states. + /// Replaces `self` with a dummy state that will immediately be overwritten in transition. + fn take(&mut self) -> Self { + core::mem::replace( + self, + Self::Pending { + transaction_priority_details: TransactionPriorityDetails { + priority: 0, + compute_unit_limit: 0, + }, + forwarded: false, + }, + ) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, + signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, + }, + }; + + fn create_transaction_state(priority: u64) -> TransactionState { + let from_keypair = Keypair::new(); + let ixs = vec![ + system_instruction::transfer( + &from_keypair.pubkey(), + &solana_sdk::pubkey::new_rand(), + 1, + ), + ComputeBudgetInstruction::set_compute_unit_price(priority), + ]; + let message = Message::new(&ixs, Some(&from_keypair.pubkey())); + let tx = Transaction::new(&[&from_keypair], message, Hash::default()); + + let transaction_ttl = SanitizedTransactionTTL { + transaction: SanitizedTransaction::from_transaction_for_tests(tx), + max_age_slot: Slot::MAX, + }; + + TransactionState::new( + transaction_ttl, + TransactionPriorityDetails { + priority, + compute_unit_limit: 0, + }, + ) + } + + #[test] + #[should_panic(expected = "already pending")] + fn test_transition_to_pending_panic() { + let mut transaction_state = create_transaction_state(0); + transaction_state.transition_to_pending(); + transaction_state.transition_to_pending(); // invalid transition + } + + #[test] + fn test_transition_to_pending() { + let mut transaction_state = create_transaction_state(0); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + let _ = transaction_state.transition_to_pending(); + assert!(matches!( + transaction_state, + TransactionState::Pending { .. } + )); + } + + #[test] + #[should_panic(expected = "already unprocessed")] + fn test_transition_to_unprocessed_panic() { + let mut transaction_state = create_transaction_state(0); + + // Manually clone `SanitizedTransactionTTL` + let SanitizedTransactionTTL { + transaction, + max_age_slot, + } = transaction_state.transaction_ttl(); + let transaction_ttl = SanitizedTransactionTTL { + transaction: transaction.clone(), + max_age_slot: *max_age_slot, + }; + transaction_state.transition_to_unprocessed(transaction_ttl); // invalid transition + } + + #[test] + fn test_transition_to_unprocessed() { + let mut transaction_state = create_transaction_state(0); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + let transaction_ttl = transaction_state.transition_to_pending(); + assert!(matches!( + transaction_state, + TransactionState::Pending { .. } + )); + transaction_state.transition_to_unprocessed(transaction_ttl); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + } + + #[test] + fn test_transaction_priority_details() { + let priority = 15; + let mut transaction_state = create_transaction_state(priority); + assert_eq!(transaction_state.priority(), priority); + + // ensure priority is not lost through state transitions + let transaction_ttl = transaction_state.transition_to_pending(); + assert_eq!(transaction_state.priority(), priority); + transaction_state.transition_to_unprocessed(transaction_ttl); + assert_eq!(transaction_state.priority(), priority); + } + + #[test] + #[should_panic(expected = "transaction is pending")] + fn test_transaction_ttl_panic() { + let mut transaction_state = create_transaction_state(0); + let transaction_ttl = transaction_state.transaction_ttl(); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + assert_eq!(transaction_ttl.max_age_slot, Slot::MAX); + + let _ = transaction_state.transition_to_pending(); + assert!(matches!( + transaction_state, + TransactionState::Pending { .. } + )); + let _ = transaction_state.transaction_ttl(); // pending state, the transaction ttl is not available + } + + #[test] + fn test_transaction_ttl() { + let mut transaction_state = create_transaction_state(0); + let transaction_ttl = transaction_state.transaction_ttl(); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + assert_eq!(transaction_ttl.max_age_slot, Slot::MAX); + + // ensure transaction_ttl is not lost through state transitions + let transaction_ttl = transaction_state.transition_to_pending(); + assert!(matches!( + transaction_state, + TransactionState::Pending { .. } + )); + + transaction_state.transition_to_unprocessed(transaction_ttl); + let transaction_ttl = transaction_state.transaction_ttl(); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + assert_eq!(transaction_ttl.max_age_slot, Slot::MAX); + } +} diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs new file mode 100644 index 00000000000000..f5f80f30aceb40 --- /dev/null +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -0,0 +1,311 @@ +use { + super::{ + transaction_priority_id::TransactionPriorityId, + transaction_state::{SanitizedTransactionTTL, TransactionState}, + }, + crate::banking_stage::scheduler_messages::TransactionId, + min_max_heap::MinMaxHeap, + solana_runtime::transaction_priority_details::TransactionPriorityDetails, + std::collections::HashMap, +}; + +/// This structure will hold `TransactionState` for the entirety of a +/// transaction's lifetime in the scheduler and BankingStage as a whole. +/// +/// Transaction Lifetime: +/// 1. Received from `SigVerify` by `BankingStage` +/// 2. Inserted into `TransactionStateContainer` by `BankingStage` +/// 3. Popped in priority-order by scheduler, and transitioned to `Pending` state +/// 4. Processed by `ConsumeWorker` +/// a. If consumed, remove `Pending` state from the `TransactionStateContainer` +/// b. If retryable, transition back to `Unprocessed` state. +/// Re-insert to the queue, and return to step 3. +/// +/// The structure is composed of two main components: +/// 1. A priority queue of wrapped `TransactionId`s, which are used to +/// order transactions by priority for selection by the scheduler. +/// 2. A map of `TransactionId` to `TransactionState`, which is used to +/// track the state of each transaction. +/// +/// When `Pending`, the associated `TransactionId` is not in the queue, but +/// is still in the map. +/// The entry in the map should exist before insertion into the queue, and be +/// be removed only after the id is removed from the queue. +/// +/// The container maintains a fixed capacity. If the queue is full when pushing +/// a new transaction, the lowest priority transaction will be dropped. +pub(crate) struct TransactionStateContainer { + priority_queue: MinMaxHeap, + id_to_transaction_state: HashMap, +} + +impl TransactionStateContainer { + pub(crate) fn with_capacity(capacity: usize) -> Self { + Self { + priority_queue: MinMaxHeap::with_capacity(capacity), + id_to_transaction_state: HashMap::with_capacity(capacity), + } + } + + /// Returns true if the queue is empty. + pub(crate) fn is_empty(&self) -> bool { + self.priority_queue.is_empty() + } + + /// Returns the remaining capacity of the queue + pub(crate) fn remaining_queue_capacity(&self) -> usize { + self.priority_queue.capacity() - self.priority_queue.len() + } + + /// Get an iterator of the top `n` transaction ids in the priority queue. + /// This will remove the ids from the queue, but not drain the remainder + /// of the queue. + pub(crate) fn take_top_n( + &mut self, + n: usize, + ) -> impl Iterator + '_ { + (0..n).map_while(|_| self.priority_queue.pop_max()) + } + + /// Serialize entire priority queue. `hold` indicates whether the priority queue should + /// be drained or not. + /// If `hold` is true, these ids should not be removed from the map while processing. + pub(crate) fn priority_ordered_ids(&mut self, hold: bool) -> Vec { + let priority_queue = if hold { + self.priority_queue.clone() + } else { + let capacity = self.priority_queue.capacity(); + core::mem::replace( + &mut self.priority_queue, + MinMaxHeap::with_capacity(capacity), + ) + }; + + priority_queue.into_vec_desc() + } + + /// Get mutable transaction state by id. + pub(crate) fn get_mut_transaction_state( + &mut self, + id: &TransactionId, + ) -> Option<&mut TransactionState> { + self.id_to_transaction_state.get_mut(id) + } + + /// Get reference to `SanitizedTransactionTTL` by id. + /// Panics if the transaction does not exist. + pub(crate) fn get_transaction_ttl( + &self, + id: &TransactionId, + ) -> Option<&SanitizedTransactionTTL> { + self.id_to_transaction_state + .get(id) + .map(|state| state.transaction_ttl()) + } + + /// Take `SanitizedTransactionTTL` by id. + /// This transitions the transaction to `Pending` state. + /// Panics if the transaction does not exist. + pub(crate) fn take_transaction(&mut self, id: &TransactionId) -> SanitizedTransactionTTL { + self.id_to_transaction_state + .get_mut(id) + .expect("transaction must exist") + .transition_to_pending() + } + + /// Insert a new transaction into the container's queues and maps. + pub(crate) fn insert_new_transaction( + &mut self, + transaction_id: TransactionId, + transaction_ttl: SanitizedTransactionTTL, + transaction_priority_details: TransactionPriorityDetails, + ) { + let priority_id = + TransactionPriorityId::new(transaction_priority_details.priority, transaction_id); + self.id_to_transaction_state.insert( + transaction_id, + TransactionState::new(transaction_ttl, transaction_priority_details), + ); + self.push_id_into_queue(priority_id) + } + + /// Retries a transaction - inserts transaction back into map (but not packet). + /// This transitions the transaction to `Unprocessed` state. + pub(crate) fn retry_transaction( + &mut self, + transaction_id: TransactionId, + transaction_ttl: SanitizedTransactionTTL, + ) { + let transaction_state = self + .get_mut_transaction_state(&transaction_id) + .expect("transaction must exist"); + let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); + transaction_state.transition_to_unprocessed(transaction_ttl); + self.push_id_into_queue(priority_id); + } + + /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority + /// transaction will be dropped (removed from the queue and map). + pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) { + if self.remaining_queue_capacity() == 0 { + let popped_id = self.priority_queue.push_pop_min(priority_id); + self.remove_by_id(&popped_id.id); + } else { + self.priority_queue.push(priority_id); + } + } + + /// Remove transaction by id. + pub(crate) fn remove_by_id(&mut self, id: &TransactionId) { + self.id_to_transaction_state + .remove(id) + .expect("transaction must exist"); + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + hash::Hash, + message::Message, + signature::Keypair, + signer::Signer, + slot_history::Slot, + system_instruction, + transaction::{SanitizedTransaction, Transaction}, + }, + }; + + fn test_transaction(priority: u64) -> (SanitizedTransactionTTL, TransactionPriorityDetails) { + let from_keypair = Keypair::new(); + let ixs = vec![ + system_instruction::transfer( + &from_keypair.pubkey(), + &solana_sdk::pubkey::new_rand(), + 1, + ), + ComputeBudgetInstruction::set_compute_unit_price(priority), + ]; + let message = Message::new(&ixs, Some(&from_keypair.pubkey())); + let tx = Transaction::new(&[&from_keypair], message, Hash::default()); + + let transaction_ttl = SanitizedTransactionTTL { + transaction: SanitizedTransaction::from_transaction_for_tests(tx), + max_age_slot: Slot::MAX, + }; + ( + transaction_ttl, + TransactionPriorityDetails { + priority, + compute_unit_limit: 0, + }, + ) + } + + fn push_to_container(container: &mut TransactionStateContainer, num: usize) { + for id in 0..num as u64 { + let priority = id; + let (transaction_ttl, transaction_priority_details) = test_transaction(priority); + container.insert_new_transaction( + TransactionId::new(id), + transaction_ttl, + transaction_priority_details, + ); + } + } + + #[test] + fn test_is_empty() { + let mut container = TransactionStateContainer::with_capacity(1); + assert!(container.is_empty()); + + push_to_container(&mut container, 1); + assert!(!container.is_empty()); + } + + #[test] + fn test_priority_queue_capacity() { + let mut container = TransactionStateContainer::with_capacity(1); + push_to_container(&mut container, 5); + + assert_eq!(container.priority_queue.len(), 1); + assert_eq!(container.id_to_transaction_state.len(), 1); + assert_eq!( + container + .id_to_transaction_state + .iter() + .map(|ts| ts.1.priority()) + .next() + .unwrap(), + 4 + ); + } + + #[test] + fn test_take_top_n() { + let mut container = TransactionStateContainer::with_capacity(5); + push_to_container(&mut container, 5); + + let taken = container.take_top_n(3).collect::>(); + assert_eq!( + taken, + vec![ + TransactionPriorityId::new(4, TransactionId::new(4)), + TransactionPriorityId::new(3, TransactionId::new(3)), + TransactionPriorityId::new(2, TransactionId::new(2)), + ] + ); + // The remainder of the queue should not be empty + assert_eq!(container.priority_queue.len(), 2); + } + + #[test] + fn test_priority_ordered_ids() { + let mut container = TransactionStateContainer::with_capacity(5); + push_to_container(&mut container, 5); + + let ordered = container.priority_ordered_ids(false); + assert_eq!( + ordered, + vec![ + TransactionPriorityId::new(4, TransactionId::new(4)), + TransactionPriorityId::new(3, TransactionId::new(3)), + TransactionPriorityId::new(2, TransactionId::new(2)), + TransactionPriorityId::new(1, TransactionId::new(1)), + TransactionPriorityId::new(0, TransactionId::new(0)), + ] + ); + assert!(container.priority_queue.is_empty()); + + push_to_container(&mut container, 5); + let ordered = container.priority_ordered_ids(true); + assert_eq!( + ordered, + vec![ + TransactionPriorityId::new(4, TransactionId::new(4)), + TransactionPriorityId::new(3, TransactionId::new(3)), + TransactionPriorityId::new(2, TransactionId::new(2)), + TransactionPriorityId::new(1, TransactionId::new(1)), + TransactionPriorityId::new(0, TransactionId::new(0)), + ] + ); + assert_eq!(container.priority_queue.len(), 5); + } + + #[test] + fn test_get_mut_transaction_state() { + let mut container = TransactionStateContainer::with_capacity(5); + push_to_container(&mut container, 5); + + let existing_id = TransactionId::new(3); + let non_existing_id = TransactionId::new(7); + assert!(container.get_mut_transaction_state(&existing_id).is_some()); + assert!(container.get_mut_transaction_state(&existing_id).is_some()); + assert!(container + .get_mut_transaction_state(&non_existing_id) + .is_none()); + } +}