Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide abstraction for a channel #160

Merged
merged 31 commits into from
Mar 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ff89dbb
Add WebRtcChannel
garryod Mar 19, 2023
44abe21
Remove redundant as_mut
garryod Mar 19, 2023
935e61e
Re-add new_senders_and_receivers
garryod Mar 19, 2023
305bd03
Return result from channel getters
garryod Mar 19, 2023
d5d557d
Refactor WebRtcChannel creation
garryod Mar 19, 2023
dca79b5
Remove returns None from channel getter docstrings
garryod Mar 20, 2023
a307cae
Remove option hack from bevy ggrs example
garryod Mar 24, 2023
a6b0608
Make WebRtcSocket generic over channels contents
garryod Mar 24, 2023
84c2470
Remove superfluous trait
garryod Mar 24, 2023
fda1983
Simplify one-channel usage
johanhelsing Mar 24, 2023
30d9e79
Remove explicit type from simple example
garryod Mar 24, 2023
7786a86
Add default type to WebRtcSocketBuilder
garryod Mar 24, 2023
de06b92
Promote WebRtcSocketBuilder when adding channels
garryod Mar 24, 2023
3f825da
Simplify add_channel for multiple channels
garryod Mar 24, 2023
a862797
Add doctest to take_channel
garryod Mar 24, 2023
3b7fa34
Clean up docs
garryod Mar 24, 2023
ec9171b
Add doctest to channel
garryod Mar 24, 2023
b0e9003
Use ChannelPlurality trait for type state
garryod Mar 24, 2023
ccb891d
Add trait to tag buildabe quantities
garryod Mar 24, 2023
e975543
Allow channel access for single channel sockets
garryod Mar 24, 2023
a3a9ce6
Factor out socket config
garryod Mar 24, 2023
4531492
Allow basic method access regardless of channels
garryod Mar 24, 2023
6fdfdb4
Use deref in bevy ggrs example
garryod Mar 24, 2023
751c77f
Specify mutable reference for channel getter
garryod Mar 25, 2023
43a045a
Fix spelling in WebRtcChannel docstring
garryod Mar 25, 2023
377246a
Correctly reference WebRtcChannel in WebRtcSocket docstring
garryod Mar 25, 2023
1ced9e4
Fix spelling in NoChannels docstring
garryod Mar 25, 2023
ee8640d
Fix spelling in SingleChannel docstring
garryod Mar 25, 2023
a0a6c4a
Fix spelling in MultipleChannels docstring
garryod Mar 25, 2023
e28acd0
Use unreachable for no channel check
garryod Mar 25, 2023
02d7107
Improve get channel error naming
garryod Mar 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions examples/bevy_ggrs/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bevy::{log::LogPlugin, prelude::*, tasks::IoTaskPool};
use bevy_ggrs::{GGRSPlugin, Session};
use ggrs::SessionBuilder;
use matchbox_socket::{PeerState, WebRtcSocket};
use matchbox_socket::{PeerState, SingleChannel, WebRtcSocket};

mod args;
mod box_game;
Expand All @@ -20,8 +20,8 @@ enum AppState {

const SKY_COLOR: Color = Color::rgb(0.69, 0.69, 0.69);

#[derive(Default, Resource)]
struct SocketResource(Option<WebRtcSocket>);
#[derive(Debug, Resource, Deref, DerefMut)]
struct SocketResource(WebRtcSocket<SingleChannel>);

fn main() {
// read query string or command line arguments
Expand Down Expand Up @@ -98,7 +98,7 @@ fn start_matchbox_socket(mut commands: Commands, args: Res<Args>) {
let task_pool = IoTaskPool::get();
task_pool.spawn(message_loop).detach();

commands.insert_resource(SocketResource(Some(socket)));
commands.insert_resource(SocketResource(socket));
}

// Marker components for UI
Expand Down Expand Up @@ -160,15 +160,15 @@ fn lobby_system(
mut query: Query<&mut Text, With<LobbyText>>,
) {
// regularly call update_peers to update the list of connected peers
for (peer, new_state) in socket.0.as_mut().unwrap().update_peers() {
for (peer, new_state) in socket.update_peers() {
// you can also handle the specific dis(connections) as they occur:
match new_state {
PeerState::Connected => info!("peer {peer:?} connected"),
PeerState::Disconnected => info!("peer {peer:?} disconnected"),
}
}

let connected_peers = socket.0.as_ref().unwrap().connected_peers().count();
let connected_peers = socket.connected_peers().count();
garryod marked this conversation as resolved.
Show resolved Hide resolved
let remaining = args.players - (connected_peers + 1);
query.single_mut().sections[0].value = format!("Waiting for {remaining} more player(s)",);
if remaining > 0 {
Expand All @@ -177,9 +177,6 @@ fn lobby_system(

info!("All peers have joined, going in-game");

// consume the socket (currently required because ggrs takes ownership of its socket)
let socket = socket.0.take().unwrap();

// extract final player list
let players = socket.players();

Expand All @@ -199,9 +196,11 @@ fn lobby_system(
.expect("failed to add player");
}

let channel = socket.take_channel(0).unwrap();

// start the GGRS session
let sess = sess_build
.start_p2p_session(socket)
.start_p2p_session(channel)
.expect("failed to start session");

commands.insert_resource(Session::P2PSession(sess));
Expand Down
54 changes: 35 additions & 19 deletions matchbox_socket/src/ggrs_socket.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
use ggrs::{Message, PlayerType};
use matchbox_protocol::PeerId;

use crate::{ChannelConfig, MessageLoopFuture, WebRtcSocket, WebRtcSocketBuilder};
use crate::{
ChannelConfig, MessageLoopFuture, Packet, SingleChannel, WebRtcChannel, WebRtcSocket,
WebRtcSocketBuilder,
};

impl ChannelConfig {
/// Creates a [`ChannelConfig`] suitable for use with GGRS.
pub fn ggrs() -> Self {
Self::unreliable()
}
}

impl WebRtcSocket {
/// Creates a [`WebRtcSocket`] and the corresponding [`MessageLoopFuture`] for a
Expand All @@ -11,12 +21,16 @@ impl WebRtcSocket {
/// be sent and received.
///
/// Please use the [`WebRtcSocketBuilder`] to create non-trivial sockets.
pub fn new_ggrs(room_url: impl Into<String>) -> (WebRtcSocket, MessageLoopFuture) {
pub fn new_ggrs(
room_url: impl Into<String>,
) -> (WebRtcSocket<SingleChannel>, MessageLoopFuture) {
WebRtcSocketBuilder::new(room_url)
.add_ggrs_channel()
.add_channel(ChannelConfig::ggrs())
.build()
}
}

impl WebRtcSocket {
/// Returns a Vec of connected peers as [`ggrs::PlayerType`]
pub fn players(&self) -> Vec<PlayerType<PeerId>> {
let Some(our_id) = self.id() else {
Expand Down Expand Up @@ -44,27 +58,29 @@ impl WebRtcSocket {
}
}

impl ggrs::NonBlockingSocket<PeerId> for WebRtcSocket {
fn build_packet(msg: &Message) -> Packet {
bincode::serialize(&msg).unwrap().into_boxed_slice()
}

fn deserialize_packet(message: (PeerId, Packet)) -> (PeerId, Message) {
(message.0, bincode::deserialize(&message.1).unwrap())
}

impl ggrs::NonBlockingSocket<PeerId> for WebRtcSocket<SingleChannel> {
fn send_to(&mut self, msg: &Message, addr: &PeerId) {
let buf = bincode::serialize(&msg).unwrap();
let packet = buf.into_boxed_slice();
self.send(packet, *addr);
self.send(build_packet(msg), *addr);
}

fn receive_all_messages(&mut self) -> Vec<(PeerId, Message)> {
let mut messages = vec![];
for (id, packet) in self.receive().into_iter() {
let msg = bincode::deserialize(&packet).unwrap();
messages.push((id, msg));
}
messages
self.receive().into_iter().map(deserialize_packet).collect()
}
}

impl WebRtcSocketBuilder {
/// Adds a new channel configured correctly for usage with GGRS to the [`WebRtcSocket`].
pub fn add_ggrs_channel(mut self) -> Self {
self.channels.push(ChannelConfig::unreliable());
self
impl ggrs::NonBlockingSocket<PeerId> for WebRtcChannel {
fn send_to(&mut self, msg: &Message, addr: &PeerId) {
self.send(build_packet(msg), *addr);
}

fn receive_all_messages(&mut self) -> Vec<(PeerId, Message)> {
self.receive().into_iter().map(deserialize_packet).collect()
}
}
3 changes: 2 additions & 1 deletion matchbox_socket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod webrtc_socket;
pub use error::Error;
pub use matchbox_protocol::PeerId;
pub use webrtc_socket::{
ChannelConfig, MessageLoopFuture, Packet, PeerState, RtcIceServerConfig, WebRtcSocket,
BuildablePlurality, ChannelConfig, ChannelPlurality, MessageLoopFuture, MultipleChannels,
NoChannels, Packet, PeerState, RtcIceServerConfig, SingleChannel, WebRtcChannel, WebRtcSocket,
WebRtcSocketBuilder,
};
8 changes: 8 additions & 0 deletions matchbox_socket/src/webrtc_socket/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ use crate::webrtc_socket::messages::PeerEvent;
use cfg_if::cfg_if;
use futures_channel::mpsc::TrySendError;

#[derive(Debug, thiserror::Error)]
pub enum GetChannelError {
#[error("This channel was never created")]
NotFound,
#[error("This channel has already been taken and is no longer on the socket")]
Taken,
}

/// An error that can occur with WebRTC signalling.
#[derive(Debug, thiserror::Error)]
pub enum SignallingError {
Expand Down
15 changes: 9 additions & 6 deletions matchbox_socket/src/webrtc_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod messages;
mod signal_peer;
mod socket;

use self::error::{MessagingError, SignallingError};
use crate::{webrtc_socket::signal_peer::SignalPeer, Error};
use async_trait::async_trait;
use cfg_if::cfg_if;
Expand All @@ -14,11 +15,12 @@ use log::{debug, warn};
use matchbox_protocol::PeerId;
use messages::*;
pub(crate) use socket::MessageLoopChannels;
pub use socket::{ChannelConfig, PeerState, RtcIceServerConfig, WebRtcSocket, WebRtcSocketBuilder};
pub use socket::{
BuildablePlurality, ChannelConfig, ChannelPlurality, MultipleChannels, NoChannels, PeerState,
RtcIceServerConfig, SingleChannel, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder,
};
use std::{collections::HashMap, pin::Pin, time::Duration};

use self::error::{MessagingError, SignallingError};

cfg_if! {
if #[cfg(target_arch = "wasm32")] {
mod wasm;
Expand Down Expand Up @@ -126,7 +128,8 @@ trait Messenger {

async fn message_loop<M: Messenger>(
id_tx: crossbeam_channel::Sender<PeerId>,
config: WebRtcSocketBuilder,
ice_server_config: &RtcIceServerConfig,
channel_configs: &[ChannelConfig],
channels: MessageLoopChannels,
) {
let MessageLoopChannels {
Expand Down Expand Up @@ -170,14 +173,14 @@ async fn message_loop<M: Messenger>(
let (signal_tx, signal_rx) = futures_channel::mpsc::unbounded();
handshake_signals.insert(peer_uuid, signal_tx);
let signal_peer = SignalPeer::new(peer_uuid, requests_sender.clone());
handshakes.push(M::offer_handshake(signal_peer, signal_rx, messages_from_peers_tx.clone(), &config.ice_server, &config.channels))
handshakes.push(M::offer_handshake(signal_peer, signal_rx, messages_from_peers_tx.clone(), ice_server_config, channel_configs))
},
PeerEvent::PeerLeft(peer_uuid) => {peer_state_tx.unbounded_send((peer_uuid, PeerState::Disconnected)).expect("fail to report peer as disconnected");},
PeerEvent::Signal { sender, data } => {
handshake_signals.entry(sender).or_insert_with(|| {
let (from_peer_tx, peer_signal_rx) = futures_channel::mpsc::unbounded();
let signal_peer = SignalPeer::new(sender, requests_sender.clone());
handshakes.push(M::accept_handshake(signal_peer, peer_signal_rx, messages_from_peers_tx.clone(), &config.ice_server, &config.channels));
handshakes.push(M::accept_handshake(signal_peer, peer_signal_rx, messages_from_peers_tx.clone(), ice_server_config, channel_configs));
from_peer_tx
}).unbounded_send(data).expect("failed to forward signal to handshaker");
},
Expand Down
Loading