Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Graceful Shutdown support #250

Merged
merged 2 commits into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/codec/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum UserError {

/// The released capacity is larger than claimed capacity.
ReleaseCapacityTooBig,

/// The stream ID space is overflowed.
///
/// A new connection is needed.
Expand Down
11 changes: 10 additions & 1 deletion src/frame/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,18 @@ pub struct Ping {
payload: Payload,
}

// This was just 8 randomly generated bytes. We use something besides just
// zeroes to distinguish this specific PING from any other.
const SHUTDOWN_PAYLOAD: Payload = [0x0b, 0x7b, 0xa2, 0xf0, 0x8b, 0x9b, 0xfe, 0x54];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could use [0x0b, 0x7b, 0xf0, 0xa2, 0x8b, 0x9b, 0x54, 0xfe]... :trollface:

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: This is not a serious comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just ran node -p "require('crypto').randomBytes(8)" and copied over. No particular reason other than wanting it be different from [0u8; 8].


impl Ping {

#[cfg(feature = "unstable")]
pub const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD;

#[cfg(not(feature = "unstable"))]
pub(crate) const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD;

pub fn new(payload: Payload) -> Ping {
Ping {
ack: false,
Expand All @@ -31,7 +41,6 @@ impl Ping {
self.ack
}

#[cfg(feature = "unstable")]
pub fn payload(&self) -> &Payload {
&self.payload
}
Expand Down
2 changes: 2 additions & 0 deletions src/frame/stream_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ impl StreamId {

pub const MAX: StreamId = StreamId(u32::MAX >> 1);

pub const MAX_CLIENT: StreamId = StreamId((u32::MAX >> 1) - 1);

/// Parse the stream ID
#[inline]
pub fn parse(buf: &[u8]) -> (StreamId, bool) {
Expand Down
175 changes: 116 additions & 59 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::Stream;
use tokio_io::{AsyncRead, AsyncWrite};

use std::marker::PhantomData;
use std::io;
use std::time::Duration;

/// An H2 connection
Expand All @@ -30,6 +31,9 @@ where
/// Read / write frame values
codec: Codec<T, Prioritized<B::Buf>>,

/// Pending GOAWAY frames to write.
go_away: GoAway,

/// Ping/pong handler
ping_pong: PingPong,

Expand Down Expand Up @@ -57,11 +61,8 @@ enum State {
/// Currently open in a sane state
Open,

/// Waiting to send a GOAWAY frame
GoAway(frame::GoAway),

/// The codec must be flushed
Flush(Reason),
Closing(Reason),

/// In a closed state
Closed(Reason),
Expand Down Expand Up @@ -95,6 +96,7 @@ where
state: State::Open,
error: None,
codec: codec,
go_away: GoAway::new(),
ping_pong: PingPong::new(),
settings: Settings::new(),
streams: streams,
Expand All @@ -111,9 +113,9 @@ where
/// Returns `RecvError` as this may raise errors that are caused by delayed
/// processing of received frames.
fn poll_ready(&mut self) -> Poll<(), RecvError> {
// The order of these calls don't really matter too much as only one
// should have pending work.
// The order of these calls don't really matter too much
try_ready!(self.ping_pong.send_pending_pong(&mut self.codec));
try_ready!(self.ping_pong.send_pending_ping(&mut self.codec));
try_ready!(
self.settings
.send_pending_ack(&mut self.codec, &mut self.streams)
Expand All @@ -123,9 +125,47 @@ where
Ok(().into())
}

fn transition_to_go_away(&mut self, id: StreamId, e: Reason) {
let goaway = frame::GoAway::new(id, e);
self.state = State::GoAway(goaway);
/// Send any pending GOAWAY frames.
///
/// This will return `Some(reason)` if the connection should be closed
/// afterwards. If this is a graceful shutdown, this returns `None`.
fn poll_go_away(&mut self) -> Poll<Option<Reason>, io::Error> {
self.go_away.send_pending_go_away(&mut self.codec)
}

fn go_away(&mut self, id: StreamId, e: Reason) {
let frame = frame::GoAway::new(id, e);
self.streams.send_go_away(id);
self.go_away.go_away(frame);
}

pub fn go_away_now(&mut self, e: Reason) {
let last_processed_id = self.streams.last_processed_id();
let frame = frame::GoAway::new(last_processed_id, e);
self.go_away.go_away_now(frame);
}

fn take_error(&mut self, ours: Reason) -> Poll<(), proto::Error> {
let reason = if let Some(theirs) = self.error.take() {
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
ours
};

if reason == Reason::NO_ERROR {
Ok(().into())
} else {
Err(proto::Error::Proto(reason))
}
}

/// Closes the connection by transitioning to a GOAWAY state
Expand All @@ -134,15 +174,10 @@ where
// If we poll() and realize that there are no streams or references
// then we can close the connection by transitioning to GOAWAY
if self.streams.num_active_streams() == 0 && !self.streams.has_streams_or_other_references() {
self.close_connection();
self.go_away_now(Reason::NO_ERROR);
}
}

/// Closes the connection by transitioning to a GOAWAY state
pub fn close_connection(&mut self) {
let last_processed_id = self.streams.last_processed_id();
self.transition_to_go_away(last_processed_id, Reason::NO_ERROR);
}

/// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), proto::Error> {
Expand All @@ -155,18 +190,17 @@ where
State::Open => {
match self.poll2() {
// The connection has shutdown normally
Ok(Async::Ready(())) => return Ok(().into()),
Ok(Async::Ready(())) => return self.take_error(Reason::NO_ERROR),
// The connection is not ready to make progress
Ok(Async::NotReady) => {
// Ensure all window updates have been sent.
//
// This will also handle flushing `self.codec`
try_ready!(self.streams.poll_complete(&mut self.codec));

if self.error.is_some() {
if self.error.is_some() || self.go_away.should_close_on_idle() {
if self.streams.num_active_streams() == 0 {
let last_processed_id = self.streams.last_processed_id();
self.transition_to_go_away(last_processed_id, Reason::NO_ERROR);
self.go_away_now(Reason::NO_ERROR);
continue;
}
}
Expand All @@ -179,9 +213,19 @@ where
Err(Connection(e)) => {
debug!("Connection::poll; err={:?}", e);

// We may have already sent a GOAWAY for this error,
// if so, don't send another, just flush and close up.
if let Some(reason) = self.go_away.going_away_reason() {
if reason == e {
trace!(" -> already going away");
self.state = State::Closing(e);
continue;
}
}

// Reset all active streams
let last_processed_id = self.streams.recv_err(&e.into());
self.transition_to_go_away(last_processed_id, e);
self.streams.recv_err(&e.into());
self.go_away_now(e);
},
// Attempting to read a frame resulted in a stream level error.
// This is handled by resetting the frame then trying to read
Expand All @@ -207,48 +251,16 @@ where
return Err(e);
},
}
},
State::GoAway(frame) => {
// Ensure the codec is ready to accept the frame
try_ready!(self.codec.poll_ready());

// Buffer the GOAWAY frame
self.codec
.buffer(frame.into())
.ok()
.expect("invalid GO_AWAY frame");

// GOAWAY sent, transition the connection to a closed state
// Determine what error code should be returned to user.
let reason = if let Some(theirs) = self.error.take() {
let ours = frame.reason();
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NO_ERROR, err) | (err, Reason::NO_ERROR) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
frame.reason()
};
self.state = State::Flush(reason);
},
State::Flush(reason) => {
}
State::Closing(reason) => {
trace!("connection closing after flush, reason={:?}", reason);
// Flush the codec
try_ready!(self.codec.flush());

// Transition the state to error
self.state = State::Closed(reason);
},
State::Closed(reason) => if let Reason::NO_ERROR = reason {
return Ok(Async::Ready(()));
} else {
return Err(reason.into());
},
State::Closed(reason) => return self.take_error(reason),
}
}
}
Expand All @@ -263,6 +275,17 @@ where

loop {
// First, ensure that the `Connection` is able to receive a frame
//
// The order here matters:
// - poll_go_away may buffer a graceful shutdown GOAWAY frame
// - If it has, we've also added a PING to be sent in poll_ready
if let Some(reason) = try_ready!(self.poll_go_away()) {
if self.go_away.should_close_now() {
return Err(RecvError::Connection(reason));
}
// Only NO_ERROR should be waiting for idle
debug_assert_eq!(reason, Reason::NO_ERROR, "graceful GOAWAY should be NO_ERROR");
}
try_ready!(self.poll_ready());

match try_ready!(self.codec.poll()) {
Expand Down Expand Up @@ -292,12 +315,21 @@ where
// but should allow continuing to process current streams
// until they are all EOS. Once they are, State should
// transition to GoAway.
self.streams.recv_goaway(&frame);
self.streams.recv_go_away(&frame);
self.error = Some(frame.reason());
},
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);
self.ping_pong.recv_ping(frame);
let status = self.ping_pong.recv_ping(frame);
if status.is_shutdown() {
assert!(
self.go_away.is_going_away(),
"received unexpected shutdown ping"
);

let last_processed_id = self.streams.last_processed_id();
self.go_away(last_processed_id, Reason::NO_ERROR);
}
},
Some(WindowUpdate(frame)) => {
trace!("recv WINDOW_UPDATE; frame={:?}", frame);
Expand Down Expand Up @@ -339,4 +371,29 @@ where
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf>> {
self.streams.next_incoming()
}

// Graceful shutdown only makes sense for server peers.
pub fn go_away_gracefully(&mut self) {
if self.go_away.is_going_away() {
// No reason to start a new one.
return;
}

// According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
//
// > A server that is attempting to gracefully shut down a connection
// > SHOULD send an initial GOAWAY frame with the last stream
// > identifier set to 231-1 and a NO_ERROR code. This signals to the
// > client that a shutdown is imminent and that initiating further
// > requests is prohibited. After allowing time for any in-flight
// > stream creation (at least one round-trip time), the server can
// > send another GOAWAY frame with an updated last stream identifier.
// > This ensures that a connection can be cleanly shut down without
// > losing requests.
self.go_away(StreamId::MAX_CLIENT, Reason::NO_ERROR);

// We take the advice of waiting 1 RTT literally, and wait
// for a pong before proceeding.
self.ping_pong.ping_shutdown();
}
}
Loading