From cb1d8f7348af421def47f73b0dbe7a9eb2a8d16b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 8 Mar 2021 18:01:17 +0100 Subject: [PATCH] Support shutting down the stream with an error code (#213) Motivation: We need to support shutting down the stream (read and write) with an error code to be able to propagate errors to the remote peer (like in H3). Modifications: Add method overloads that allow to include the error code when shutdown one side of the stream Result: Be able to propagate stream errors to the remote peer --- .../codec/quic/QuicStreamChannel.java | 41 +++++++++++++++++- .../codec/quic/QuicheQuicChannel.java | 12 ++---- .../codec/quic/QuicheQuicStreamChannel.java | 43 ++++++++++++++++--- 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicStreamChannel.java b/src/main/java/io/netty/incubator/codec/quic/QuicStreamChannel.java index d1fdc4b1d..20e937a8f 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicStreamChannel.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicStreamChannel.java @@ -15,12 +15,13 @@ */ package io.netty.incubator.codec.quic; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.socket.DuplexChannel; +import java.net.Socket; + /** * A QUIC stream. */ @@ -32,6 +33,44 @@ public interface QuicStreamChannel extends DuplexChannel { */ ChannelFutureListener SHUTDOWN_OUTPUT = f -> ((QuicStreamChannel) f.channel()).shutdownOutput(); + /** + * Shutdown the input of the stream with the given error code. This means a {@code STOP_SENDING} frame will + * be send to the remote peer and all data received will be discarded. + * + * @param error the error to send. + * @return the future that is notified on completion. + */ + ChannelFuture shutdownInput(int error); + + /** + * Shutdown the input of the stream with the given error code. This means a {@code STOP_SENDING} frame will + * be send to the remote peer and all data received will be discarded. + * + * @param error the error to send. + * @param promise will be notified on completion. + * @return the future that is notified on completion. + */ + ChannelFuture shutdownInput(int error, ChannelPromise promise); + + /** + * Shutdown the output of the stream with the given error code. This means a {@code RESET_STREAM} frame will + * be send to the remote peer and all data that is not sent yet will be discarded. + * + * @param error the error to send. + * @return the future that is notified on completion. + */ + ChannelFuture shutdownOutput(int error); + + /** + * Shutdown the output of the stream with the given error code. This means a {@code RESET_STREAM} frame will + * be send to the remote peer and all data that is not sent yet will be discarded. + * + * @param error the error to send. + * @param promise will be notified on completion. + * @return the future that is notified on completion. + */ + ChannelFuture shutdownOutput(int error, ChannelPromise promise); + @Override QuicStreamAddress localAddress(); diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java index f4a3a6e19..754406182 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java @@ -676,16 +676,12 @@ QuicStreamType streamType(long streamId) { return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL; } - void streamShutdownRead(long streamId, ChannelPromise promise) { - streamShutdown0(streamId, true, false, 0, promise); + void streamShutdownRead(long streamId, int err, ChannelPromise promise) { + streamShutdown0(streamId, true, false, err, promise); } - void streamShutdownWrite(long streamId, ChannelPromise promise) { - streamShutdown0(streamId, false, true, 0, promise); - } - - void streamShutdownReadAndWrite(long streamId, ChannelPromise promise) { - streamShutdown0(streamId, true, true, 0, promise); + void streamShutdownWrite(long streamId, int err, ChannelPromise promise) { + streamShutdown0(streamId, false, true, err, promise); } private void streamShutdown0(long streamId, boolean read, boolean write, int err, ChannelPromise promise) { diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicStreamChannel.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicStreamChannel.java index 233a32029..c45d2c622 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicStreamChannel.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicStreamChannel.java @@ -160,12 +160,37 @@ public ChannelFuture shutdownInput() { @Override public ChannelFuture shutdownInput(ChannelPromise channelPromise) { + return shutdownInput(0, channelPromise); + } + + @Override + public ChannelFuture shutdownInput(int error) { + return shutdownInput(0); + } + + @Override + public ChannelFuture shutdownInput(int error, ChannelPromise promise) { if (eventLoop().inEventLoop()) { - shutdownInput0(channelPromise); + shutdownInput0(error, promise); } else { - eventLoop().execute(() -> shutdownInput0(channelPromise)); + eventLoop().execute(() -> shutdownInput0(error, promise)); } - return channelPromise; + return promise; + } + + @Override + public ChannelFuture shutdownOutput(int error) { + return shutdownOutput(error, newPromise()); + } + + @Override + public ChannelFuture shutdownOutput(int error, ChannelPromise promise) { + if (eventLoop().inEventLoop()) { + shutdownOutput0(error, promise); + } else { + eventLoop().execute(() -> shutdownOutput0(error, promise)); + } + return promise; } @Override @@ -173,9 +198,9 @@ public QuicheQuicChannel parent() { return parent; } - private void shutdownInput0(ChannelPromise channelPromise) { + private void shutdownInput0(int err, ChannelPromise channelPromise) { inputShutdown = true; - parent().streamShutdownRead(streamId(), channelPromise); + parent().streamShutdownRead(streamId(), err, channelPromise); closeIfDone(); } @@ -213,6 +238,12 @@ private void shutdownOutput0(ChannelPromise channelPromise) { closeIfDone(); } + private void shutdownOutput0(int error, ChannelPromise channelPromise) { + parent().streamShutdownWrite(streamId(), error, channelPromise); + outputShutdown = true; + closeIfDone(); + } + @Override public boolean isShutdown() { return outputShutdown && inputShutdown; @@ -244,7 +275,7 @@ private void shutdown0(ChannelPromise channelPromise) { } inputShutdown = true; outputShutdown = true; - parent().streamShutdownRead(streamId(), channelPromise); + parent().streamShutdownRead(streamId(), 0, channelPromise); closeIfDone(); }