Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

when receiving a GOAWAY, allow earlier streams to still process #133

Merged
merged 1 commit into from
Oct 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/frame/go_away.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ impl GoAway {
}
}

#[cfg(feature = "unstable")]
pub fn last_stream_id(&self) -> StreamId {
self.last_stream_id
}
Expand All @@ -27,9 +26,7 @@ impl GoAway {

pub fn load(payload: &[u8]) -> Result<GoAway, Error> {
if payload.len() < 8 {
// Invalid payload len
// TODO: Handle error
unimplemented!();
return Err(Error::BadFrameSize);
}

let (last_stream_id, _) = StreamId::parse(&payload[..4]);
Expand Down
70 changes: 55 additions & 15 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ where
/// Tracks the connection level state transitions.
state: State,

/// An error to report back once complete.
///
/// This exists separately from State in order to support
/// graceful shutdown.
error: Option<Reason>,

/// Read / write frame values
codec: Codec<T, Prioritized<B::Buf>>,

Expand All @@ -41,14 +47,14 @@ enum State {
/// Currently open in a sane state
Open,

/// Waiting to send a GO_AWAY frame
/// Waiting to send a GOAWAY frame
GoAway(frame::GoAway),

/// The codec must be flushed
Flush(Reason),

/// In an errored state
Error(Reason),
/// In a closed state
Closed(Reason),
}

impl<T, P, B> Connection<T, P, B>
Expand All @@ -74,6 +80,7 @@ where
});
Connection {
state: State::Open,
error: None,
codec: codec,
ping_pong: PingPong::new(),
settings: Settings::new(),
Expand Down Expand Up @@ -118,10 +125,19 @@ where
// This will also handle flushing `self.codec`
try_ready!(self.streams.poll_complete(&mut self.codec));

if self.error.is_some() {
if self.streams.num_active_streams() == 0 {
let id = self.streams.last_processed_id();
let goaway = frame::GoAway::new(id, Reason::NoError);
self.state = State::GoAway(goaway);
continue;
}
}

return Ok(Async::NotReady);
},
// Attempting to read a frame resulted in a connection level
// error. This is handled by setting a GO_AWAY frame followed by
// error. This is handled by setting a GOAWAY frame followed by
// terminating the connection.
Err(Connection(e)) => {
debug!("Connection::poll; err={:?}", e);
Expand Down Expand Up @@ -164,24 +180,45 @@ where
// Ensure the codec is ready to accept the frame
try_ready!(self.codec.poll_ready());

// Buffer the GO_AWAY frame
// Buffer the GOAWAY frame
self.codec
.buffer(frame.into())
.ok()
.expect("invalid GO_AWAY frame");

// GO_AWAY sent, transition the connection to an errored state
self.state = State::Flush(frame.reason());
// GOAWAY sent, transition the connection to a closed state
// Determine what error code should be returned to user.
let reason = if let Some(theirs) = self.error.take() {
let ours = frame.reason();
match (ours, theirs) {
// If either side reported an error, return that
// to the user.
(Reason::NoError, err) |
(err, Reason::NoError) => err,
// If both sides reported an error, give their
// error back to th user. We assume our error
// was a consequence of their error, and less
// important.
(_, theirs) => theirs,
}
} else {
frame.reason()
};
self.state = State::Flush(reason);
},
State::Flush(reason) => {
// Flush the codec
try_ready!(self.codec.flush());

// Transition the state to error
self.state = State::Error(reason);
self.state = State::Closed(reason);
},
State::Error(reason) => {
return Err(reason.into());
State::Closed(reason) => {
if let Reason::NoError = reason {
return Ok(Async::Ready(()));
} else {
return Err(reason.into());
}
},
}
}
Expand Down Expand Up @@ -215,11 +252,14 @@ where
trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
},
Some(GoAway(_)) => {
// TODO: handle the last_processed_id. Also, should this be
// handled as an error?
// let _ = RecvError::Proto(frame.reason());
return Ok(().into());
Some(GoAway(frame)) => {
trace!("recv GOAWAY; frame={:?}", frame);
// This should prevent starting new streams,
// but should allow continuing to process current streams
// until they are all EOS. Once they are, State should
// transition to GoAway.
self.streams.recv_goaway(&frame);
self.error = Some(frame.reason());
},
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);
Expand Down
25 changes: 15 additions & 10 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,14 @@ where
// Transition the state
stream.state.set_reset(reason);

// Clear all pending outbound frames
self.prioritize.clear_queue(stream);

// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
self.recv_err(stream);

let frame = frame::Reset::new(stream.id, reason);

trace!("send_reset -- queueing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task);

// Re-assign all capacity to the connection
self.prioritize
.assign_connection_capacity(available, stream);

}

pub fn send_data(
Expand Down Expand Up @@ -221,6 +213,19 @@ where
Ok(())
}

pub fn recv_err(&mut self, stream: &mut store::Ptr<B, P>) {
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);

// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.prioritize
.assign_connection_capacity(available, stream);
}

pub fn apply_remote_settings(
&mut self,
settings: &frame::Settings,
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ where
}
}

#[cfg(feature = "unstable")]
impl<B, P> Store<B, P>
where
P: Peer,
Expand All @@ -231,6 +230,7 @@ where
self.ids.len()
}

#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
self.slab.len()
}
Expand Down
34 changes: 33 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ where
.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(err, &mut *stream);
actions.send.recv_err(stream);
Ok::<_, ()>(())
})
})
Expand All @@ -202,6 +203,37 @@ where
last_processed_id
}

pub fn recv_goaway(&mut self, frame: &frame::GoAway) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let actions = &mut me.actions;
let counts = &mut me.counts;

let last_stream_id = frame.last_stream_id();
let err = frame.reason().into();

me.store
.for_each(|stream| {
if stream.id > last_stream_id {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(&err, &mut *stream);
actions.send.recv_err(stream);
Ok::<_, ()>(())
})
} else {
Ok::<_, ()>(())
}
})
.unwrap();

actions.conn_error = Some(err);
}

pub fn last_processed_id(&self) -> StreamId {
self.inner.lock().unwrap().actions.recv.last_processed_id()
}

pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> {
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
Expand Down Expand Up @@ -446,7 +478,6 @@ where
}
}

#[cfg(feature = "unstable")]
impl<B, P> Streams<B, P>
where
B: Buf,
Expand All @@ -457,6 +488,7 @@ where
me.store.num_active_streams()
}

#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
me.store.num_wired_streams()
Expand Down
69 changes: 69 additions & 0 deletions tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,75 @@ fn configure_max_frame_size() {
let _ = h2.join(srv).wait().expect("wait");
}

#[test]
fn recv_goaway_finishes_processed_streams() {
let _ = ::env_logger::init();
let (io, srv) = mock::new();

let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://example.com/")
.eos(),
)
.recv_frame(
frames::headers(3)
.request("GET", "https://example.com/")
.eos(),
)
.send_frame(frames::go_away(1))
.send_frame(frames::headers(1).response(200))
.send_frame(frames::data(1, vec![0; 16_384]).eos())
// expecting a goaway of 0, since server never initiated a stream
.recv_frame(frames::go_away(0));
//.close();

let h2 = Client::handshake(io)
.expect("handshake")
.and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();

let req1 = client.send_request(request, true)
.unwrap()
.expect("response")
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.into_parts().1;
body.concat2().expect("body")
})
.and_then(|buf| {
assert_eq!(buf.len(), 16_384);
Ok(())
});


// this request will trigger a goaway
let request = Request::builder()
.method(Method::GET)
.uri("https://example.com/")
.body(())
.unwrap();
let req2 = client.send_request(request, true)
.unwrap()
.then(|res| {
let err = res.unwrap_err();
assert_eq!(err.to_string(), "protocol error: not a result of an error");
Ok::<(), ()>(())
});

h2.expect("client").join3(req1, req2)
});


h2.join(srv).wait().expect("wait");
}

/*
#[test]
fn send_data_after_headers_eos() {
Expand Down