Skip to content

Commit

Permalink
Add sender stake to quic packets (#25054)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored May 7, 2022
1 parent d9deab4 commit 5be1388
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ fn handle_chunk(
remote_addr: &SocketAddr,
packet_sender: &Sender<PacketBatch>,
stats: Arc<StreamStats>,
stake: u64,
) -> bool {
match chunk {
Ok(maybe_chunk) => {
Expand All @@ -178,6 +179,7 @@ fn handle_chunk(
let mut batch = PacketBatch::with_capacity(1);
let mut packet = Packet::default();
packet.meta.set_addr(remote_addr);
packet.meta.sender_stake = stake;
batch.packets.push(packet);
*maybe_batch = Some(batch);
stats
Expand Down Expand Up @@ -433,6 +435,7 @@ fn handle_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
stream_exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
stake: u64,
) {
tokio::spawn(async move {
debug!(
Expand All @@ -455,6 +458,7 @@ fn handle_connection(
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
Expand Down Expand Up @@ -535,21 +539,26 @@ pub fn spawn_server(

let remote_addr = connection.remote_address();

let mut connection_table_l =
if staked_nodes.read().unwrap().contains_key(&remote_addr.ip()) {
let (mut connection_table_l, stake) = {
let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.get(&remote_addr.ip()) {
let stake = *stake;
drop(staked_nodes);
let mut connection_table_l =
staked_connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection_table_l
(connection_table_l, stake)
} else {
drop(staked_nodes);
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection_table_l
};
(connection_table_l, 0)
}
};

if let Some((last_update, stream_exit)) = connection_table_l
.try_add_connection(
Expand All @@ -570,6 +579,7 @@ pub fn spawn_server(
connection_table1,
stream_exit,
stats,
stake,
);
} else {
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
Expand Down

0 comments on commit 5be1388

Please sign in to comment.