Skip to content

Commit

Permalink
Fixes #11016 - Jetty 12 IllegalStateException when stopping Server wi… (
Browse files Browse the repository at this point in the history
#11017)

* Made ServletChannel error handling more robust.
A failure in error handling is now remembered so that the Handler callback can be failed later.
* Avoid failing the Handler callback from ServletChannel.abort(), as it is too early: should be failed when processing the TERMINATED state, similarly to when it is succeeded.
* Removed dead code from HttpConnection.SendCallback.reset(), since response is always non-null.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Dec 15, 2023
1 parent 3167e0c commit eb1e9eb
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1501,9 +1501,12 @@ public void failed(Throwable failure)
Throwable unconsumed = stream.consumeAvailable();
ExceptionUtil.addSuppressedIfNotAssociated(failure, unconsumed);

ChannelResponse response = httpChannelState._response;
if (LOG.isDebugEnabled())
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", httpChannelState._stream.isCommitted(), httpChannelState._response.isCommitted(), this);
LOG.debug("failed stream.isCommitted={}, response.isCommitted={} {}", stream.isCommitted(), response.isCommitted(), this);

// There may have been an attempt to write an error response that failed.
// Do not try to write again an error response if already committed.
if (!stream.isCommitted())
errorResponse = new ErrorResponse(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,26 +741,18 @@ private boolean reset(MetaData.Request request, MetaData.Response response, Byte
_lastContent = last;
_callback = callback;
_header = null;

if (getConnector().isShutdown())
_generator.setPersistent(false);

return true;
}

if (isClosed() && response == null && last && content == null)
else
{
callback.succeeded();
if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}

LOG.warn("reset failed {}", this);

if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ protected void generateAcceptableResponse(ServletContextRequest baseRequest, Htt
}

// Do an asynchronous completion.
baseRequest.getServletChannel().sendResponseAndComplete();
baseRequest.getServletChannel().sendErrorResponseAndComplete();
}

protected void handleErrorPage(HttpServletRequest request, Writer writer, int code, String message) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public void handle()
// If we can't have a body or have no ErrorHandler, then create a minimal error response.
if (HttpStatus.hasNoBody(getServletContextResponse().getStatus()) || errorHandler == null)
{
sendResponseAndComplete();
sendErrorResponseAndComplete();
}
else
{
Expand All @@ -485,7 +485,7 @@ public void handle()
// that ignores existing failures. However, the error handler needs to be able to call servlet pages,
// so it will need to do a new call to associate(req,res,callback) or similar, to make the servlet request and
// response wrap the error request and response. Have to think about what callback is passed.
errorHandler.handle(getServletContextRequest(), getServletContextResponse(), Callback.from(_state::errorHandlingComplete));
errorHandler.handle(getServletContextRequest(), getServletContextResponse(), Callback.from(() -> _state.errorHandlingComplete(null), _state::errorHandlingComplete));
}
}
catch (Throwable x)
Expand All @@ -495,23 +495,16 @@ public void handle()
else
ExceptionUtil.addSuppressedIfNotAssociated(cause, x);
if (LOG.isDebugEnabled())
LOG.debug("Could not perform ERROR dispatch, aborting", cause);
LOG.debug("Could not perform error handling, aborting", cause);
if (_state.isResponseCommitted())
{
abort(cause);
// Perform the same behavior as when the callback is failed.
_state.errorHandlingComplete(cause);
}
else
{
try
{
getServletContextResponse().resetContent();
sendResponseAndComplete();
}
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(cause, t);
abort(cause);
}
getServletContextResponse().resetContent();
sendErrorResponseAndComplete();
}
}
finally
Expand Down Expand Up @@ -684,7 +677,7 @@ protected Throwable unwrap(Throwable failure, Class<?>... targets)
return null;
}

public void sendResponseAndComplete()
public void sendErrorResponseAndComplete()
{
try
{
Expand All @@ -694,6 +687,7 @@ public void sendResponseAndComplete()
catch (Throwable x)
{
abort(x);
_state.completed(x);
}
}

Expand Down Expand Up @@ -742,10 +736,13 @@ public void onCompleted()
_servletContextRequest.setAttribute(CustomRequestLog.LOG_DETAIL, logDetail);
}

// Callback will either be succeeded here or failed in abort().
// Callback is completed only here.
Callback callback = _callback;
if (_state.completeResponse())
Throwable failure = _state.completeResponse();
if (failure == null)
callback.succeeded();
else
callback.failed(failure);
}

public boolean isCommitted()
Expand Down Expand Up @@ -783,13 +780,8 @@ protected void execute(Runnable task)
*/
public void abort(Throwable failure)
{
// Callback will either be failed here or succeeded in onCompleted().
if (_state.abortResponse())
{
if (LOG.isDebugEnabled())
LOG.debug("abort {}", this, failure);
_callback.failed(failure);
}
// Callback will be failed in onCompleted().
_state.abort(failure);
}

private void dispatch() throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public enum Action
private long _timeoutMs = DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
private Thread _onTimeoutThread;
private Throwable _failure;
private boolean _failureListener;

protected ServletChannelState(ServletChannel servletChannel)
Expand Down Expand Up @@ -293,19 +294,19 @@ public String getStatusString()
}
}

public boolean completeResponse()
public Throwable completeResponse()
{
try (AutoLock ignored = lock())
{
switch (_outputState)
{
case OPEN:
_outputState = OutputState.COMPLETED;
return true;
// This method is called when the state machine
// is about to terminate the processing, just
// before completing the Handler's callback.
assert _outputState == OutputState.OPEN || _failure != null;

default:
return false;
}
if (_outputState == OutputState.OPEN)
_outputState = OutputState.COMPLETED;

return _failure;
}
}

Expand All @@ -322,7 +323,7 @@ public boolean isResponseCompleted()
}
}

public boolean abortResponse()
private boolean abortResponse(Throwable failure)
{
try (AutoLock ignored = lock())
{
Expand All @@ -332,18 +333,34 @@ public boolean abortResponse()
case ABORTED:
return false;

case OPEN:
_servletChannel.getServletContextResponse().setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
_outputState = OutputState.ABORTED;
return true;

default:
_outputState = OutputState.ABORTED;
_failure = failure;
return true;
}
}
}

public void abort(Throwable failure)
{
boolean handle = false;
try (AutoLock ignored = lock())
{
boolean aborted = abortResponse(failure);
if (LOG.isDebugEnabled())
LOG.debug("abort={} {}", aborted, this, failure);
if (aborted)
{
handle = _state == State.WAITING;
if (handle)
_state = State.WOKEN;
_requestState = RequestState.COMPLETED;
}
}
if (handle)
scheduleDispatch();
}

/**
* @return Next handling of the request should proceed
*/
Expand Down Expand Up @@ -555,7 +572,12 @@ public String toString()
}
}

public void errorHandling()
/**
* Called when an asynchronous call to {@code ErrorHandler.handle()} is about to happen.
*
* @see #errorHandlingComplete(Throwable)
*/
void errorHandling()
{
try (AutoLock ignored = lock())
{
Expand All @@ -565,17 +587,29 @@ public void errorHandling()
}
}

public void errorHandlingComplete()
/**
* Called when the {@code Callback} passed to {@code ErrorHandler.handle()} is completed.
*
* @param failure the failure reported by the error handling,
* or {@code null} if there was no failure
*/
void errorHandlingComplete(Throwable failure)
{
boolean handle;
try (AutoLock ignored = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("errorHandlingComplete {}", toStringLocked());
LOG.debug("errorHandlingComplete {}", toStringLocked(), failure);

handle = _state == State.WAITING;
if (handle)
_state = State.WOKEN;

// If there is a failure while trying to
// handle a previous failure, just bail out.
if (failure != null)
abortResponse(failure);

if (_requestState == RequestState.ERRORING)
_requestState = RequestState.COMPLETE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,30 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AsyncServletLongPollTest
Expand Down Expand Up @@ -65,7 +72,7 @@ public void destroy() throws Exception
@Test
public void testSuspendedRequestCompletedByAnotherRequest() throws Exception
{
final CountDownLatch asyncLatch = new CountDownLatch(1);
CountDownLatch asyncLatch = new CountDownLatch(1);
prepare(new HttpServlet()
{
private volatile AsyncContext asyncContext;
Expand Down Expand Up @@ -93,7 +100,7 @@ protected void doDelete(HttpServletRequest request, HttpServletResponse response
if (param != null)
error = Integer.parseInt(param);

final AsyncContext asyncContext = this.asyncContext;
AsyncContext asyncContext = this.asyncContext;
if (asyncContext != null)
{
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
Expand Down Expand Up @@ -152,4 +159,56 @@ protected void doDelete(HttpServletRequest request, HttpServletResponse response
assertEquals(200, response3.getStatus());
}
}

@Test
public void testSuspendedRequestThenServerStop() throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
prepare(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
{
// Suspend the request.
AsyncContext asyncContext = request.startAsync();
asyncContextRef.set(asyncContext);
}

@Override
public void destroy()
{
// Try to write an error response when shutting down.
AsyncContext asyncContext = asyncContextRef.get();
try
{
HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
}
catch (IOException x)
{
throw new RuntimeException(x);
}
finally
{
asyncContext.complete();
}
}
});

try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
HttpTester.Request request = HttpTester.newRequest();
request.setURI(uri);
client.write(request.generate());

await().atMost(5, TimeUnit.SECONDS).until(asyncContextRef::get, Matchers.notNullValue());

server.stop();

client.socket().setSoTimeout(1000);
// The connection has been closed, no response.
HttpTester.Response response = HttpTester.parseResponse(client);
assertNull(response);
}
}
}
Loading

0 comments on commit eb1e9eb

Please sign in to comment.