Skip to content

Commit

Permalink
removes outdated check for merkle shreds (#33088)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1431275)

# Conflicts:
#	core/src/cluster_nodes.rs
#	core/src/shred_fetch_stage.rs
  • Loading branch information
behzadnouri authored and mergify[bot] committed Sep 4, 2023
1 parent 0c211bd commit 5d0cb5a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 51 deletions.
4 changes: 4 additions & 0 deletions core/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,15 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool

// Returns true if the feature is effective for the shred slot.
#[must_use]
<<<<<<< HEAD:core/src/cluster_nodes.rs
pub(crate) fn check_feature_activation(
feature: &Pubkey,
shred_slot: Slot,
root_bank: &Bank,
) -> bool {
=======
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
>>>>>>> 1431275328 (removes outdated check for merkle shreds (#33088)):turbine/src/cluster_nodes.rs
match root_bank.feature_set.activated_slot(feature) {
None => false,
Some(feature_slot) => {
Expand Down
80 changes: 57 additions & 23 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,20 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
<<<<<<< HEAD
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
feature_set,
=======
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags, PACKETS_PER_BATCH},
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::DEFAULT_MS_PER_SLOT,
packet::{Meta, PACKET_DATA_SIZE},
pubkey::Pubkey,
>>>>>>> 1431275328 (removes outdated check for merkle shreds (#33088))
},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
Expand Down Expand Up @@ -85,19 +94,10 @@ impl ShredFetchStage {

// Limit shreds to 2 epochs away.
let max_slot = last_slot + 2 * slots_per_epoch;
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut().filter(|p| !p.meta().discard()) {
if turbine_disabled
|| should_discard_shred(
packet,
last_root,
max_slot,
shred_version,
should_drop_merkle_shreds,
&mut stats,
)
|| should_discard_shred(packet, last_root, max_slot, shred_version, &mut stats)
{
packet.meta_mut().set_discard(true);
} else {
Expand Down Expand Up @@ -232,6 +232,7 @@ impl ShredFetchStage {
}
}

<<<<<<< HEAD
#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
Expand All @@ -243,6 +244,51 @@ fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
shred_slot,
root_bank,
)
=======
fn receive_quic_datagrams(
quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
sender: Sender<PacketBatch>,
recycler: PacketBatchRecycler,
exit: Arc<AtomicBool>,
) {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
while !exit.load(Ordering::Relaxed) {
let entry = match quic_endpoint_receiver.recv_timeout(RECV_TIMEOUT) {
Ok(entry) => entry,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
};
let mut packet_batch =
PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams");
unsafe {
packet_batch.set_len(PACKETS_PER_BATCH);
};
let deadline = Instant::now() + PACKET_COALESCE_DURATION;
let entries = std::iter::once(entry).chain(
std::iter::repeat_with(|| quic_endpoint_receiver.recv_deadline(deadline).ok())
.while_some(),
);
let size = entries
.filter(|(_, _, bytes)| bytes.len() <= PACKET_DATA_SIZE)
.zip(packet_batch.iter_mut())
.map(|((_pubkey, addr, bytes), packet)| {
*packet.meta_mut() = Meta {
size: bytes.len(),
addr: addr.ip(),
port: addr.port(),
flags: PacketFlags::empty(),
};
packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes);
})
.count();
if size > 0 {
packet_batch.truncate(size);
if sender.send(packet_batch).is_err() {
return;
}
}
}
>>>>>>> 1431275328 (removes outdated check for merkle shreds (#33088))
}

#[cfg(test)]
Expand Down Expand Up @@ -285,7 +331,6 @@ mod tests {
last_root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
Expand All @@ -299,7 +344,6 @@ mod tests {
last_root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
}
Expand All @@ -321,7 +365,6 @@ mod tests {
last_root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.index_overrun, 1);
Expand All @@ -343,18 +386,12 @@ mod tests {
3,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);

assert!(should_discard_shred(
&packet,
last_root,
max_slot,
345, // shred_version
|_| false, // should_drop_merkle_shreds
&mut stats,
&packet, last_root, max_slot, /*shred_version:*/ 345, &mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);

Expand All @@ -364,7 +401,6 @@ mod tests {
last_root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

Expand All @@ -386,7 +422,6 @@ mod tests {
last_root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

Expand All @@ -398,7 +433,6 @@ mod tests {
last_root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
}
Expand Down
18 changes: 0 additions & 18 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,6 @@ pub fn should_discard_shred(
root: Slot,
max_slot: Slot,
shred_version: u16,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
debug_assert!(root < max_slot);
Expand Down Expand Up @@ -984,15 +983,9 @@ pub fn should_discard_shred(
match shred_variant {
ShredVariant::LegacyCode | ShredVariant::LegacyData => (),
ShredVariant::MerkleCode(_) => {
if should_drop_merkle_shreds(slot) {
return true;
}
stats.num_shreds_merkle_code = stats.num_shreds_merkle_code.saturating_add(1);
}
ShredVariant::MerkleData(_) => {
if should_drop_merkle_shreds(slot) {
return true;
}
stats.num_shreds_merkle_data = stats.num_shreds_merkle_data.saturating_add(1);
}
}
Expand Down Expand Up @@ -1192,7 +1185,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats, ShredFetchStats::default());
Expand All @@ -1203,7 +1195,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 1);
Expand All @@ -1214,7 +1205,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 2);
Expand All @@ -1225,7 +1215,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 3);
Expand All @@ -1236,7 +1225,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.index_overrun, 4);
Expand All @@ -1247,7 +1235,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(stats.bad_parent_offset, 1);
Expand All @@ -1268,7 +1255,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));

Expand All @@ -1288,7 +1274,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(1, stats.index_out_of_bounds);
Expand All @@ -1309,7 +1294,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
packet.buffer_mut()[OFFSET_OF_SHRED_VARIANT] = u8::MAX;
Expand All @@ -1319,7 +1303,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
Expand All @@ -1331,7 +1314,6 @@ mod tests {
root,
max_slot,
shred_version,
|_| false, // should_drop_merkle_shreds
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
Expand Down
10 changes: 0 additions & 10 deletions sdk/src/feature_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,6 @@ pub mod disable_turbine_fanout_experiments {
solana_sdk::declare_id!("Gz1aLrbeQ4Q6PTSafCZcGWZXz91yVRi7ASFzFEr1U4sa");
}

pub mod drop_merkle_shreds {
solana_sdk::declare_id!("84zy5N23Q9vTZuLc9h1HWUtyM9yCFV2SCmyP9W9C3yHZ");
}

pub mod keep_merkle_shreds {
solana_sdk::declare_id!("HyNQzc7TMNmRhpVHXqDGjpsHzeQie82mDQXSF9hj7nAH");
}

pub mod move_serialized_len_ptr_in_cpi {
solana_sdk::declare_id!("74CoWuBmt3rUVUrCb2JiSTvh6nXyBWUsK4SaMj3CtE3T");
}
Expand Down Expand Up @@ -821,8 +813,6 @@ lazy_static! {
(commission_updates_only_allowed_in_first_half_of_epoch::id(), "validator commission updates are only allowed in the first half of an epoch #29362"),
(enable_turbine_fanout_experiments::id(), "enable turbine fanout experiments #29393"),
(disable_turbine_fanout_experiments::id(), "disable turbine fanout experiments #29393"),
(drop_merkle_shreds::id(), "drop merkle shreds #29711"),
(keep_merkle_shreds::id(), "keep merkle shreds #29711"),
(move_serialized_len_ptr_in_cpi::id(), "cpi ignore serialized_len_ptr #29592"),
(update_hashes_per_tick::id(), "Update desired hashes per tick on epoch boundary"),
(enable_big_mod_exp_syscall::id(), "add big_mod_exp syscall #28503"),
Expand Down

0 comments on commit 5d0cb5a

Please sign in to comment.