Skip to content

Commit

Permalink
Separate file for ImmutableDeserializedPacket type
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Aug 9, 2022
1 parent 4e43aa6 commit 650e023
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 105 deletions.
2 changes: 2 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
crate::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::ImmutableDeserializedPacket,
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
Expand Down
4 changes: 3 additions & 1 deletion core/src/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use {
crate::unprocessed_packet_batches::{self, ImmutableDeserializedPacket},
crate::{
immutable_deserialized_packet::ImmutableDeserializedPacket, unprocessed_packet_batches,
},
solana_perf::packet::Packet,
solana_runtime::{
bank::Bank,
Expand Down
137 changes: 137 additions & 0 deletions core/src/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use {
crate::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_perf::packet::Packet,
solana_sdk::{
hash::Hash,
message::Message,
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{SanitizedVersionedTransaction, VersionedTransaction},
},
std::{cmp::Ordering, mem::size_of},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
#[error("packet failed sanitization {0}")]
SanitizeError(#[from] SanitizeError),
#[error("transaction failed prioritization")]
PrioritizationFailure,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
transaction: SanitizedVersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
}

impl ImmutableDeserializedPacket {
pub fn new(
packet: Packet,
priority_details: Option<TransactionPriorityDetails>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();

// drop transaction if prioritization fails.
let priority_details = priority_details
.or_else(|| sanitized_transaction.get_transaction_priority_details())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;

Ok(Self {
original_packet: packet,
transaction: sanitized_transaction,
message_hash,
is_simple_vote,
priority_details,
})
}

pub fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn priority(&self) -> u64 {
self.priority_details.priority
}

pub fn compute_unit_limit(&self) -> u64 {
self.priority_details.compute_unit_limit
}
}

impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
self.priority().cmp(&other.priority())
}
}

/// Read the transaction message from packet data
fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) = packet
.data(..)
.and_then(|bytes| decode_shortu16_len(bytes).ok())
.ok_or(DeserializedPacketError::ShortVecError(()))?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.and_then(|msg_start| packet.data(msg_start..))
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}

#[cfg(test)]
mod tests {
use {
super::*,
solana_sdk::{signature::Keypair, system_transaction},
};

#[test]
fn simple_deserialized_packet() {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, &tx).unwrap();
let deserialized_packet = ImmutableDeserializedPacket::new(packet, None);

assert!(matches!(deserialized_packet, Ok(_)));
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod fork_choice;
pub mod forward_packet_batches_by_accounts;
pub mod gen_keys;
pub mod heaviest_subtree_fork_choice;
pub mod immutable_deserialized_packet;
pub mod latest_validator_votes_for_frozen_banks;
pub mod leader_slot_banking_stage_metrics;
pub mod leader_slot_banking_stage_timing_metrics;
Expand Down
110 changes: 6 additions & 104 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,23 @@
use {
crate::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
crate::{
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
transaction_priority_details::TransactionPriorityDetails,
},
min_max_heap::MinMaxHeap,
solana_perf::packet::{Packet, PacketBatch},
solana_sdk::{
feature_set,
hash::Hash,
message::Message,
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{
AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, Transaction,
VersionedTransaction,
},
transaction::{AddressLoader, SanitizedTransaction, Transaction},
},
std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
mem::size_of,
rc::Rc,
sync::Arc,
},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
#[error("packet failed sanitization {0}")]
SanitizeError(#[from] SanitizeError),
#[error("transaction failed prioritization")]
PrioritizationFailure,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
transaction: SanitizedVersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
}

impl ImmutableDeserializedPacket {
pub fn original_packet(&self) -> &Packet {
&self.original_packet
}

pub fn transaction(&self) -> &SanitizedVersionedTransaction {
&self.transaction
}

pub fn message_hash(&self) -> &Hash {
&self.message_hash
}

pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}

pub fn priority(&self) -> u64 {
self.priority_details.priority
}

pub fn compute_unit_limit(&self) -> u64 {
self.priority_details.compute_unit_limit
}
}

/// Holds deserialized messages, as well as computed message_hash and other things needed to create
/// SanitizedTransaction
#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -101,25 +43,10 @@ impl DeserializedPacket {
packet: Packet,
priority_details: Option<TransactionPriorityDetails>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();

// drop transaction if prioritization fails.
let priority_details = priority_details
.or_else(|| sanitized_transaction.get_transaction_priority_details())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;
let immutable_section = ImmutableDeserializedPacket::new(packet, priority_details)?;

Ok(Self {
immutable_section: Rc::new(ImmutableDeserializedPacket {
original_packet: packet,
transaction: sanitized_transaction,
message_hash,
is_simple_vote,
priority_details,
}),
immutable_section: Rc::new(immutable_section),
forwarded: false,
})
}
Expand All @@ -143,18 +70,6 @@ impl Ord for DeserializedPacket {
}
}

impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
self.priority().cmp(&other.priority())
}
}

/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store
/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer
/// to pick proper packets to add to the block.
Expand Down Expand Up @@ -389,19 +304,6 @@ pub fn deserialize_packets<'a>(
})
}

/// Read the transaction message from packet data
pub fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) = packet
.data(..)
.and_then(|bytes| decode_shortu16_len(bytes).ok())
.ok_or(DeserializedPacketError::ShortVecError(()))?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.and_then(|msg_start| packet.data(msg_start..))
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}

pub fn transactions_to_deserialized_packets(
transactions: &[Transaction],
) -> Result<Vec<DeserializedPacket>, DeserializedPacketError> {
Expand Down

0 comments on commit 650e023

Please sign in to comment.