From 22ed0ac4caf5d60628ff8836dd1dc2d3289cf43f Mon Sep 17 00:00:00 2001 From: Remco Verhoef Date: Thu, 23 Sep 2021 23:15:57 +0200 Subject: [PATCH 1/2] flush data still available in the decoder when no input (#123) --- src/codec/gzip/decoder.rs | 14 +++++++++++++- src/futures/bufread/generic/decoder.rs | 25 +++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/codec/gzip/decoder.rs b/src/codec/gzip/decoder.rs index fc38a43c..6e9958fd 100644 --- a/src/codec/gzip/decoder.rs +++ b/src/codec/gzip/decoder.rs @@ -80,8 +80,20 @@ impl GzipDecoder { State::Decoding => { let prior = output.written().len(); - let done = inner(self, input, output)?; + + let done = inner(self, input, output).or_else(|e| match e { + // we need to make an exception, if output flushed + // from the decoder, to update the crc + e if e.kind() == std::io::ErrorKind::Other + && output.written().len() > 0 => + { + Ok(false) + } + _ => Err(e), + })?; + self.crc.update(&output.written()[prior..]); + if done { self.state = State::Footer(vec![0; 8].into()) } diff --git a/src/futures/bufread/generic/decoder.rs b/src/futures/bufread/generic/decoder.rs index d19daae5..591bd306 100644 --- a/src/futures/bufread/generic/decoder.rs +++ b/src/futures/bufread/generic/decoder.rs @@ -65,18 +65,39 @@ impl Decoder { ) -> Poll> { let mut this = self.project(); + let mut first = true; + loop { *this.state = match this.state { State::Decoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; + let input = if first { + &[][..] + } else { + ready!(this.reader.as_mut().poll_fill_buf(cx))? + }; + if input.is_empty() { // Avoid attempting to reinitialise the decoder if the reader // has returned EOF. *this.multiple_members = false; + } + + if input.is_empty() && !first { State::Flushing } else { let mut input = PartialBuffer::new(input); - let done = this.decoder.decode(&mut input, output)?; + let done = this.decoder.decode(&mut input, output).or_else(|e| { + // ignore the first error, occurs when input is empty + // but we need to run decode to flush + if first { + Ok(false) + } else { + Err(e) + } + })?; + + first = false; + let len = input.written().len(); this.reader.as_mut().consume(len); if done { From db0d11f5f4404b5be821de2e1e573d25c87e7f96 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 19 Sep 2023 17:16:48 +0200 Subject: [PATCH 2/2] fix merge error --- CHANGELOG.md | 2 ++ src/codec/gzip/decoder.rs | 18 +++++++----------- src/futures/bufread/generic/decoder.rs | 12 +++++------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d8152b0..732309eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0), ## Unreleased +- Flush available data in decoder even when there's no incoming input. + ## 0.4.5 - Add `{Lzma, Xz}Decoder::with_mem_limit()` methods. diff --git a/src/codec/gzip/decoder.rs b/src/codec/gzip/decoder.rs index 6e9958fd..4bb42606 100644 --- a/src/codec/gzip/decoder.rs +++ b/src/codec/gzip/decoder.rs @@ -81,18 +81,14 @@ impl GzipDecoder { State::Decoding => { let prior = output.written().len(); - let done = inner(self, input, output).or_else(|e| match e { - // we need to make an exception, if output flushed - // from the decoder, to update the crc - e if e.kind() == std::io::ErrorKind::Other - && output.written().len() > 0 => - { - Ok(false) - } - _ => Err(e), - })?; + let res = inner(self, input, output); - self.crc.update(&output.written()[prior..]); + if (output.written().len() > prior) { + // update CRC even if there was an error + self.crc.update(&output.written()[prior..]); + } + + let done = res?; if done { self.state = State::Footer(vec![0; 8].into()) diff --git a/src/futures/bufread/generic/decoder.rs b/src/futures/bufread/generic/decoder.rs index 591bd306..a3a968a7 100644 --- a/src/futures/bufread/generic/decoder.rs +++ b/src/futures/bufread/generic/decoder.rs @@ -76,23 +76,21 @@ impl Decoder { ready!(this.reader.as_mut().poll_fill_buf(cx))? }; - if input.is_empty() { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. + if input.is_empty() && !first { + // Avoid attempting to reinitialise the decoder if the + // reader has returned EOF. *this.multiple_members = false; - } - if input.is_empty() && !first { State::Flushing } else { let mut input = PartialBuffer::new(input); - let done = this.decoder.decode(&mut input, output).or_else(|e| { + let done = this.decoder.decode(&mut input, output).or_else(|err| { // ignore the first error, occurs when input is empty // but we need to run decode to flush if first { Ok(false) } else { - Err(e) + Err(err) } })?;