Skip to content

Commit

Permalink
feat: improve player disconnection handling and packet sending effici…
Browse files Browse the repository at this point in the history
…ency

Enhances the networking system with better disconnection handling and packet sending optimizations:

- Add `PlayerDisconnectReason` enum to provide detailed disconnect reasons like `CouldNotKeepUp` and `LostConnection`
- Implement memory protection by adding `MAX_PLAYER_PENDING_MESSAGES` (1024) limit
- Replace `is_flush()` string comparison with more efficient constant check using `order == u32::MAX`
- Add new `DataBundle` to optimize packet sending by batching multiple packets together
- Improve error handling in `egress.rs` by properly removing disconnected players when send operations fail
- Replace individual packet sends with batched sends in chunk synchronization code

Breaking changes:
- Changed `PlayerDisconnect` to be generic over lifetime `'a`
- Modified flush packet implementation in `OrderedBytes`
  • Loading branch information
andrewgazelka committed Oct 30, 2024
1 parent 0e2fd90 commit 5c8bda8
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 135 deletions.
15 changes: 13 additions & 2 deletions crates/hyperion-proto/src/proxy_to_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@ pub struct PlayerConnect {
}

#[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq, Debug)]
pub struct PlayerDisconnect {
pub struct PlayerDisconnect<'a> {
pub stream: u64,
pub reason: PlayerDisconnectReason<'a>,
}

#[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq, Debug)]
#[non_exhaustive]
pub enum PlayerDisconnectReason<'a> {
/// If cannot receive packets fast enough
CouldNotKeepUp,
LostConnection,

Other(#[rkyv(with = InlineAsBox)] &'a str),
}

#[derive(Archive, Deserialize, Serialize, Clone, PartialEq, Debug)]
pub enum ProxyToServerMessage<'a> {
PlayerConnect(PlayerConnect),
PlayerDisconnect(PlayerDisconnect),
PlayerDisconnect(PlayerDisconnect<'a>),
PlayerPackets(PlayerPackets<'a>),
}
8 changes: 4 additions & 4 deletions crates/hyperion-proxy/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ impl OrderedBytes {
exclusions: None,
};
pub const FLUSH: Self = Self {
order: 0,
order: u32::MAX,
offset: 0,
data: Bytes::from_static(b"flush"),
data: Bytes::from_static(b""),
exclusions: None,
};

pub fn is_flush(&self) -> bool {
self.data.as_ref() == b"flush" // todo: this is REALLY jank let's maybe not do this
pub const fn is_flush(&self) -> bool {
self.order == u32::MAX
}

pub const fn no_order(data: Bytes) -> Self {
Expand Down
66 changes: 52 additions & 14 deletions crates/hyperion-proxy/src/egress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Egress {

// imo it makes sense to read once... it is a fast loop
#[allow(clippy::significant_drop_in_scrutinee)]
for player in players.values() {
for (player_id, player) in &players {
if !player.can_receive_broadcasts.load(Ordering::Relaxed) {
continue;
}
Expand All @@ -94,12 +94,21 @@ impl Egress {
exclusions.clone(),
);

if let Err(e) = player.writer.try_send(to_send) {
warn!("Failed to send data to player: {:?}", e);
match player.writer.try_send(to_send) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
players.remove(player_id);
warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
players.remove(player_id);
}
}
}
}
.instrument(info_span!("broadcast_global_task")),
.instrument(info_span!("broadcast_global_task")),
)
.unwrap();
}
Expand All @@ -110,13 +119,22 @@ impl Egress {

tokio::spawn(
async move {
for player in players.values() {
if let Err(e) = player.writer.try_send(OrderedBytes::FLUSH) {
warn!("Failed to send data to player: {:?}", e);
for (id, player) in &players {
match player.writer.try_send(OrderedBytes::FLUSH) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
players.remove(id);
warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
players.remove(id);
}
}
}
}
.instrument(info_span!("flush_task")),
.instrument(info_span!("flush_task")),
);
}

Expand Down Expand Up @@ -164,18 +182,29 @@ impl Egress {

let data = data.slice(start..end);

if let Err(e) = player.writer.try_send(OrderedBytes {
let to_send = OrderedBytes {
order,
offset: slice.start,
data,
exclusions: Some(exclusions.clone()),
}) {
warn!("Failed to send data to player: {:?}", e);
};

match player.writer.try_send(to_send) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
players.remove(id);
warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
players.remove(id);
}
}
}
}
}
.instrument(info_span!("broadcast_local_task")),
.instrument(info_span!("broadcast_local_task")),
)
.unwrap();
}
Expand Down Expand Up @@ -205,8 +234,17 @@ impl Egress {
};

// todo: handle error; kick player if cannot send (buffer full)
if let Err(e) = player.writer.try_send(ordered) {
warn!("Failed to send data to player: {:?}", e);
match player.writer.try_send(ordered) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
players.remove(&id);
warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
players.remove(&id);
}
}

drop(players);
Expand Down
30 changes: 17 additions & 13 deletions crates/hyperion-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ use crate::{
server_sender::launch_server_writer,
};

/// 4 KiB
const DEFAULT_BUFFER_SIZE: usize = 4 * 1024;

/// Maximum number of pending messages in a player's communication channel.
/// If this limit is exceeded, the player will be disconnected to prevent
/// memory exhaustion from slow or unresponsive clients.
const MAX_PLAYER_PENDING_MESSAGES: usize = 1_024;

pub mod cache;
pub mod data;
pub mod egress;
Expand Down Expand Up @@ -141,12 +147,11 @@ async fn connect_to_server_and_run_proxy(
tokio::task::Builder::new()
.name("s2prox")
.spawn({
let mut shutdown_rx = shutdown_rx.clone();
let mut shutdown_rx = shutdown_rx.clone();

async move {

loop {
tokio::select! {
async move {
loop {
tokio::select! {
_ = shutdown_rx.wait_for(Option::is_some) => return,
result = handler.handle_next() => {
match result {
Expand All @@ -162,14 +167,14 @@ async fn connect_to_server_and_run_proxy(
}
}
}
}
}

debug!("Sending shutdown to all players");
debug!("Sending shutdown to all players");

shutdown_tx.send(Some(ShutdownType::Reconnect)).unwrap();
}
.instrument(info_span!("server_reader_loop"))
}).unwrap();
shutdown_tx.send(Some(ShutdownType::Reconnect)).unwrap();
}
.instrument(info_span!("server_reader_loop"))
}).unwrap();

// 0 is reserved for "None" value
let mut player_id_on = 1;
Expand All @@ -189,8 +194,7 @@ async fn connect_to_server_and_run_proxy(
let registry = player_registry.pin();

// todo: re-add bounding but issues if have MASSIVE number of packets
// let (tx, rx) = kanal::bounded_async(1024);
let (tx, rx) = kanal::unbounded_async();
let (tx, rx) = kanal::bounded_async(MAX_PLAYER_PENDING_MESSAGES);
registry.insert(player_id_on, PlayerHandle {
writer: tx,
can_receive_broadcasts: AtomicBool::new(false),
Expand Down
8 changes: 5 additions & 3 deletions crates/hyperion-proxy/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use std::io::IoSlice;

use hyperion_proto::{
ChunkPosition, PlayerConnect, PlayerDisconnect, PlayerPackets, ProxyToServerMessage,
ChunkPosition, PlayerConnect, PlayerDisconnect, PlayerDisconnectReason, PlayerPackets,
ProxyToServerMessage,
};
use rkyv::ser::allocator::Arena;
use rustc_hash::FxBuildHasher;
Expand Down Expand Up @@ -64,7 +65,7 @@ pub fn initiate_player_connection(
)
.unwrap();

server_sender.try_send(connect).unwrap();
server_sender.send(connect).await.unwrap();

let mut arena = Arena::new();

Expand Down Expand Up @@ -98,7 +99,7 @@ pub fn initiate_player_connection(

read_buffer.clear();

if let Err(e) = server_sender.try_send(aligned_vec) {
if let Err(e) = server_sender.send(aligned_vec).await {
warn!("Error forwarding player packets to server: {e:?}");
panic!("Error forwarding player packets to server: {e:?}");
}
Expand Down Expand Up @@ -150,6 +151,7 @@ pub fn initiate_player_connection(
let disconnect = rkyv::to_bytes::<rkyv::rancor::Error>(
&ProxyToServerMessage::PlayerDisconnect(PlayerDisconnect {
stream: player_id,
reason: PlayerDisconnectReason::LostConnection,
})).unwrap();

let map_ref = player_registry.pin_owned();
Expand Down
Loading

0 comments on commit 5c8bda8

Please sign in to comment.