Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Separate file for ImmutableDeserializedPacket type #26951

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.

use {
crate::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::ImmutableDeserializedPacket,
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
Expand Down
4 changes: 3 additions & 1 deletion core/src/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use {
crate::unprocessed_packet_batches::{self, ImmutableDeserializedPacket},
crate::{
immutable_deserialized_packet::ImmutableDeserializedPacket, unprocessed_packet_batches,
},
solana_perf::packet::Packet,
solana_runtime::{
bank::Bank,
Expand Down
137 changes: 137 additions & 0 deletions core/src/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use {
crate::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_perf::packet::Packet,
solana_sdk::{
hash::Hash,
message::Message,
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{SanitizedVersionedTransaction, VersionedTransaction},
},
std::{cmp::Ordering, mem::size_of},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
#[error("packet failed sanitization {0}")]
SanitizeError(#[from] SanitizeError),
#[error("transaction failed prioritization")]
PrioritizationFailure,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
transaction: SanitizedVersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
}

impl ImmutableDeserializedPacket {
pub fn new(
packet: Packet,
priority_details: Option<TransactionPriorityDetails>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();

// drop transaction if prioritization fails.
let priority_details = priority_details
.or_else(|| sanitized_transaction.get_transaction_priority_details())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;

Ok(Self {
original_packet: packet,
transaction: sanitized_transaction,
message_hash,
is_simple_vote,
priority_details,
})
}

pub fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn priority(&self) -> u64 {
self.priority_details.priority
}

pub fn compute_unit_limit(&self) -> u64 {
self.priority_details.compute_unit_limit
}
}

impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
self.priority().cmp(&other.priority())
}
}

/// Read the transaction message from packet data
fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) = packet
.data(..)
.and_then(|bytes| decode_shortu16_len(bytes).ok())
.ok_or(DeserializedPacketError::ShortVecError(()))?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.and_then(|msg_start| packet.data(msg_start..))
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}

#[cfg(test)]
mod tests {
use {
super::*,
solana_sdk::{signature::Keypair, system_transaction},
};

#[test]
fn simple_deserialized_packet() {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, &tx).unwrap();
let deserialized_packet = ImmutableDeserializedPacket::new(packet, None);

assert!(matches!(deserialized_packet, Ok(_)));
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod fork_choice;
pub mod forward_packet_batches_by_accounts;
pub mod gen_keys;
pub mod heaviest_subtree_fork_choice;
pub mod immutable_deserialized_packet;
pub mod latest_validator_votes_for_frozen_banks;
pub mod leader_slot_banking_stage_metrics;
pub mod leader_slot_banking_stage_timing_metrics;
Expand Down
110 changes: 6 additions & 104 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,23 @@
use {
crate::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
crate::{
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
transaction_priority_details::TransactionPriorityDetails,
},
min_max_heap::MinMaxHeap,
solana_perf::packet::{Packet, PacketBatch},
solana_sdk::{
feature_set,
hash::Hash,
message::Message,
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{
AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, Transaction,
VersionedTransaction,
},
transaction::{AddressLoader, SanitizedTransaction, Transaction},
},
std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
mem::size_of,
rc::Rc,
sync::Arc,
},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
#[error("packet failed sanitization {0}")]
SanitizeError(#[from] SanitizeError),
#[error("transaction failed prioritization")]
PrioritizationFailure,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
transaction: SanitizedVersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
}

impl ImmutableDeserializedPacket {
pub fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn priority(&self) -> u64 {
self.priority_details.priority
}

pub fn compute_unit_limit(&self) -> u64 {
self.priority_details.compute_unit_limit
}
}

/// Holds deserialized messages, as well as computed message_hash and other things needed to create
/// SanitizedTransaction
#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -101,25 +43,10 @@ impl DeserializedPacket {
packet: Packet,
priority_details: Option<TransactionPriorityDetails>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();

// drop transaction if prioritization fails.
let priority_details = priority_details
.or_else(|| sanitized_transaction.get_transaction_priority_details())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;
let immutable_section = ImmutableDeserializedPacket::new(packet, priority_details)?;

Ok(Self {
immutable_section: Rc::new(ImmutableDeserializedPacket {
original_packet: packet,
transaction: sanitized_transaction,
message_hash,
is_simple_vote,
priority_details,
}),
immutable_section: Rc::new(immutable_section),
forwarded: false,
})
}
Expand All @@ -143,18 +70,6 @@ impl Ord for DeserializedPacket {
}
}

impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
self.priority().cmp(&other.priority())
}
}

/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store
/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer
/// to pick proper packets to add to the block.
Expand Down Expand Up @@ -389,19 +304,6 @@ pub fn deserialize_packets<'a>(
})
}

/// Read the transaction message from packet data
pub fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) = packet
.data(..)
.and_then(|bytes| decode_shortu16_len(bytes).ok())
.ok_or(DeserializedPacketError::ShortVecError(()))?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.and_then(|msg_start| packet.data(msg_start..))
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}

pub fn transactions_to_deserialized_packets(
transactions: &[Transaction],
) -> Result<Vec<DeserializedPacket>, DeserializedPacketError> {
Expand Down