Skip to content

Commit

Permalink
Support shutting down the stream with an error code (java-native-acce…
Browse files Browse the repository at this point in the history
…ss#213)


Motivation:

We need to support shutting down the stream (read and write) with an error code to be able to propagate errors to the remote peer (like in H3).

Modifications:

Add method overloads that allow to include the error code when shutdown one side of the stream

Result:

Be able to propagate stream errors to the remote peer
  • Loading branch information
normanmaurer authored Mar 8, 2021
1 parent 0d68bc4 commit cb1d8f7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package io.netty.incubator.codec.quic;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DuplexChannel;

import java.net.Socket;

/**
* A QUIC stream.
*/
Expand All @@ -32,6 +33,44 @@ public interface QuicStreamChannel extends DuplexChannel {
*/
ChannelFutureListener SHUTDOWN_OUTPUT = f -> ((QuicStreamChannel) f.channel()).shutdownOutput();

/**
* Shutdown the input of the stream with the given error code. This means a {@code STOP_SENDING} frame will
* be send to the remote peer and all data received will be discarded.
*
* @param error the error to send.
* @return the future that is notified on completion.
*/
ChannelFuture shutdownInput(int error);

/**
* Shutdown the input of the stream with the given error code. This means a {@code STOP_SENDING} frame will
* be send to the remote peer and all data received will be discarded.
*
* @param error the error to send.
* @param promise will be notified on completion.
* @return the future that is notified on completion.
*/
ChannelFuture shutdownInput(int error, ChannelPromise promise);

/**
* Shutdown the output of the stream with the given error code. This means a {@code RESET_STREAM} frame will
* be send to the remote peer and all data that is not sent yet will be discarded.
*
* @param error the error to send.
* @return the future that is notified on completion.
*/
ChannelFuture shutdownOutput(int error);

/**
* Shutdown the output of the stream with the given error code. This means a {@code RESET_STREAM} frame will
* be send to the remote peer and all data that is not sent yet will be discarded.
*
* @param error the error to send.
* @param promise will be notified on completion.
* @return the future that is notified on completion.
*/
ChannelFuture shutdownOutput(int error, ChannelPromise promise);

@Override
QuicStreamAddress localAddress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,16 +676,12 @@ QuicStreamType streamType(long streamId) {
return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
}

void streamShutdownRead(long streamId, ChannelPromise promise) {
streamShutdown0(streamId, true, false, 0, promise);
void streamShutdownRead(long streamId, int err, ChannelPromise promise) {
streamShutdown0(streamId, true, false, err, promise);
}

void streamShutdownWrite(long streamId, ChannelPromise promise) {
streamShutdown0(streamId, false, true, 0, promise);
}

void streamShutdownReadAndWrite(long streamId, ChannelPromise promise) {
streamShutdown0(streamId, true, true, 0, promise);
void streamShutdownWrite(long streamId, int err, ChannelPromise promise) {
streamShutdown0(streamId, false, true, err, promise);
}

private void streamShutdown0(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,47 @@ public ChannelFuture shutdownInput() {

@Override
public ChannelFuture shutdownInput(ChannelPromise channelPromise) {
return shutdownInput(0, channelPromise);
}

@Override
public ChannelFuture shutdownInput(int error) {
return shutdownInput(0);
}

@Override
public ChannelFuture shutdownInput(int error, ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
shutdownInput0(channelPromise);
shutdownInput0(error, promise);
} else {
eventLoop().execute(() -> shutdownInput0(channelPromise));
eventLoop().execute(() -> shutdownInput0(error, promise));
}
return channelPromise;
return promise;
}

@Override
public ChannelFuture shutdownOutput(int error) {
return shutdownOutput(error, newPromise());
}

@Override
public ChannelFuture shutdownOutput(int error, ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
shutdownOutput0(error, promise);
} else {
eventLoop().execute(() -> shutdownOutput0(error, promise));
}
return promise;
}

@Override
public QuicheQuicChannel parent() {
return parent;
}

private void shutdownInput0(ChannelPromise channelPromise) {
private void shutdownInput0(int err, ChannelPromise channelPromise) {
inputShutdown = true;
parent().streamShutdownRead(streamId(), channelPromise);
parent().streamShutdownRead(streamId(), err, channelPromise);
closeIfDone();
}

Expand Down Expand Up @@ -213,6 +238,12 @@ private void shutdownOutput0(ChannelPromise channelPromise) {
closeIfDone();
}

private void shutdownOutput0(int error, ChannelPromise channelPromise) {
parent().streamShutdownWrite(streamId(), error, channelPromise);
outputShutdown = true;
closeIfDone();
}

@Override
public boolean isShutdown() {
return outputShutdown && inputShutdown;
Expand Down Expand Up @@ -244,7 +275,7 @@ private void shutdown0(ChannelPromise channelPromise) {
}
inputShutdown = true;
outputShutdown = true;
parent().streamShutdownRead(streamId(), channelPromise);
parent().streamShutdownRead(streamId(), 0, channelPromise);
closeIfDone();
}

Expand Down

0 comments on commit cb1d8f7

Please sign in to comment.