Skip to content

Commit

Permalink
when receiving a GOAWAY, allow earlier streams to still process (#133)
Browse files Browse the repository at this point in the history
Once all active streams have finished, send a GOAWAY back and close the
connection.
  • Loading branch information
seanmonstar authored Oct 5, 2017
1 parent c4ca8f7 commit ecd2764
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 31 deletions.
5 changes: 1 addition & 4 deletions src/frame/go_away.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ impl GoAway {
}
}

#[cfg(feature = "unstable")]
pub fn last_stream_id(&self) -> StreamId {
self.last_stream_id
}
Expand All @@ -27,9 +26,7 @@ impl GoAway {

pub fn load(payload: &[u8]) -> Result<GoAway, Error> {
if payload.len() < 8 {
// Invalid payload len
// TODO: Handle error
unimplemented!();
return Err(Error::BadFrameSize);
}

let (last_stream_id, _) = StreamId::parse(&payload[..4]);
Expand Down
70 changes: 55 additions & 15 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ where
/// Tracks the connection level state transitions.
state: State,

/// An error to report back once complete.
///
/// This exists separately from State in order to support
/// graceful shutdown.
error: Option<Reason>,

/// Read / write frame values
codec: Codec<T, Prioritized<B::Buf>>,

Expand All @@ -41,14 +47,14 @@ enum State {
/// Currently open in a sane state
Open,

/// Waiting to send a GO_AWAY frame
/// Waiting to send a GOAWAY frame
GoAway(frame::GoAway),

/// The codec must be flushed
Flush(Reason),

/// In an errored state
Error(Reason),
/// In a closed state
Closed(Reason),
}

impl<T, P, B> Connection<T, P, B>
Expand All @@ -74,6 +80,7 @@ where
});
Connection {
state: State::Open,
error: None,
codec: codec,
ping_pong: PingPong::new(),
settings: Settings::new(),
Expand Down Expand Up @@ -118,10 +125,19 @@ where
// This will also handle flushing `self.codec`
try_ready!(self.streams.poll_complete(&mut self.codec));

if self.error.is_some() {
if self.streams.num_active_streams() == 0 {
let id = self.streams.last_processed_id();
let goaway = frame::GoAway::new(id, Reason::NoError);
self.state = State::GoAway(goaway);
continue;
}
}

return Ok(Async::NotReady);
},
// Attempting to read a frame resulted in a connection level
// error. This is handled by setting a GO_AWAY frame followed by
// error. This is handled by setting a GOAWAY frame followed by
// terminating the connection.
Err(Connection(e)) => {
debug!("Connection::poll; err={:?}", e);
Expand Down Expand Up @@ -164,24 +180,45 @@ where
// Ensure the codec is ready to accept the frame
try_ready!(self.codec.poll_ready());

// Buffer the GO_AWAY frame
// Buffer the GOAWAY frame
self.codec
.buffer(frame.into())
.ok()
.expect("invalid GO_AWAY frame");

// GO_AWAY sent, transition the connection to an errored state
self.state = State::Flush(frame.reason());
// GOAWAY sent, transition the connection to a closed state
// Determine what error code should be returned to user.
let reason = if let Some(theirs) = self.error.take() {
let ours = frame.reason();
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NoError, err) |
(err, Reason::NoError) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
frame.reason()
};
self.state = State::Flush(reason);
},
State::Flush(reason) => {
// Flush the codec
try_ready!(self.codec.flush());

// Transition the state to error
self.state = State::Error(reason);
self.state = State::Closed(reason);
},
State::Error(reason) => {
return Err(reason.into());
State::Closed(reason) => {
if let Reason::NoError = reason {
return Ok(Async::Ready(()));
} else {
return Err(reason.into());
}
},
}
}
Expand Down Expand Up @@ -215,11 +252,14 @@ where
trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
},
Some(GoAway(_)) => {
// TODO: handle the last_processed_id. Also, should this be
// handled as an error?
// let _ = RecvError::Proto(frame.reason());
return Ok(().into());
Some(GoAway(frame)) => {
trace!("recv GOAWAY; frame={:?}", frame);
// This should prevent starting new streams,
// but should allow continuing to process current streams
// until they are all EOS. Once they are, State should
// transition to GoAway.
self.streams.recv_goaway(&frame);
self.error = Some(frame.reason());
},
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);
Expand Down
25 changes: 15 additions & 10 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,14 @@ where
// Transition the state
stream.state.set_reset(reason);

// Clear all pending outbound frames
self.prioritize.clear_queue(stream);

// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
self.recv_err(stream);

let frame = frame::Reset::new(stream.id, reason);

trace!("send_reset -- queueing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task);

// Re-assign all capacity to the connection
self.prioritize
.assign_connection_capacity(available, stream);

}

pub fn send_data(
Expand Down Expand Up @@ -221,6 +213,19 @@ where
Ok(())
}

pub fn recv_err(&mut self, stream: &mut store::Ptr<B, P>) {
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);

// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.prioritize
.assign_connection_capacity(available, stream);
}

pub fn apply_remote_settings(
&mut self,
settings: &frame::Settings,
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ where
}
}

#[cfg(feature = "unstable")]
impl<B, P> Store<B, P>
where
P: Peer,
Expand All @@ -231,6 +230,7 @@ where
self.ids.len()
}

#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
self.slab.len()
}
Expand Down
34 changes: 33 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ where
.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(err, &mut *stream);
actions.send.recv_err(stream);
Ok::<_, ()>(())
})
})
Expand All @@ -202,6 +203,37 @@ where
last_processed_id
}

pub fn recv_goaway(&mut self, frame: &frame::GoAway) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let actions = &mut me.actions;
let counts = &mut me.counts;

let last_stream_id = frame.last_stream_id();
let err = frame.reason().into();

me.store
.for_each(|stream| {
if stream.id > last_stream_id {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(&err, &mut *stream);
actions.send.recv_err(stream);
Ok::<_, ()>(())
})
} else {
Ok::<_, ()>(())
}
})
.unwrap();

actions.conn_error = Some(err);
}

pub fn last_processed_id(&self) -> StreamId {
self.inner.lock().unwrap().actions.recv.last_processed_id()
}

pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> {
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
Expand Down Expand Up @@ -446,7 +478,6 @@ where
}
}

#[cfg(feature = "unstable")]
impl<B, P> Streams<B, P>
where
B: Buf,
Expand All @@ -457,6 +488,7 @@ where
me.store.num_active_streams()
}

#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
me.store.num_wired_streams()
Expand Down
69 changes: 69 additions & 0 deletions tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,75 @@ fn configure_max_frame_size() {
let _ = h2.join(srv).wait().expect("wait");
}

#[test]
fn recv_goaway_finishes_processed_streams() {
let _ = ::env_logger::init();
let (io, srv) = mock::new();

let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.recv_frame(
frames::headers(3)
.request("GET", "https://example.com/")
.eos(),
)
.send_frame(frames::go_away(1))
.send_frame(frames::headers(1).response(200))
.send_frame(frames::data(1, vec![0; 16_384]).eos())
// expecting a goaway of 0, since server never initiated a stream
.recv_frame(frames::go_away(0));
//.close();

let h2 = Client::handshake(io)
.expect("handshake")
.and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();

let req1 = client.send_request(request, true)
.unwrap()
.expect("response")
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().expect("body")
})
.and_then(|buf| {
assert_eq!(buf.len(), 16_384);
Ok(())
});


// this request will trigger a goaway
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();
let req2 = client.send_request(request, true)
.unwrap()
.then(|res| {
let err = res.unwrap_err();
assert_eq!(err.to_string(), "protocol error: not a result of an error");
Ok::<(), ()>(())
});

h2.expect("client").join3(req1, req2)
});


h2.join(srv).wait().expect("wait");
}

/*
#[test]
fn send_data_after_headers_eos() {
Expand Down

0 comments on commit ecd2764

Please sign in to comment.