diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 40cd821ac82680..761d11e4e96d86 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -68,6 +68,7 @@ mod latest_unprocessed_votes; mod leader_slot_timing_metrics; mod multi_iterator_scanner; mod packet_deserializer; +mod packet_filter; mod packet_receiver; mod read_write_account_set; #[allow(dead_code)] diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs index fba145f3088652..0c0cc00d1a3653 100644 --- a/core/src/banking_stage/immutable_deserialized_packet.rs +++ b/core/src/banking_stage/immutable_deserialized_packet.rs @@ -1,5 +1,5 @@ use { - solana_cost_model::block_cost_limits::BUILT_IN_INSTRUCTION_COSTS, + super::packet_filter::PacketFilterFailure, solana_perf::packet::Packet, solana_runtime::compute_budget_details::{ComputeBudgetDetails, GetComputeBudgetDetails}, solana_sdk::{ @@ -8,7 +8,6 @@ use { message::Message, pubkey::Pubkey, sanitize::SanitizeError, - saturating_add_assign, short_vec::decode_shortu16_len, signature::Signature, transaction::{ @@ -35,6 +34,8 @@ pub enum DeserializedPacketError { PrioritizationFailure, #[error("vote transaction failure")] VoteTransactionError, + #[error("Packet filter failure: {0}")] + FailedFilter(#[from] PacketFilterFailure), } #[derive(Debug, PartialEq, Eq)] @@ -101,22 +102,6 @@ impl ImmutableDeserializedPacket { self.compute_budget_details.clone() } - /// Returns true if the transaction's compute unit limit is at least as - /// large as the sum of the static builtins' costs. - /// This is a simple sanity check so the leader can discard transactions - /// which are statically known to exceed the compute budget, and will - /// result in no useful state-change. - pub fn compute_unit_limit_above_static_builtins(&self) -> bool { - let mut static_builtin_cost_sum: u64 = 0; - for (program_id, _) in self.transaction.get_message().program_instructions_iter() { - if let Some(ix_cost) = BUILT_IN_INSTRUCTION_COSTS.get(program_id) { - saturating_add_assign!(static_builtin_cost_sum, *ix_cost); - } - } - - self.compute_unit_limit() >= static_builtin_cost_sum - } - // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages, and verifies secp256k1 instructions. pub fn build_sanitized_transaction( @@ -197,7 +182,11 @@ mod tests { // 1. compute_unit_limit under static builtins // 2. compute_unit_limit equal to static builtins // 3. compute_unit_limit above static builtins - for (cu_limit, expectation) in [(250, false), (300, true), (350, true)] { + for (cu_limit, expectation) in [ + (250, Err(PacketFilterFailure::InsufficientComputeLimit)), + (300, Ok(())), + (350, Ok(())), + ] { let keypair = Keypair::new(); let bpf_program_id = Pubkey::new_unique(); let ixs = vec![ @@ -214,7 +203,7 @@ mod tests { let packet = Packet::from_data(None, tx).unwrap(); let deserialized_packet = ImmutableDeserializedPacket::new(packet).unwrap(); assert_eq!( - deserialized_packet.compute_unit_limit_above_static_builtins(), + deserialized_packet.check_insufficent_compute_unit_limit(), expectation ); } diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 38d64689bda49b..390e128b6c8428 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -1,6 +1,7 @@ use { super::{ leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, + packet_deserializer::PacketReceiverStats, unprocessed_transaction_storage::{ InsertPacketBatchSummary, UnprocessedTransactionStorage, }, @@ -113,6 +114,21 @@ struct LeaderSlotPacketCountMetrics { // total number of packets TPU received from sigverify that failed signature verification. newly_failed_sigverify_count: u64, + // total number of packets filtered due to sanitization failures during receiving from sigverify + failed_sanitization_count: u64, + + // total number of packets filtered due to prioritization failures during receiving from sigverify + failed_prioritization_count: u64, + + // total number of packets filtered due to insufficient compute limits during receiving from sigverify + insufficient_compute_limit_count: u64, + + // total number of packets filtered due to excessive precompile signatures during receiving from sigverify + excessive_precompile_count: u64, + + // total number of invalid vote packets filtered out during receiving from sigverify + invalid_votes_count: u64, + // total number of dropped packet due to the thread's buffered packets capacity being reached. exceeded_buffer_limit_dropped_packets_count: u64, @@ -203,120 +219,145 @@ impl LeaderSlotPacketCountMetrics { datapoint_info!( "banking_stage-leader_slot_packet_counts", "id" => id, - ("slot", slot as i64, i64), + ("slot", slot, i64), ( "total_new_valid_packets", - self.total_new_valid_packets as i64, + self.total_new_valid_packets, i64 ), ( "newly_failed_sigverify_count", - self.newly_failed_sigverify_count as i64, + self.newly_failed_sigverify_count, + i64 + ), + ( + "failed_sanitization_count", + self.failed_sanitization_count, + i64 + ), + ( + "failed_prioritization_count", + self.failed_prioritization_count, + i64 + ), + ( + "insufficient_compute_limit_count", + self.insufficient_compute_limit_count, + i64 + ), + ( + "excessive_precompile_count", + self.excessive_precompile_count, + i64 + ), + ( + "invalid_votes_count", + self.invalid_votes_count, i64 ), ( "exceeded_buffer_limit_dropped_packets_count", - self.exceeded_buffer_limit_dropped_packets_count as i64, + self.exceeded_buffer_limit_dropped_packets_count, i64 ), ( "newly_buffered_packets_count", - self.newly_buffered_packets_count as i64, + self.newly_buffered_packets_count, i64 ), ( "retryable_packets_filtered_count", - self.retryable_packets_filtered_count as i64, + self.retryable_packets_filtered_count, i64 ), ( "transactions_attempted_execution_count", - self.transactions_attempted_execution_count as i64, + self.transactions_attempted_execution_count, i64 ), ( "committed_transactions_count", - self.committed_transactions_count as i64, + self.committed_transactions_count, i64 ), ( "committed_transactions_with_successful_result_count", - self.committed_transactions_with_successful_result_count as i64, + self.committed_transactions_with_successful_result_count, i64 ), ( "retryable_errored_transaction_count", - self.retryable_errored_transaction_count as i64, + self.retryable_errored_transaction_count, i64 ), ( "retryable_packets_count", - self.retryable_packets_count as i64, + self.retryable_packets_count, i64 ), ( "nonretryable_errored_transactions_count", - self.nonretryable_errored_transactions_count as i64, + self.nonretryable_errored_transactions_count, i64 ), ( "executed_transactions_failed_commit_count", - self.executed_transactions_failed_commit_count as i64, + self.executed_transactions_failed_commit_count, i64 ), ( "account_lock_throttled_transactions_count", - self.account_lock_throttled_transactions_count as i64, + self.account_lock_throttled_transactions_count, i64 ), ( "account_locks_limit_throttled_transactions_count", - self.account_locks_limit_throttled_transactions_count as i64, + self.account_locks_limit_throttled_transactions_count, i64 ), ( "cost_model_throttled_transactions_count", - self.cost_model_throttled_transactions_count as i64, + self.cost_model_throttled_transactions_count, i64 ), ( "failed_forwarded_packets_count", - self.failed_forwarded_packets_count as i64, + self.failed_forwarded_packets_count, i64 ), ( "successful_forwarded_packets_count", - self.successful_forwarded_packets_count as i64, + self.successful_forwarded_packets_count, i64 ), ( "packet_batch_forward_failure_count", - self.packet_batch_forward_failure_count as i64, + self.packet_batch_forward_failure_count, i64 ), ( "cleared_from_buffer_after_forward_count", - self.cleared_from_buffer_after_forward_count as i64, + self.cleared_from_buffer_after_forward_count, i64 ), ( "forwardable_batches_count", - self.forwardable_batches_count as i64, + self.forwardable_batches_count, i64 ), ( "end_of_slot_unprocessed_buffer_len", - self.end_of_slot_unprocessed_buffer_len as i64, + self.end_of_slot_unprocessed_buffer_len, i64 ), ( "min_prioritization_fees", - self.min_prioritization_fees as i64, + self.min_prioritization_fees, i64 ), ( "max_prioritization_fees", - self.max_prioritization_fees as i64, + self.max_prioritization_fees, i64 ), ); @@ -656,24 +697,34 @@ impl LeaderSlotMetricsTracker { } // Packet inflow/outflow/processing metrics - pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) { + pub(crate) fn increment_received_packet_counts(&mut self, stats: PacketReceiverStats) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + let metrics = &mut leader_slot_metrics.packet_count_metrics; + let PacketReceiverStats { + passed_sigverify_count, + failed_sigverify_count, + invalid_vote_count, + failed_prioritization_count, + failed_sanitization_count, + excessive_precompile_count, + insufficient_compute_limit_count, + } = stats; + + saturating_add_assign!(metrics.total_new_valid_packets, passed_sigverify_count); + saturating_add_assign!(metrics.newly_failed_sigverify_count, failed_sigverify_count); + saturating_add_assign!(metrics.invalid_votes_count, invalid_vote_count); saturating_add_assign!( - leader_slot_metrics - .packet_count_metrics - .total_new_valid_packets, - count + metrics.failed_prioritization_count, + failed_prioritization_count ); - } - } - - pub(crate) fn increment_newly_failed_sigverify_count(&mut self, count: u64) { - if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!(metrics.failed_sanitization_count, failed_sanitization_count); saturating_add_assign!( - leader_slot_metrics - .packet_count_metrics - .newly_failed_sigverify_count, - count + metrics.excessive_precompile_count, + excessive_precompile_count + ); + saturating_add_assign!( + metrics.insufficient_compute_limit_count, + insufficient_compute_limit_count ); } } diff --git a/core/src/banking_stage/packet_deserializer.rs b/core/src/banking_stage/packet_deserializer.rs index 1d1079eaf97fcd..e310d5505c03c9 100644 --- a/core/src/banking_stage/packet_deserializer.rs +++ b/core/src/banking_stage/packet_deserializer.rs @@ -1,7 +1,10 @@ //! Deserializes packets from sigverify stage. Owned by banking stage. use { - super::immutable_deserialized_packet::ImmutableDeserializedPacket, + super::{ + immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, + packet_filter::PacketFilterFailure, + }, crate::{ banking_trace::{BankingPacketBatch, BankingPacketReceiver}, sigverify::SigverifyTracerPacketStats, @@ -9,6 +12,7 @@ use { crossbeam_channel::RecvTimeoutError, solana_perf::packet::PacketBatch, solana_runtime::bank_forks::BankForks, + solana_sdk::saturating_add_assign, std::{ sync::{Arc, RwLock}, time::{Duration, Instant}, @@ -21,10 +25,9 @@ pub struct ReceivePacketResults { pub deserialized_packets: Vec, /// Aggregate tracer stats for all received packet batches pub new_tracer_stats_option: Option, - /// Number of packets passing sigverify - pub passed_sigverify_count: u64, - /// Number of packets failing sigverify - pub failed_sigverify_count: u64, + /// Counts of packets received and errors recorded during deserialization + /// and filtering + pub packet_stats: PacketReceiverStats, } pub struct PacketDeserializer { @@ -34,6 +37,51 @@ pub struct PacketDeserializer { bank_forks: Arc>, } +#[derive(Default, Debug, PartialEq)] +pub struct PacketReceiverStats { + /// Number of packets passing sigverify + pub passed_sigverify_count: u64, + /// Number of packets failing sigverify + pub failed_sigverify_count: u64, + /// Number of packets dropped due to sanitization error + pub failed_sanitization_count: u64, + /// Number of packets dropped due to prioritization error + pub failed_prioritization_count: u64, + /// Number of vote packets dropped + pub invalid_vote_count: u64, + /// Number of packets dropped due to excessive precompiles + pub excessive_precompile_count: u64, + /// Number of packets dropped due to insufficient compute limit + pub insufficient_compute_limit_count: u64, +} + +impl PacketReceiverStats { + pub fn increment_error_count(&mut self, err: &DeserializedPacketError) { + match err { + DeserializedPacketError::ShortVecError(..) + | DeserializedPacketError::DeserializationError(..) + | DeserializedPacketError::SignatureOverflowed(..) + | DeserializedPacketError::SanitizeError(..) => { + saturating_add_assign!(self.failed_sanitization_count, 1); + } + DeserializedPacketError::PrioritizationFailure => { + saturating_add_assign!(self.failed_prioritization_count, 1); + } + DeserializedPacketError::VoteTransactionError => { + saturating_add_assign!(self.invalid_vote_count, 1); + } + DeserializedPacketError::FailedFilter(PacketFilterFailure::ExcessivePrecompiles) => { + saturating_add_assign!(self.excessive_precompile_count, 1); + } + DeserializedPacketError::FailedFilter( + PacketFilterFailure::InsufficientComputeLimit, + ) => { + saturating_add_assign!(self.insufficient_compute_limit_count, 1); + } + } + } +} + impl PacketDeserializer { pub fn new( packet_batch_receiver: BankingPacketReceiver, @@ -50,7 +98,9 @@ impl PacketDeserializer { &self, recv_timeout: Duration, capacity: usize, - packet_filter: impl Fn(&ImmutableDeserializedPacket) -> bool, + packet_filter: impl Fn( + ImmutableDeserializedPacket, + ) -> Result, ) -> Result { let (packet_count, packet_batches) = self.receive_until(recv_timeout, capacity)?; @@ -63,7 +113,7 @@ impl PacketDeserializer { packet_count, &packet_batches, round_compute_unit_price_enabled, - &packet_filter, + packet_filter, )) } @@ -73,10 +123,11 @@ impl PacketDeserializer { packet_count: usize, banking_batches: &[BankingPacketBatch], round_compute_unit_price_enabled: bool, - packet_filter: &impl Fn(&ImmutableDeserializedPacket) -> bool, + packet_filter: impl Fn( + ImmutableDeserializedPacket, + ) -> Result, ) -> ReceivePacketResults { - let mut passed_sigverify_count: usize = 0; - let mut failed_sigverify_count: usize = 0; + let mut packet_stats = PacketReceiverStats::default(); let mut deserialized_packets = Vec::with_capacity(packet_count); let mut aggregated_tracer_packet_stats_option = None::; @@ -84,14 +135,21 @@ impl PacketDeserializer { for packet_batch in &banking_batch.0 { let packet_indexes = Self::generate_packet_indexes(packet_batch); - passed_sigverify_count += packet_indexes.len(); - failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len()); + saturating_add_assign!( + packet_stats.passed_sigverify_count, + packet_indexes.len() as u64 + ); + saturating_add_assign!( + packet_stats.failed_sigverify_count, + packet_batch.len().saturating_sub(packet_indexes.len()) as u64 + ); deserialized_packets.extend(Self::deserialize_packets( packet_batch, &packet_indexes, round_compute_unit_price_enabled, - packet_filter, + &mut packet_stats, + &packet_filter, )); } @@ -111,8 +169,7 @@ impl PacketDeserializer { ReceivePacketResults { deserialized_packets, new_tracer_stats_option: aggregated_tracer_packet_stats_option, - passed_sigverify_count: passed_sigverify_count as u64, - failed_sigverify_count: failed_sigverify_count as u64, + packet_stats, } } @@ -162,16 +219,26 @@ impl PacketDeserializer { packet_batch: &'a PacketBatch, packet_indexes: &'a [usize], round_compute_unit_price_enabled: bool, - packet_filter: &'a (impl Fn(&ImmutableDeserializedPacket) -> bool + 'a), + packet_stats: &'a mut PacketReceiverStats, + packet_filter: &'a impl Fn( + ImmutableDeserializedPacket, + ) -> Result, ) -> impl Iterator + 'a { packet_indexes.iter().filter_map(move |packet_index| { let mut packet_clone = packet_batch[*packet_index].clone(); packet_clone .meta_mut() .set_round_compute_unit_price(round_compute_unit_price_enabled); - ImmutableDeserializedPacket::new(packet_clone) - .ok() - .filter(packet_filter) + + match ImmutableDeserializedPacket::new(packet_clone) + .and_then(|packet| packet_filter(packet).map_err(Into::into)) + { + Ok(packet) => Some(packet), + Err(err) => { + packet_stats.increment_error_count(&err); + None + } + } }) } } @@ -193,11 +260,11 @@ mod tests { #[test] fn test_deserialize_and_collect_packets_empty() { - let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], false, &|_| true); + let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], false, Ok); assert_eq!(results.deserialized_packets.len(), 0); assert!(results.new_tracer_stats_option.is_none()); - assert_eq!(results.passed_sigverify_count, 0); - assert_eq!(results.failed_sigverify_count, 0); + assert_eq!(results.packet_stats.passed_sigverify_count, 0); + assert_eq!(results.packet_stats.failed_sigverify_count, 0); } #[test] @@ -211,12 +278,12 @@ mod tests { packet_count, &[BankingPacketBatch::new((packet_batches, None))], false, - &|_| true, + Ok, ); assert_eq!(results.deserialized_packets.len(), 2); assert!(results.new_tracer_stats_option.is_none()); - assert_eq!(results.passed_sigverify_count, 2); - assert_eq!(results.failed_sigverify_count, 0); + assert_eq!(results.packet_stats.passed_sigverify_count, 2); + assert_eq!(results.packet_stats.failed_sigverify_count, 0); } #[test] @@ -231,11 +298,11 @@ mod tests { packet_count, &[BankingPacketBatch::new((packet_batches, None))], false, - &|_| true, + Ok, ); assert_eq!(results.deserialized_packets.len(), 1); assert!(results.new_tracer_stats_option.is_none()); - assert_eq!(results.passed_sigverify_count, 1); - assert_eq!(results.failed_sigverify_count, 1); + assert_eq!(results.packet_stats.passed_sigverify_count, 1); + assert_eq!(results.packet_stats.failed_sigverify_count, 1); } } diff --git a/core/src/banking_stage/packet_filter.rs b/core/src/banking_stage/packet_filter.rs new file mode 100644 index 00000000000000..5e90902e01816f --- /dev/null +++ b/core/src/banking_stage/packet_filter.rs @@ -0,0 +1,55 @@ +use { + super::immutable_deserialized_packet::ImmutableDeserializedPacket, + solana_cost_model::block_cost_limits::BUILT_IN_INSTRUCTION_COSTS, + solana_sdk::{ed25519_program, saturating_add_assign, secp256k1_program}, + thiserror::Error, +}; + +#[derive(Debug, Error, PartialEq)] +pub enum PacketFilterFailure { + #[error("Insufficient compute unit limit")] + InsufficientComputeLimit, + #[error("Excessive precompile usage")] + ExcessivePrecompiles, +} + +impl ImmutableDeserializedPacket { + /// Returns ok if the transaction's compute unit limit is at least as + /// large as the sum of the static builtins' costs. + /// This is a simple sanity check so the leader can discard transactions + /// which are statically known to exceed the compute budget, and will + /// result in no useful state-change. + pub fn check_insufficent_compute_unit_limit(&self) -> Result<(), PacketFilterFailure> { + let mut static_builtin_cost_sum: u64 = 0; + for (program_id, _) in self.transaction().get_message().program_instructions_iter() { + if let Some(ix_cost) = BUILT_IN_INSTRUCTION_COSTS.get(program_id) { + saturating_add_assign!(static_builtin_cost_sum, *ix_cost); + } + } + + if self.compute_unit_limit() >= static_builtin_cost_sum { + Ok(()) + } else { + Err(PacketFilterFailure::InsufficientComputeLimit) + } + } + + /// Returns ok if the number of precompile signature verifications + /// performed by the transaction is not excessive. + pub fn check_excessive_precompiles(&self) -> Result<(), PacketFilterFailure> { + let mut num_precompile_signatures: u64 = 0; + for (program_id, ix) in self.transaction().get_message().program_instructions_iter() { + if secp256k1_program::check_id(program_id) || ed25519_program::check_id(program_id) { + let num_signatures = ix.data.first().map_or(0, |byte| u64::from(*byte)); + saturating_add_assign!(num_precompile_signatures, num_signatures); + } + } + + const MAX_ALLOWED_PRECOMPILE_SIGNATURES: u64 = 8; + if num_precompile_signatures <= MAX_ALLOWED_PRECOMPILE_SIGNATURES { + Ok(()) + } else { + Err(PacketFilterFailure::ExcessivePrecompiles) + } + } +} diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index 4586650ee234ef..65c0d5816472c4 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -49,7 +49,11 @@ impl PacketReceiver { .receive_packets( recv_timeout, unprocessed_transaction_storage.max_receive_size(), - |packet| packet.compute_unit_limit_above_static_builtins(), + |packet| { + packet.check_insufficent_compute_unit_limit()?; + packet.check_excessive_precompiles()?; + Ok(packet) + }, ) // Consumes results if Ok, otherwise we keep the Err .map(|receive_packet_results| { @@ -98,8 +102,7 @@ impl PacketReceiver { ReceivePacketResults { deserialized_packets, new_tracer_stats_option, - passed_sigverify_count, - failed_sigverify_count, + packet_stats, }: ReceivePacketResults, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, @@ -109,14 +112,11 @@ impl PacketReceiver { let packet_count = deserialized_packets.len(); debug!("@{:?} txs: {} id: {}", timestamp(), packet_count, self.id); + slot_metrics_tracker.increment_received_packet_counts(packet_stats); if let Some(new_sigverify_stats) = &new_tracer_stats_option { tracer_packet_stats.aggregate_sigverify_tracer_packet_stats(new_sigverify_stats); } - // Track all the packets incoming from sigverify, both valid and invalid - slot_metrics_tracker.increment_total_new_valid_packets(passed_sigverify_count); - slot_metrics_tracker.increment_newly_failed_sigverify_count(failed_sigverify_count); - let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; let mut newly_buffered_forwarded_packets_count = 0; diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 0c8b209b4120ff..30e20a5207b579 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -438,7 +438,10 @@ impl SchedulerController { let (received_packet_results, receive_time_us) = measure_us!(self .packet_receiver - .receive_packets(recv_timeout, remaining_queue_capacity, |_| true)); + .receive_packets(recv_timeout, remaining_queue_capacity, |packet| { + packet.check_excessive_precompiles()?; + Ok(packet) + })); self.timing_metrics.update(|timing_metrics| { saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us);