diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b93bbf3d0420e2..a4161003adf6b2 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,9 +1,11 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to construct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. + use { crate::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + immutable_deserialized_packet::ImmutableDeserializedPacket, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, leader_slot_banking_stage_timing_metrics::{ LeaderExecuteAndCommitTimings, RecordTransactionsTimings, diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs index 14fcfe486fa8d6..ccd367349a4fdc 100644 --- a/core/src/forward_packet_batches_by_accounts.rs +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -1,5 +1,7 @@ use { - crate::unprocessed_packet_batches::{self, ImmutableDeserializedPacket}, + crate::{ + immutable_deserialized_packet::ImmutableDeserializedPacket, unprocessed_packet_batches, + }, solana_perf::packet::Packet, solana_runtime::{ bank::Bank, diff --git a/core/src/immutable_deserialized_packet.rs b/core/src/immutable_deserialized_packet.rs new file mode 100644 index 00000000000000..a54f64916c13c3 --- /dev/null +++ b/core/src/immutable_deserialized_packet.rs @@ -0,0 +1,137 @@ +use { + crate::transaction_priority_details::{ + GetTransactionPriorityDetails, TransactionPriorityDetails, + }, + solana_perf::packet::Packet, + solana_sdk::{ + hash::Hash, + message::Message, + sanitize::SanitizeError, + short_vec::decode_shortu16_len, + signature::Signature, + transaction::{SanitizedVersionedTransaction, VersionedTransaction}, + }, + std::{cmp::Ordering, mem::size_of}, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum DeserializedPacketError { + #[error("ShortVec Failed to Deserialize")] + // short_vec::decode_shortu16_len() currently returns () on error + ShortVecError(()), + #[error("Deserialization Error: {0}")] + DeserializationError(#[from] bincode::Error), + #[error("overflowed on signature size {0}")] + SignatureOverflowed(usize), + #[error("packet failed sanitization {0}")] + SanitizeError(#[from] SanitizeError), + #[error("transaction failed prioritization")] + PrioritizationFailure, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct ImmutableDeserializedPacket { + original_packet: Packet, + transaction: SanitizedVersionedTransaction, + message_hash: Hash, + is_simple_vote: bool, + priority_details: TransactionPriorityDetails, +} + +impl ImmutableDeserializedPacket { + pub fn new( + packet: Packet, + priority_details: Option, + ) -> Result { + let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; + let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; + let message_bytes = packet_message(&packet)?; + let message_hash = Message::hash_raw_message(message_bytes); + let is_simple_vote = packet.meta.is_simple_vote_tx(); + + // drop transaction if prioritization fails. + let priority_details = priority_details + .or_else(|| sanitized_transaction.get_transaction_priority_details()) + .ok_or(DeserializedPacketError::PrioritizationFailure)?; + + Ok(Self { + original_packet: packet, + transaction: sanitized_transaction, + message_hash, + is_simple_vote, + priority_details, + }) + } + + pub fn original_packet(&self) -> &Packet { + &self.original_packet + } + + pub fn transaction(&self) -> &SanitizedVersionedTransaction { + &self.transaction + } + + pub fn message_hash(&self) -> &Hash { + &self.message_hash + } + + pub fn is_simple_vote(&self) -> bool { + self.is_simple_vote + } + + pub fn priority(&self) -> u64 { + self.priority_details.priority + } + + pub fn compute_unit_limit(&self) -> u64 { + self.priority_details.compute_unit_limit + } +} + +impl PartialOrd for ImmutableDeserializedPacket { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ImmutableDeserializedPacket { + fn cmp(&self, other: &Self) -> Ordering { + self.priority().cmp(&other.priority()) + } +} + +/// Read the transaction message from packet data +fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> { + let (sig_len, sig_size) = packet + .data(..) + .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(DeserializedPacketError::ShortVecError(()))?; + sig_len + .checked_mul(size_of::()) + .and_then(|v| v.checked_add(sig_size)) + .and_then(|msg_start| packet.data(msg_start..)) + .ok_or(DeserializedPacketError::SignatureOverflowed(sig_size)) +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{signature::Keypair, system_transaction}, + }; + + #[test] + fn simple_deserialized_packet() { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, &tx).unwrap(); + let deserialized_packet = ImmutableDeserializedPacket::new(packet, None); + + assert!(matches!(deserialized_packet, Ok(_))); + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 9be6ab5b9cc4ba..4133f08152bb64 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -30,6 +30,7 @@ pub mod fork_choice; pub mod forward_packet_batches_by_accounts; pub mod gen_keys; pub mod heaviest_subtree_fork_choice; +pub mod immutable_deserialized_packet; pub mod latest_validator_votes_for_frozen_banks; pub mod leader_slot_banking_stage_metrics; pub mod leader_slot_banking_stage_timing_metrics; diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 3305e264b77d4f..69403eb9ba16e1 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,81 +1,23 @@ use { - crate::transaction_priority_details::{ - GetTransactionPriorityDetails, TransactionPriorityDetails, + crate::{ + immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, + transaction_priority_details::TransactionPriorityDetails, }, min_max_heap::MinMaxHeap, solana_perf::packet::{Packet, PacketBatch}, solana_sdk::{ feature_set, hash::Hash, - message::Message, - sanitize::SanitizeError, - short_vec::decode_shortu16_len, - signature::Signature, - transaction::{ - AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, Transaction, - VersionedTransaction, - }, + transaction::{AddressLoader, SanitizedTransaction, Transaction}, }, std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap}, - mem::size_of, rc::Rc, sync::Arc, }, - thiserror::Error, }; -#[derive(Debug, Error)] -pub enum DeserializedPacketError { - #[error("ShortVec Failed to Deserialize")] - // short_vec::decode_shortu16_len() currently returns () on error - ShortVecError(()), - #[error("Deserialization Error: {0}")] - DeserializationError(#[from] bincode::Error), - #[error("overflowed on signature size {0}")] - SignatureOverflowed(usize), - #[error("packet failed sanitization {0}")] - SanitizeError(#[from] SanitizeError), - #[error("transaction failed prioritization")] - PrioritizationFailure, -} - -#[derive(Debug, PartialEq, Eq)] -pub struct ImmutableDeserializedPacket { - original_packet: Packet, - transaction: SanitizedVersionedTransaction, - message_hash: Hash, - is_simple_vote: bool, - priority_details: TransactionPriorityDetails, -} - -impl ImmutableDeserializedPacket { - pub fn original_packet(&self) -> &Packet { - &self.original_packet - } - - pub fn transaction(&self) -> &SanitizedVersionedTransaction { - &self.transaction - } - - pub fn message_hash(&self) -> &Hash { - &self.message_hash - } - - pub fn is_simple_vote(&self) -> bool { - self.is_simple_vote - } - - pub fn priority(&self) -> u64 { - self.priority_details.priority - } - - pub fn compute_unit_limit(&self) -> u64 { - self.priority_details.compute_unit_limit - } -} - /// Holds deserialized messages, as well as computed message_hash and other things needed to create /// SanitizedTransaction #[derive(Debug, Clone, PartialEq, Eq)] @@ -101,25 +43,10 @@ impl DeserializedPacket { packet: Packet, priority_details: Option, ) -> Result { - let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; - let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; - let message_bytes = packet_message(&packet)?; - let message_hash = Message::hash_raw_message(message_bytes); - let is_simple_vote = packet.meta.is_simple_vote_tx(); - - // drop transaction if prioritization fails. - let priority_details = priority_details - .or_else(|| sanitized_transaction.get_transaction_priority_details()) - .ok_or(DeserializedPacketError::PrioritizationFailure)?; + let immutable_section = ImmutableDeserializedPacket::new(packet, priority_details)?; Ok(Self { - immutable_section: Rc::new(ImmutableDeserializedPacket { - original_packet: packet, - transaction: sanitized_transaction, - message_hash, - is_simple_vote, - priority_details, - }), + immutable_section: Rc::new(immutable_section), forwarded: false, }) } @@ -143,18 +70,6 @@ impl Ord for DeserializedPacket { } } -impl PartialOrd for ImmutableDeserializedPacket { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ImmutableDeserializedPacket { - fn cmp(&self, other: &Self) -> Ordering { - self.priority().cmp(&other.priority()) - } -} - /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store /// PacketBatch's received from sigverify. Banking thread continuously scans the buffer /// to pick proper packets to add to the block. @@ -389,19 +304,6 @@ pub fn deserialize_packets<'a>( }) } -/// Read the transaction message from packet data -pub fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> { - let (sig_len, sig_size) = packet - .data(..) - .and_then(|bytes| decode_shortu16_len(bytes).ok()) - .ok_or(DeserializedPacketError::ShortVecError(()))?; - sig_len - .checked_mul(size_of::()) - .and_then(|v| v.checked_add(sig_size)) - .and_then(|msg_start| packet.data(msg_start..)) - .ok_or(DeserializedPacketError::SignatureOverflowed(sig_size)) -} - pub fn transactions_to_deserialized_packets( transactions: &[Transaction], ) -> Result, DeserializedPacketError> {