Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Add sender stake to quic packets
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed May 6, 2022
1 parent df9a4a0 commit 7bf33d4
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ fn handle_chunk(
remote_addr: &SocketAddr,
packet_sender: &Sender<PacketBatch>,
stats: Arc<StreamStats>,
stake: u64,
) -> bool {
match chunk {
Ok(maybe_chunk) => {
Expand All @@ -178,6 +179,7 @@ fn handle_chunk(
let mut batch = PacketBatch::with_capacity(1);
let mut packet = Packet::default();
packet.meta.set_addr(remote_addr);
packet.meta.sender_stake = stake;
batch.packets.push(packet);
*maybe_batch = Some(batch);
stats
Expand Down Expand Up @@ -433,6 +435,7 @@ fn handle_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
stream_exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
stake: u64,
) {
tokio::spawn(async move {
debug!(
Expand All @@ -455,6 +458,7 @@ fn handle_connection(
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
Expand Down Expand Up @@ -535,21 +539,21 @@ pub fn spawn_server(

let remote_addr = connection.remote_address();

let mut connection_table_l =
if staked_nodes.read().unwrap().contains_key(&remote_addr.ip()) {
let mut connection_table_l =
staked_connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection_table_l
} else {
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection_table_l
};
let (mut connection_table_l, stake) = if let Some(stake) =
staked_nodes.read().unwrap().get(&remote_addr.ip())
{
let mut connection_table_l = staked_connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
(connection_table_l, *stake)
} else {
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
(connection_table_l, 0)
};

if let Some((last_update, stream_exit)) = connection_table_l
.try_add_connection(
Expand All @@ -570,6 +574,7 @@ pub fn spawn_server(
connection_table1,
stream_exit,
stats,
stake,
);
} else {
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
Expand Down

0 comments on commit 7bf33d4

Please sign in to comment.