diff --git a/src/main/java/io/github/nstdio/http/ext/DecompressingSubscriber.java b/src/main/java/io/github/nstdio/http/ext/DecompressingSubscriber.java index bf33fa9..a66d721 100644 --- a/src/main/java/io/github/nstdio/http/ext/DecompressingSubscriber.java +++ b/src/main/java/io/github/nstdio/http/ext/DecompressingSubscriber.java @@ -25,6 +25,7 @@ import java.net.http.HttpResponse.BodySubscriber; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; @@ -34,6 +35,7 @@ import java.util.zip.GZIPInputStream; class DecompressingSubscriber implements BodySubscriber { + private static final int MIN_BYTES_TO_INIT = 10; private static final List LAST_ITEM = List.of(); private final BodySubscriber downstream; @@ -86,25 +88,14 @@ public void onNext(List item) { } if (initError.get()) { - onNext0(item); + pushNext(item); return; } item.forEach(is::add); - if (decompressingStream == null) { - // trying to buffer at least 10 bytes - // to normally initialize decompressingStream - - int available = currentlyAvailable(); - if (available < 10) { - // nothing changed since last execution - if (lastAvailable.getAndSet(available) == available) { - pushRemainingBytes(); - } - - return; - } + if (!hasEnoughBytesForInit()) { + return; } if (!initDecompressingStream()) { @@ -112,6 +103,21 @@ public void onNext(List item) { return; } + try { + List dec = decompress(); + pushNext(dec); + } catch (IOException e) { + if (item == LAST_ITEM) { + completed.set(true); + } else { + onError(e); + } + + onComplete(); + } + } + + private List decompress() throws IOException { List dec = new ArrayList<>(); ByteBuffer buf = newBuffer(); @@ -129,24 +135,40 @@ public void onNext(List item) { } } - pushNext(dec, buf); + add(dec, buf); } catch (EOFException e) { - pushNext(dec, buf); - } catch (IOException e) { - if (item == LAST_ITEM) { - completed.set(true); - } + add(dec, buf); + } - onError(e); - onComplete(); + return Collections.unmodifiableList(dec); + } + private boolean hasEnoughBytesForInit() { + if (decompressingStream != null) { + // already initialized + return true; } + + // trying to buffer at least 10 bytes + // to normally initialize decompressingStream + + int available = available(); + if (available < MIN_BYTES_TO_INIT) { + // nothing changed since last execution + if (lastAvailable.getAndSet(available) == available) { + pushRemainingBytes(); + } + + return false; + } + + return true; } private void pushRemainingBytes() { try { List wrap = List.of(ByteBuffer.wrap(is.readAllBytes())); - onNext0(wrap); + pushNext(wrap); } catch (IOException ignored) { // cannot happen because using ByteBufferInputStream which // is not connect to any I/O device. @@ -155,19 +177,12 @@ private void pushRemainingBytes() { close(); } - private void onNext0(List item) { - if (item != LAST_ITEM) { + private void pushNext(List item) { + if (!item.isEmpty()) { downstream.onNext(item); } } - private void pushNext(List dec, ByteBuffer buf) { - add(dec, buf); - if (!dec.isEmpty()) { - onNext0(dec); - } - } - private ByteBuffer newBuffer() { return ByteBuffer.allocate(bufferSize); } @@ -198,7 +213,7 @@ private boolean initDecompressingStream() { return !initError.get(); } - private int currentlyAvailable() { + private int available() { int available = 0; try { available = is.available(); @@ -222,9 +237,9 @@ private void close() { @Override public void onComplete() { onNext(LAST_ITEM); + completed.set(true); close(); - completed.set(true); downstream.onComplete(); } }