Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove cu price rounding #3047

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ impl BankingStage {
Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
Expand Down Expand Up @@ -566,7 +565,6 @@ impl BankingStage {
bank_thread_hdls.push(Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
Expand Down Expand Up @@ -631,8 +629,7 @@ impl BankingStage {

// Spawn the central scheduler thread
bank_thread_hdls.push({
let packet_deserializer =
PacketDeserializer::new(non_vote_receiver, bank_forks.clone());
let packet_deserializer = PacketDeserializer::new(non_vote_receiver);
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
Expand Down Expand Up @@ -660,15 +657,14 @@ impl BankingStage {
fn spawn_thread_local_multi_iterator_thread<T: LikeClusterInfo>(
id: u32,
packet_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
decision_maker: DecisionMaker,
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
mut forwarder: Forwarder<T>,
unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let consumer = Consumer::new(
committer,
transaction_recorder,
Expand Down
32 changes: 4 additions & 28 deletions core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ use {
},
crossbeam_channel::RecvTimeoutError,
solana_perf::packet::PacketBatch,
solana_runtime::bank_forks::BankForks,
solana_sdk::saturating_add_assign,
std::{
sync::{Arc, RwLock},
time::{Duration, Instant},
},
std::time::{Duration, Instant},
};

/// Results from deserializing packet batches.
Expand All @@ -33,8 +29,6 @@ pub struct ReceivePacketResults {
pub struct PacketDeserializer {
/// Receiver for packet batches from sigverify stage
packet_batch_receiver: BankingPacketReceiver,
/// Provides working bank for deserializer to check feature activation
bank_forks: Arc<RwLock<BankForks>>,
}

#[derive(Default, Debug, PartialEq)]
Expand Down Expand Up @@ -83,13 +77,9 @@ impl PacketReceiverStats {
}

impl PacketDeserializer {
pub fn new(
packet_batch_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
pub fn new(packet_batch_receiver: BankingPacketReceiver) -> Self {
Self {
packet_batch_receiver,
bank_forks,
}
}

Expand All @@ -104,15 +94,9 @@ impl PacketDeserializer {
) -> Result<ReceivePacketResults, RecvTimeoutError> {
let (packet_count, packet_batches) = self.receive_until(recv_timeout, capacity)?;

// Note: this can be removed after feature `round_compute_unit_price` is activated in
// mainnet-beta
let _working_bank = self.bank_forks.read().unwrap().working_bank();
let round_compute_unit_price_enabled = false; // TODO get from working_bank.feature_set

Ok(Self::deserialize_and_collect_packets(
packet_count,
&packet_batches,
round_compute_unit_price_enabled,
packet_filter,
))
}
Expand All @@ -122,7 +106,6 @@ impl PacketDeserializer {
fn deserialize_and_collect_packets(
packet_count: usize,
banking_batches: &[BankingPacketBatch],
round_compute_unit_price_enabled: bool,
packet_filter: impl Fn(
ImmutableDeserializedPacket,
) -> Result<ImmutableDeserializedPacket, PacketFilterFailure>,
Expand All @@ -147,7 +130,6 @@ impl PacketDeserializer {
deserialized_packets.extend(Self::deserialize_packets(
packet_batch,
&packet_indexes,
round_compute_unit_price_enabled,
&mut packet_stats,
&packet_filter,
));
Expand Down Expand Up @@ -218,17 +200,13 @@ impl PacketDeserializer {
fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
round_compute_unit_price_enabled: bool,
packet_stats: &'a mut PacketReceiverStats,
packet_filter: &'a impl Fn(
ImmutableDeserializedPacket,
) -> Result<ImmutableDeserializedPacket, PacketFilterFailure>,
) -> impl Iterator<Item = ImmutableDeserializedPacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
let mut packet_clone = packet_batch[*packet_index].clone();
packet_clone
.meta_mut()
.set_round_compute_unit_price(round_compute_unit_price_enabled);
let packet_clone = packet_batch[*packet_index].clone();

match ImmutableDeserializedPacket::new(packet_clone)
.and_then(|packet| packet_filter(packet).map_err(Into::into))
Expand Down Expand Up @@ -260,7 +238,7 @@ mod tests {

#[test]
fn test_deserialize_and_collect_packets_empty() {
let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], false, Ok);
let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], Ok);
assert_eq!(results.deserialized_packets.len(), 0);
assert!(results.new_tracer_stats_option.is_none());
assert_eq!(results.packet_stats.passed_sigverify_count, 0);
Expand All @@ -277,7 +255,6 @@ mod tests {
let results = PacketDeserializer::deserialize_and_collect_packets(
packet_count,
&[BankingPacketBatch::new((packet_batches, None))],
false,
Ok,
);
assert_eq!(results.deserialized_packets.len(), 2);
Expand All @@ -297,7 +274,6 @@ mod tests {
let results = PacketDeserializer::deserialize_and_collect_packets(
packet_count,
&[BankingPacketBatch::new((packet_batches, None))],
false,
Ok,
);
assert_eq!(results.deserialized_packets.len(), 1);
Expand Down
14 changes: 3 additions & 11 deletions core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ use {
crate::{banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats},
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_runtime::bank_forks::BankForks,
solana_sdk::{saturating_add_assign, timing::timestamp},
std::{
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
},
std::{sync::atomic::Ordering, time::Duration},
};

pub struct PacketReceiver {
Expand All @@ -23,14 +19,10 @@ pub struct PacketReceiver {
}

impl PacketReceiver {
pub fn new(
id: u32,
banking_packet_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
pub fn new(id: u32, banking_packet_receiver: BankingPacketReceiver) -> Self {
Self {
id,
packet_deserializer: PacketDeserializer::new(banking_packet_receiver, bank_forks),
packet_deserializer: PacketDeserializer::new(banking_packet_receiver),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,7 @@ mod tests {
let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone());

let (banking_packet_sender, banking_packet_receiver) = unbounded();
let packet_deserializer =
PacketDeserializer::new(banking_packet_receiver, bank_forks.clone());
let packet_deserializer = PacketDeserializer::new(banking_packet_receiver);

let (consume_work_senders, consume_work_receivers) = create_channels(num_threads);
let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
Expand Down
19 changes: 2 additions & 17 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ bitflags! {
const REPAIR = 0b0000_0100;
const SIMPLE_VOTE_TX = 0b0000_1000;
const TRACER_PACKET = 0b0001_0000;
/// to be set by bank.feature_set.is_active(round_compute_unit_price::id()) at the moment
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
// Previously used - this can now be re-used for something else.
const UNUSED = 0b0010_0000;
/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to change it to 0010_000, so there would be no "gap"?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are serialized in banking_trace data, so I think best to keep the meaning of each flag consistent.

ROUND_COMPUTE_UNIT_PRICE is actually an exception since that was populated only in banking_stage's copy of the Packet.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not great to leave a gap in bit flags. If don't want to introduce a break change in banking_trace at the moment, how about tag 0b0010_0000 as UNUSED?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// For marking packets from staked nodes
Expand Down Expand Up @@ -250,14 +248,6 @@ impl Meta {
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
}

#[inline]
pub fn set_round_compute_unit_price(&mut self, round_compute_unit_price: bool) {
self.flags.set(
PacketFlags::ROUND_COMPUTE_UNIT_PRICE,
round_compute_unit_price,
);
}

#[inline]
pub fn forwarded(&self) -> bool {
self.flags.contains(PacketFlags::FORWARDED)
Expand All @@ -283,11 +273,6 @@ impl Meta {
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
}

#[inline]
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
}

#[inline]
pub fn is_from_staked_node(&self) -> bool {
self.flags.contains(PacketFlags::FROM_STAKED_NODE)
Expand Down
Loading