From 7c837e126509f0215122760edd28f28b9b276ce5 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sun, 10 Jun 2018 16:04:57 -0700 Subject: [PATCH] feat(h2): implement flow control for h2 bodies This is implemented sligntly differently than what the TODO comment said, but matches with tower-h2's implementation. h2 already handles the buffering of the pending chunk and capacity management for it, so we just want to make sure there's some capacity available (i.e. the previous chunk has been fully written out). Closes #1548 --- src/proto/h2/mod.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 167adf4f74..67a9a46726 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -103,20 +103,21 @@ where fn poll(&mut self) -> Poll { loop { if !self.data_done { - // TODO: make use of flow control on SendStream - // If you're looking at this and thinking of trying to fix this TODO, - // you may want to look at: - // https://docs.rs/h2/0.1.*/h2/struct.SendStream.html - // - // With that doc open, we'd want to do these things: - // - check self.body_tx.capacity() to see if we can send *any* data - // - if > 0: - // - poll self.stream - // - reserve chunk.len() more capacity (because its about to be used)? - // - send the chunk - // - else: - // - try reserve a smallish amount of capacity - // - call self.body_tx.poll_capacity(), return if NotReady + if self.body_tx.capacity() == 0 { + // we don't have the next chunk of data yet, so just reserve 1 byte to make + // sure there's some capacity available. h2 will handle the capacity management + // for the actual body chunk. + self.body_tx.reserve_capacity(1); + + loop { + match try_ready!(self.body_tx.poll_capacity().map_err(::Error::new_h2)) { + Some(0) => {} + Some(_) => break, + None => return Err(::Error::new_canceled(None::<::Error>)), + } + } + } + match try_ready!(self.stream.poll_data().map_err(|e| self.on_err(e))) { Some(chunk) => { let is_eos = self.stream.is_end_stream(); @@ -136,6 +137,7 @@ where } } None => { + self.body_tx.reserve_capacity(0); let is_eos = self.stream.is_end_stream(); if is_eos { return self.send_eos_frame().map(Async::Ready);