diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 9afceeb35a8ba5..9d73c2a0a2b6e4 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -154,6 +154,7 @@ fn handle_chunk( remote_addr: &SocketAddr, packet_sender: &Sender, stats: Arc, + stake: u64, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -178,6 +179,7 @@ fn handle_chunk( let mut batch = PacketBatch::with_capacity(1); let mut packet = Packet::default(); packet.meta.set_addr(remote_addr); + packet.meta.sender_stake = stake; batch.packets.push(packet); *maybe_batch = Some(batch); stats @@ -433,6 +435,7 @@ fn handle_connection( connection_table: Arc>, stream_exit: Arc, stats: Arc, + stake: u64, ) { tokio::spawn(async move { debug!( @@ -455,6 +458,7 @@ fn handle_connection( &remote_addr, &packet_sender, stats.clone(), + stake, ) { last_update.store(timing::timestamp(), Ordering::Relaxed); break; @@ -535,21 +539,21 @@ pub fn spawn_server( let remote_addr = connection.remote_address(); - let mut connection_table_l = - if staked_nodes.read().unwrap().contains_key(&remote_addr.ip()) { - let mut connection_table_l = - staked_connection_table.lock().unwrap(); - let num_pruned = - connection_table_l.prune_oldest(max_staked_connections); - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - connection_table_l - } else { - let mut connection_table_l = connection_table.lock().unwrap(); - let num_pruned = - connection_table_l.prune_oldest(max_unstaked_connections); - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - connection_table_l - }; + let (mut connection_table_l, stake) = if let Some(stake) = + staked_nodes.read().unwrap().get(&remote_addr.ip()) + { + let mut connection_table_l = staked_connection_table.lock().unwrap(); + let num_pruned = + connection_table_l.prune_oldest(max_staked_connections); + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + (connection_table_l, *stake) + } else { + let mut connection_table_l = connection_table.lock().unwrap(); + let num_pruned = + connection_table_l.prune_oldest(max_unstaked_connections); + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + (connection_table_l, 0) + }; if let Some((last_update, stream_exit)) = connection_table_l .try_add_connection( @@ -570,6 +574,7 @@ pub fn spawn_server( connection_table1, stream_exit, stats, + stake, ); } else { stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);