Skip to content

Commit

Permalink
Issue #4331 Close Complete
Browse files Browse the repository at this point in the history
reimplemented blocking close to sometimes be async

Signed-off-by: Greg Wilkins <[email protected]>
  • Loading branch information
gregw committed Dec 12, 2019
1 parent 6caa9b9 commit 32dbf97
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 31 deletions.
187 changes: 159 additions & 28 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
v
READY ------ write/flush/close ------> PENDING
^ ^ | |
| | | |
^ ^ | | |
| | | | |
| +----------isReady==true----------+ | |
| | | |
|onWriteComplete | | |onWriteComplete
| | | |
| +----------------------------+ | | |
|onWriteComplete | | | |onWriteComplete
| | | | |
| +----------isReady==false------------+ |
| | | |
| v | v
UNREADY +---ASYNC
| | | | |
| v | | v
UNREADY | +---ASYNC
*/
enum ApiState
{
BLOCKING, // Open in blocking mode
BLOCKED, // Blocked in blocking operation
ASYNC, // Open in async mode
READY, // isReady() has returned true
PENDING, // write operating in progress
UNREADY, // write operating in progress, isReady has returned false
}

/*
OPEN/BLOCKING ---- close ----> CLOSING/BLOCKED
Expand All @@ -107,7 +99,47 @@ enum ApiState
CLOSING/READY ------- onWriteComplete ----> CLOSED/READY
CLOSING/PENDING ----- onWriteComplete ----> CLOSED/ASYNC
CLOSING/UNREADY ----- onWriteComplete ----> CLOSED/UNREADY
OPEN/BLOCKING -- close -----------+ CLOSING/BLOCKING
/ | ^ \ / | ^
/ | | \ / | |
/ v | \ / v |
| OPEN/BLOCKED ----close ---+ +--|->CLOSING/BLOCKED
\ ^ | \
\ +----------------+ \
v v
OPEN/READY -----close-----------+ CLOSING/READY CLOSED/READY
^^ | \ \ ^ | |
// | \ \ / | |
// v \ \ / v |
/| OPEN/PENDING-|---close ---+ +----|->CLOSING/PENDING |
| \ / | ^ / | / \ / | |
| / | +--/-------------+ / / +--------------------+ |
| / \ v v / / \ v v
\| OPEN/ASYNC -----close ----------+ | CLOSING/ASYNC CLOSED/ASYNC
\\ \
\\ \
\v v
OPEN/UNREADY ----close --+ CLOSING/UNREADY
^ |
+---------------+
*/
enum ApiState
{
BLOCKING, // Open in blocking mode
BLOCKED, // Blocked in blocking operation
ASYNC, // Open in async mode
READY, // isReady() has returned true
PENDING, // write operating in progress
UNREADY, // write operating in progress, isReady has returned false
}

enum State
{
OPEN, // Open
Expand Down Expand Up @@ -297,6 +329,7 @@ void onWriteComplete(boolean last, Throwable failure)
release = true;
}

// Did somebody call close(Callback) while we were writing?
if (_closedCallback != null && _state == State.OPEN)
{
_state = State.CLOSING;
Expand Down Expand Up @@ -431,6 +464,9 @@ public void close(Callback callback)
}
}

if (LOG.isDebugEnabled())
LOG.debug("close({}) {} s={} e={}, c={}", callback, stateString(), succeeded, error, BufferUtil.toDetailString(content));

if (succeeded)
{
callback.succeeded();
Expand All @@ -444,30 +480,124 @@ public void close(Callback callback)
}

if (content != null)
channelWrite(content, true,
new AsyncCloseCB(callback));
channelWrite(content, true, new AsyncCloseCB(callback));
}

@Override
public void close() throws IOException
{
// This close is not implemented as a call to close(Callback) with
// a blocking callback because in some cases we need to make this
// call async - ie it returns immediately and the close happens in
// the background

if (_channel.getResponse().isIncluding())
{
if (LOG.isDebugEnabled())
LOG.debug("close() include softclose");
_softClose = true;
flush();
return;
}

try (Blocker blocker = _writeBlocker.acquire())
ByteBuffer content = null;
Throwable error = null;
Blocker blocker = null;
synchronized (_channelState)
{
close(blocker);
blocker.block();
onWriteComplete(true, null);
if (_onError != null)
{
error = _onError;
}
else if (_state != State.CLOSED)
{
switch (_apiState)
{
case UNREADY:
case PENDING:
// An async operation is in progress, so we soft close now
_softClose = true;

// If we are OPEN and we will not close in onWriteComplete,
if (_state == State.OPEN && _closedCallback == null)
// then use a NOOP to trigger a close from onWriteComplete
_closedCallback = Callback.NOOP;
break;

case ASYNC:
case READY:
// We are async, but with no outstanding operation, so we close asynchronously
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

case BLOCKED:
// A blocking operation is in progress. Let's just block until it is complete
blocker = _writeBlocker.acquire();
_closedCallback = Callback.combine(_closedCallback, blocker);
break;

case BLOCKING:
// Do a blocking close
blocker = _writeBlocker.acquire();
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;

default:
throw new IllegalStateException(stateString());
}
}
}

if (LOG.isDebugEnabled())
LOG.debug("close() {} e={}, c={}, b={}", stateString(), error, BufferUtil.toDetailString(content), blocker);

// Throw any error
if (error != null)
{
if (error instanceof IOException)
throw (IOException)error;
if (error instanceof RuntimeException)
throw (RuntimeException)error;
if (error instanceof Error)
throw (Error)error;
throw new IOException(error);
}
catch (Throwable failure)

if (content == null)
{
onWriteComplete(true, failure);
throw failure;
if (blocker == null)
// nothing to do or block for.
return;

// Just wait for another close to finish.
try (Blocker b = blocker)
{
b.block();
}
}
else
{
if (blocker == null)
{
// Do an async close
channelWrite(content, true, new AsyncCloseCB(Callback.NOOP));
}
else
{
// Do a blocking close
try (Blocker b = blocker)
{
channelWrite(content, true, blocker);
b.block();
onWriteComplete(true, null);
}
catch (Throwable t)
{
onWriteComplete(true, t);
}
}
}
}

Expand All @@ -478,12 +608,13 @@ public void close() throws IOException
*/
public void closed()
{
// TODO do we really need this - if so document why!!!!!
Callback callback = null;
synchronized (_channelState)
{
if (_state != State.CLOSED)
{
callback = _closedCallback;
callback = _closedCallback; // TODO is this ever non null????
_closedCallback = null;
_state = State.CLOSED;
}
Expand Down Expand Up @@ -1369,7 +1500,7 @@ public void run()

private String stateString()
{
return String.format("s=%s,api=%s", _state, _apiState);
return String.format("s=%s,api=%s,sc=%b,e=%s", _state, _apiState, _softClose, _onError);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public static Stream<Arguments> tests()
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), false, 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), true, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), true, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), false, 200, __data});
tests.add(new Object[]{new BlockingWriteCompleteHandler(), true, 200, __data});
return tests.stream().map(Arguments::of);
}
Expand Down Expand Up @@ -351,7 +351,7 @@ public void onError(Throwable t)
@Override
public String toString()
{
return String.format("%s@%x{ur=%b,c=%b}", this.getClass().getSimpleName(), hashCode(), _unReady, _close);
return String.format("AWCH@%x{ur=%b,c=%b}", hashCode(), _unReady, _close);
}
}

Expand Down

0 comments on commit 32dbf97

Please sign in to comment.