Skip to content

Commit

Permalink
Send pings in P2PStream.poll_ready (#931)
Browse files Browse the repository at this point in the history
  • Loading branch information
literallymarvellous authored Jan 19, 2023
1 parent 78ffd0a commit 54e9b12
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions crates/net/eth-wire/src/p2pstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,31 +279,6 @@ where
return Poll::Ready(None)
}

// poll the pinger to determine if we should send a ping
match this.pinger.poll_ping(cx) {
Poll::Pending => {}
Poll::Ready(Ok(PingerEvent::Ping)) => {
// encode the ping message
let mut ping_bytes = BytesMut::new();
P2PMessage::Ping.encode(&mut ping_bytes);

// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull)))
}

// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(ping_bytes.into());
}
_ => {
// encode the disconnect message
this.start_disconnect(DisconnectReason::PingTimeout)?;

// End the stream after ping related error
return Poll::Ready(None)
}
}

// we should loop here to ensure we don't return Poll::Pending if we have a message to
// return behind any pings we need to respond to
while let Poll::Ready(res) = this.inner.poll_next_unpin(cx) {
Expand Down Expand Up @@ -416,6 +391,31 @@ where
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.as_mut();

// poll the pinger to determine if we should send a ping
match this.pinger.poll_ping(cx) {
Poll::Pending => {}
Poll::Ready(Ok(PingerEvent::Ping)) => {
// encode the ping message
let mut ping_bytes = BytesMut::new();
P2PMessage::Ping.encode(&mut ping_bytes);

// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Err(P2PStreamError::SendBufferFull))
}

// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(ping_bytes.into());
}
_ => {
// encode the disconnect message
this.start_disconnect(DisconnectReason::PingTimeout)?;

// End the stream after ping related error
return Poll::Ready(Ok(()))
}
}

match this.inner.poll_ready_unpin(cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(P2PStreamError::Io(err))),
Expand Down

0 comments on commit 54e9b12

Please sign in to comment.