Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve shutdown of non-persistent HTTP/1 connections #12212 #12216

Merged
merged 9 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,16 @@ public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
@Override
public ByteBuffer onUpgradeFrom()
{
if (!isRequestBufferEmpty())
if (isRequestBufferEmpty())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
return null;
}
return null;
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
}

@Override
Expand All @@ -341,10 +342,10 @@ void releaseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
if (_retainableByteBuffer.release())
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
RetainableByteBuffer buffer = _retainableByteBuffer;
_retainableByteBuffer = null;
if (!buffer.release())
throw new IllegalStateException("unreleased buffer " + buffer);
}
}

Expand All @@ -369,7 +370,9 @@ public void onFillable()
HttpConnection last = setCurrentConnection(this);
try
{
while (getEndPoint().isOpen())
// We must loop until we fill -1 or there is an async pause in handling.
// Note that the endpoint might already be closed in some special circumstances.
while (true)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
// Fill the request buffer (if needed).
int filled = fillRequestBuffer();
Expand Down Expand Up @@ -906,6 +909,13 @@ private void releaseChunk()
@Override
protected void onCompleteSuccess()
{
// If we are a non-persistent connection and have succeeded the last write...
if (_shutdownOut && !(_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection))
{
// then we shutdown the output here so that the client sees the body termination ASAP and
// cannot be delayed by any further server handling before the stream callback is completed.
getEndPoint().shutdownOutput();
}
release().succeeded();
}

Expand Down Expand Up @@ -1513,8 +1523,7 @@ public void succeeded()
return;
}

Connection upgradeConnection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE);
if (upgradeConnection != null)
if (_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection upgradeConnection)
{
getEndPoint().upgrade(upgradeConnection);
_httpChannel.recycle();
Expand All @@ -1523,13 +1532,8 @@ public void succeeded()
return;
}

// As this is not an upgrade, we can shutdown the output if we know we are not persistent
if (_sendCallback._shutdownOut)
getEndPoint().shutdownOutput();

_httpChannel.recycle();


// If a 100 Continue is still expected to be sent, but no content was read, then
// close the parser so that seeks EOF below, not the next request.
if (_expects100Continue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,6 +72,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -1085,6 +1090,56 @@ public void testCloseWhileWriteBlocked() throws Exception
}
}

@Test
public void testCloseWhileCompletePending() throws Exception
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
String content = "The End!\r\n";
CountDownLatch handleComplete = new CountDownLatch(1);
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
FutureCallback writeComplete = new FutureCallback();
Content.Sink.write(response, true, content, writeComplete);
// Wait until the write is complete
writeComplete.get(30, TimeUnit.SECONDS);

// Wait until test lets the handling complete
assertTrue(handleComplete.await(30, TimeUnit.SECONDS));

callback.succeeded();
return true;
}
});

try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream output = client.getOutputStream();
output.write("""
GET / HTTP/1.1\r
Host: localhost:%d\r
Connection: close\r
\r
""".formatted(_serverURI.getPort())
.getBytes());
output.flush();

client.setSoTimeout(5000);
long start = NanoTime.now();
HttpTester.Input input = HttpTester.from(client.getInputStream());
HttpTester.Response response = HttpTester.parseResponse(input);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(content, response.getContent());
assertFalse(input.isEOF());
assertEquals(-1, input.fillBuffer());
assertTrue(input.isEOF());
assertThat(NanoTime.secondsSince(start), lessThan(5L));

}
handleComplete.countDown();
}

@Test
public void testBigBlocks() throws Exception
{
Expand Down Expand Up @@ -1813,8 +1868,9 @@ public void testChunkedShutdown() throws Exception
}
}

@Test
public void testHoldContent() throws Exception
@ParameterizedTest
@ValueSource(booleans = {false /* TODO, true */})
public void testHoldContent(boolean close) throws Exception
{
Queue<Content.Chunk> contents = new ConcurrentLinkedQueue<>();
final int bufferSize = 1024;
Expand Down Expand Up @@ -1857,6 +1913,10 @@ public void onClosed(Connection connection)
}

response.setStatus(200);

if (close)
request.getConnectionMetaData().getConnection().getEndPoint().close();

callback.succeeded();
return true;
}
Expand Down Expand Up @@ -1897,9 +1957,12 @@ public void onClosed(Connection connection)
out.flush();

// check the response
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response);
assertThat(response.getStatus(), is(200));
if (!close)
{
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response);
assertThat(response.getStatus(), is(200));
}
}

assertTrue(closed.await(10, TimeUnit.SECONDS));
Expand Down
Loading