Skip to content

Commit

Permalink
src/linux: Use select over select! and return BrokenPipe error (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuhn authored Apr 20, 2022
1 parent 474d041 commit adcce22
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions src/linux.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net};
use fnv::FnvHashSet;
use futures::channel::mpsc::UnboundedReceiver;
use futures::future::FutureExt;
use futures::future::Either;
use futures::stream::{Stream, TryStreamExt};
use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR};
use rtnetlink::packet::address::nlas::Nla;
Expand All @@ -12,6 +12,7 @@ use std::collections::VecDeque;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -41,19 +42,28 @@ impl IfWatcher {
let mut queue = VecDeque::default();

loop {
futures::select! {
msg = stream.try_next().fuse() => match msg {
Ok(Some(msg)) => {
for net in iter_nets(msg) {
if addrs.insert(net) {
queue.push_back(IfEvent::Up(net));
let fut = futures::future::select(conn, stream.try_next());
match fut.await {
Either::Left(_) => {
return Err(std::io::Error::new(
ErrorKind::BrokenPipe,
"rtnetlink socket closed",
))
}
Either::Right((x, c)) => {
conn = c;
match x {
Ok(Some(msg)) => {
for net in iter_nets(msg) {
if addrs.insert(net) {
queue.push_back(IfEvent::Up(net));
}
}
}
},
Ok(None) => break,
Err(err) => return Err(Error::new(ErrorKind::Other, err)),
},
_r = (&mut conn).fuse() => {}
Ok(None) => break,
Err(err) => return Err(Error::new(ErrorKind::Other, err)),
}
}
}
}
Ok(Self {
Expand Down Expand Up @@ -89,7 +99,13 @@ impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
while Pin::new(&mut self.conn).poll(cx).is_ready() {}
log::trace!("polling IfWatcher {:p}", self.deref_mut());
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(std::io::Error::new(
ErrorKind::BrokenPipe,
"rtnetlink socket closed",
)));
}
while let Poll::Ready(Some((message, _))) = Pin::new(&mut self.messages).poll_next(cx) {
match message.payload {
NetlinkPayload::Error(err) => return Poll::Ready(Err(err.to_io())),
Expand Down

0 comments on commit adcce22

Please sign in to comment.