Skip to content

Commit

Permalink
Implemented server-side reset of the stream using error code NO_ERROR…
Browse files Browse the repository at this point in the history
… in case the request content is not read.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Dec 15, 2023
1 parent de5dffe commit c5e40f5
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,12 @@ public void onPriority(PriorityFrame frame)
@Override
public void onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {} on {}", frame, this);

int streamId = frame.getStreamId();
HTTP2Stream stream = getStream(streamId);

if (LOG.isDebugEnabled())
LOG.debug("Received {} for {} on {}", frame, stream, this);

if (stream != null)
{
stream.process(frame, new OnResetCallback());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ public int updateRecvWindow(int delta)
@Override
public void close()
{
if (LOG.isDebugEnabled())
LOG.debug("Close for {}", this);
CloseState oldState = closeState.getAndSet(CloseState.CLOSED);
if (oldState != CloseState.CLOSED)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public void succeeded()
// Send a reset to the other end so that it stops sending data.
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{}: unconsumed request content, resetting stream", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.NO_ERROR.code), Callback.NOOP);
}
}
_httpChannel.recycle();
Expand All @@ -619,9 +619,10 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
ErrorCode errorCode = x == HttpStream.CONTENT_NOT_CONSUMED ? ErrorCode.NO_ERROR : ErrorCode.CANCEL_STREAM_ERROR;
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{} aborted", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
LOG.debug("HTTP2 response #{}/{} failed {}", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()), errorCode, x);
_stream.reset(new ResetFrame(_stream.getId(), errorCode.code), Callback.NOOP);
}

private class SendTrailers extends Callback.Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package org.eclipse.jetty.http3.server.internal;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -494,16 +493,17 @@ public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{}: unconsumed request content, resetting stream", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), new IOException("unconsumed content"));
stream.reset(HTTP3ErrorCode.NO_ERROR.code(), CONTENT_NOT_CONSUMED);
}
}

@Override
public void failed(Throwable x)
{
HTTP3ErrorCode errorCode = x == HttpStream.CONTENT_NOT_CONSUMED ? HTTP3ErrorCode.NO_ERROR : HTTP3ErrorCode.REQUEST_CANCELLED_ERROR;
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{} aborted", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
LOG.debug("HTTP3 Response #{}/{} failed {}", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), errorCode, x);
stream.reset(errorCode.code(), x);
}

public void onIdleTimeout(TimeoutException failure, BiConsumer<Runnable, Boolean> consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
public interface HttpStream extends Callback
{
Exception CONTENT_NOT_CONSUMED = new StaticException("Content not consumed");
Exception CONTENT_NOT_CONSUMED = new StaticException("Unconsumed request content");

/**
* <p>Attribute name to be used as a {@link Request} attribute to store/retrieve
Expand Down Expand Up @@ -119,7 +119,7 @@ static Throwable consumeAvailable(HttpStream stream, HttpConfiguration httpConfi

// if we cannot read to EOF then fail the stream rather than wait for unconsumed content
if (content == null)
return CONTENT_NOT_CONSUMED;
break;

// Always release any returned content. This is a noop for EOF and Error content.
content.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ public boolean handle(Request request, Response response, Callback callback)

assertThat(stream.isComplete(), is(true));
assertThat(stream.getFailure(), notNullValue());
assertThat(stream.getFailure().getMessage(), containsString("Content not consumed"));
assertThat(stream.getFailure().getMessage(), containsString("Unconsumed request content"));
assertThat(stream.getResponse(), notNullValue());
assertThat(stream.getResponse().getStatus(), equalTo(200));
assertThat(stream.getResponseHeaders().get(HttpHeader.CONTENT_TYPE), equalTo(MimeTypes.Type.TEXT_PLAIN_UTF_8.asString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.HttpStream;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.component.Destroyable;
Expand All @@ -34,7 +35,6 @@ class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private static final HttpInput.ErrorContent RECYCLED_ERROR_CONTENT = new HttpInput.ErrorContent(new StaticException("ContentProducer has been recycled"));
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new StaticException("Unconsumed content");

private final AutoLock _lock = new AutoLock();
private final HttpChannel _httpChannel;
Expand Down Expand Up @@ -182,7 +182,7 @@ public long getRawContentArrived()
public boolean consumeAll()
{
assertLocked();
Throwable x = UNCONSUMED_CONTENT_EXCEPTION;
Throwable x = HttpStream.CONTENT_NOT_CONSUMED;
if (LOG.isTraceEnabled())
{
x = new StaticException("Unconsumed content", true);
Expand Down

0 comments on commit c5e40f5

Please sign in to comment.