Skip to content

Commit

Permalink
cleanup server code
Browse files Browse the repository at this point in the history
  • Loading branch information
feschber committed Dec 15, 2023
1 parent 0c275bc commit 5fc02d4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 63 deletions.
1 change: 0 additions & 1 deletion src/backend/producer/wayland.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,6 @@ impl Stream for WaylandEventProducer {
type Item = io::Result<(ClientHandle, Event)>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
log::trace!("producer.next()");
if let Some(event) = self.0.get_mut().state.pending_events.pop_front() {
return Poll::Ready(Some(Ok(event)));
}
Expand Down
2 changes: 0 additions & 2 deletions src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ impl FrontendListener {

#[cfg(unix)]
pub async fn accept(&mut self) -> Result<ReadHalf<UnixStream>> {
log::trace!("frontend.accept()");

let stream = self.listener.accept().await?.0;
let (rx, tx) = tokio::io::split(stream);
self.tx_streams.push(tx);
Expand Down
91 changes: 31 additions & 60 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ use crate::{
producer::EventProducer,
};

/// keeps track of state to prevent a feedback loop
/// of continuously sending and receiving the same event.
#[derive(Eq, PartialEq)]
enum State {
Sending,
Expand Down Expand Up @@ -93,19 +91,16 @@ impl Server {

pub async fn run(&mut self) -> anyhow::Result<()> {
loop {
log::trace!("polling ...");
tokio::select! {
// safety: cancellation safe
udp_event = receive_event(&self.socket) => {
log::trace!("-> receive_event");
match udp_event {
Ok(e) => self.handle_udp_rx(e).await,
Err(e) => log::error!("error reading event: {e}"),
}
}
// safety: cancellation safe
res = self.producer.next() => {
log::trace!("-> producer.next()");
match res {
Some(Ok((client, event))) => {
self.handle_producer_event(client,event).await;
Expand All @@ -116,15 +111,13 @@ impl Server {
}
// safety: cancellation safe
stream = self.frontend.accept() => {
log::trace!("-> frontend.accept()");
match stream {
Ok(s) => self.handle_frontend_stream(s).await,
Err(e) => log::error!("error connecting to frontend: {e}"),
}
}
// safety: cancellation safe
frontend_event = self.frontend_rx.recv() => {
log::trace!("-> frontend.recv()");
if let Some(event) = frontend_event {
if self.handle_frontend_event(event).await {
break;
Expand All @@ -133,7 +126,6 @@ impl Server {
}
// safety: cancellation safe
e = self.consumer.dispatch() => {
log::trace!("-> consumer.dispatch()");
e?;
}
// safety: cancellation safe
Expand Down Expand Up @@ -314,66 +306,57 @@ impl Server {
state.last_seen = Some(Instant::now());
// set addr as new default for this client
state.client.active_addr = Some(addr);

match (event, addr) {
(Event::Pong(), _) => {}
(Event::Pong(), _) => {} // ignore pong events
(Event::Ping(), addr) => {
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
log::error!("udp send: {}", e);
}
// we release the mouse here,
// since its very likely, that we wont get a release event
self.producer.release();
}
(event, addr) => match self.state {
State::Sending => {
// in sending state, we dont want to process
// any events to avoid feedback loops,
// therefore we tell the event producer
// to release the pointer and move on
// first event -> release pointer
if let Event::Release() = event {
log::debug!("releasing pointer ...");
self.producer.release();
self.state = State::Receiving;
}
(event, addr) => {
// device is sending events => release pointer if captured
if self.state == State::Sending {
log::debug!("releasing pointer ...");
self.producer.release();
self.state = State::Receiving;
}
State::Receiving => {
// consume event
self.consumer.consume(event, handle).await;

// let the server know we are still alive once every second
let last_replied = state.last_replied;
if last_replied.is_none()
|| last_replied.is_some()
&& last_replied.unwrap().elapsed() > Duration::from_secs(1)
{
state.last_replied = Some(Instant::now());
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
log::error!("udp send: {}", e);
}

// consume event
self.consumer.consume(event, handle).await;
log::trace!("{event:?} => consumer");

// let the server know we are still alive once every second
let last_replied = state.last_replied;
if last_replied.is_none()
|| last_replied.is_some()
&& last_replied.unwrap().elapsed() > Duration::from_secs(1)
{
state.last_replied = Some(Instant::now());
if let Err(e) = send_event(&self.socket, Event::Pong(), addr).await {
log::error!("udp send: {}", e);
}
}
},
}
}

async fn handle_producer_event(&mut self, c: ClientHandle, e: Event) {
let mut should_release = false;
// in receiving state, only release events
// must be transmitted
if let Event::Release() = e {
self.state = State::Sending;
}

log::trace!("producer: ({c}) {e:?}");

// get client state for handle
let state = match self.client_manager.get_mut(c) {
Some(state) => state,
None => {
log::warn!("unknown client!");
return;
}
};
// otherwise we should have an address to send to

// we are sending events
self.state = State::Sending;

// otherwise we should have an address to
// transmit events to the corrensponding client
if let Some(addr) = state.client.active_addr {
if let Err(e) = send_event(&self.socket, e, addr).await {
Expand All @@ -399,7 +382,8 @@ impl Server {
// release mouse if client didnt respond to the first ping
if state.last_ping.is_some() && state.last_ping.unwrap().elapsed() < Duration::from_secs(1)
{
should_release = true;
log::info!("client not responding - releasing pointer");
self.producer.release();
}

// last ping > 500ms ago -> ping all interfaces
Expand All @@ -411,18 +395,6 @@ impl Server {
log::error!("udp send: {}", e);
}
}
// send additional release event, in case client is still in sending mode
if let Err(e) = send_event(&self.socket, Event::Release(), *addr).await {
if e.kind() != ErrorKind::WouldBlock {
log::error!("udp send: {}", e);
}
}
}

if should_release && self.state != State::Receiving {
log::info!("client not responding - releasing pointer");
self.producer.release();
self.state = State::Receiving;
}
}

Expand Down Expand Up @@ -544,7 +516,6 @@ impl Server {
async fn receive_event(
socket: &UdpSocket,
) -> std::result::Result<(Event, SocketAddr), Box<dyn Error>> {
log::trace!("receive_event");
let mut buf = vec![0u8; 22];
match socket.recv_from(&mut buf).await {
Ok((_amt, src)) => Ok((Event::try_from(buf)?, src)),
Expand Down

0 comments on commit 5fc02d4

Please sign in to comment.