Skip to content

Commit

Permalink
v1.18: BankingStage Forwarding Filter (backport of solana-labs#685) (s…
Browse files Browse the repository at this point in the history
…olana-labs#697)

* BankingStage Forwarding Filter (solana-labs#685)

* add PacketFlags::FROM_STAKED_NODE

* Only forward packets from staked node

* fix local-cluster test forwarding

* review comment

* tpu_votes get marked as from_staked_node

(cherry picked from commit 1744e9e)

# Conflicts:
#	sdk/src/packet.rs

* resolve conflict

* revert: local-cluster test changes

---------

Co-authored-by: Andrew Fitzgerald <[email protected]>
Co-authored-by: Trent Nelson <[email protected]>
  • Loading branch information
3 people authored Apr 11, 2024
1 parent 77c6121 commit 7e9a53e
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 1 deletion.
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn main() -> Result<()> {
Duration::from_millis(1), // coalesce
true,
None,
false,
));
}

Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl Forwarder {
self.update_data_budget();
let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| p.meta().is_from_staked_node())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|data| data.to_vec()))
.collect();
Expand Down
3 changes: 3 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
Expand All @@ -190,6 +191,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
Expand All @@ -210,6 +212,7 @@ impl FetchStage {
coalesce,
true,
None,
true, // only staked connections should be voting
)
})
.collect();
Expand Down
2 changes: 2 additions & 0 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl AncestorHashesService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);

let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
Expand Down Expand Up @@ -1302,6 +1303,7 @@ mod test {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (remote_request_sender, remote_request_receiver) = unbounded();
let t_packet_adapter = Builder::new()
Expand Down
1 change: 1 addition & 0 deletions core/src/repair/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl ServeRepairService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);
let t_packet_adapter = Builder::new()
.name(String::from("solServRAdapt"))
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl ShredFetchStage {
PACKET_COALESCE_DURATION,
true, // use_pinned_memory
None, // in_vote_only_mode
false,
)
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl GossipService {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (consume_sender, listen_receiver) = unbounded();
let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
Expand Down
12 changes: 12 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For marking packets from staked nodes
const FROM_STAKED_NODE = 0b1000_0000;
}
}

Expand Down Expand Up @@ -213,6 +215,11 @@ impl Meta {
self.port = socket_addr.port();
}

pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
self.flags
.set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
}

#[inline]
pub fn discard(&self) -> bool {
self.flags.contains(PacketFlags::DISCARD)
Expand Down Expand Up @@ -265,6 +272,11 @@ impl Meta {
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
}

#[inline]
pub fn is_from_staked_node(&self) -> bool {
self.flags.contains(PacketFlags::FROM_STAKED_NODE)
}
}

impl Default for Meta {
Expand Down
1 change: 1 addition & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ async fn handle_chunk(
if packet_accum.is_none() {
let mut meta = Meta::default();
meta.set_socket_addr(remote_addr);
meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_)));
*packet_accum = Some(PacketAccumulator {
meta,
chunks: SmallVec::new(),
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn recv_loop(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> Result<()> {
loop {
let mut packet_batch = if use_pinned_memory {
Expand Down Expand Up @@ -147,7 +148,9 @@ fn recv_loop(
if len == PACKETS_PER_BATCH {
full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
}

packet_batch
.iter_mut()
.for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
packet_batch_sender.send(packet_batch)?;
}
break;
Expand All @@ -156,6 +159,7 @@ fn recv_loop(
}
}

#[allow(clippy::too_many_arguments)]
pub fn receiver(
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
Expand All @@ -165,6 +169,7 @@ pub fn receiver(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> JoinHandle<()> {
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
Expand All @@ -180,6 +185,7 @@ pub fn receiver(
coalesce,
use_pinned_memory,
in_vote_only_mode,
is_staked_service,
);
})
.unwrap()
Expand Down Expand Up @@ -488,6 +494,7 @@ mod test {
Duration::from_millis(1), // coalesce
true,
None,
false,
);
const NUM_PACKETS: usize = 5;
let t_responder = {
Expand Down

0 comments on commit 7e9a53e

Please sign in to comment.