From 2b5ffeb695a494665f203521adabc04857c2ed63 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Sun, 1 Dec 2019 10:33:58 +1100 Subject: [PATCH] Issue #4331 Async Close Complete WIP - moved the outstate from HttpOutput to HttpChannelState Error handling needs a big review Signed-off-by: Greg Wilkins --- .../org/eclipse/jetty/server/HttpChannel.java | 9 +- .../jetty/server/HttpChannelState.java | 373 +++++++-- .../org/eclipse/jetty/server/HttpOutput.java | 713 ++++++------------ .../org/eclipse/jetty/server/Response.java | 1 - .../jetty/server/AsyncCompletionTest.java | 6 + .../server/handler/NcsaRequestLogTest.java | 2 +- .../eclipse/jetty/servlet/ErrorPageTest.java | 2 +- .../jetty/test/HttpInputIntegrationTest.java | 4 +- 8 files changed, 538 insertions(+), 572 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 9134588cfd99..40047fc95527 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -717,6 +717,13 @@ public boolean onRequestComplete() return result; } + void onResponseComplete() + { + if (LOG.isDebugEnabled()) + LOG.debug("onResponseComplete {}", this); + _combinedListener.onResponseEnd(_request); + } + public void onCompleted() { if (LOG.isDebugEnabled()) @@ -1194,8 +1201,6 @@ public void succeeded() _combinedListener.onResponseCommit(_request); if (_length > 0) _combinedListener.onResponseContent(_request, _content); - if (_complete && _state.completeResponse()) - _combinedListener.onResponseEnd(_request); super.succeeded(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index bf3027591200..4b64b4367f1f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.server; +import java.io.IOException; +import java.nio.channels.WritePendingException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -28,6 +30,7 @@ import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; import org.eclipse.jetty.server.handler.ErrorHandler; @@ -61,7 +64,7 @@ public class HttpChannelState * UPGRADED WOKEN * */ - public enum State + public enum DispatchState { IDLE, // Idle request HANDLING, // Request dispatched to filter/servlet or Async IO callback @@ -114,9 +117,9 @@ private enum InputState } /* - * The output committed state, which works together with {@link HttpOutput.State} + * The response state */ - private enum OutputState + private enum ResponseState { OPEN, COMMITTED, @@ -124,6 +127,28 @@ private enum OutputState ABORTED, } + /* + ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED + -------------------------------------------------------------------------------------------------- + setWriteListener() READY->owp ise ise ise ise ise ise + write() OPEN ise PENDING wpe wpe eof eof + flush() OPEN ise PENDING wpe wpe eof eof + close() CLOSING CLOSING CLOSING CLOSED CLOSED CLOSING CLOSED + isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true CLOSED:true + write completed - - - ASYNC READY->owp CLOSED - + */ + enum OutputState + { + OPEN, // Open in blocking mode + ASYNC, // Open in async mode + READY, // isReady() has returned true + PENDING, // write operating in progress + UNREADY, // write operating in progress, isReady has returned false + ERROR, // An error has occured + CLOSING, // Asynchronous close in progress + CLOSED // Closed + } + /** * The actions to take as the channel moves from state to state. */ @@ -145,10 +170,11 @@ public enum Action private final HttpChannel _channel; private List _asyncListeners; - private State _state = State.IDLE; + private DispatchState _state = DispatchState.IDLE; private RequestState _requestState = RequestState.BLOCKING; - private OutputState _outputState = OutputState.OPEN; + private ResponseState _responseState = ResponseState.OPEN; private InputState _inputState = InputState.IDLE; + private OutputState _outputState = OutputState.OPEN; private boolean _initial = true; private boolean _sendError; private boolean _asyncWritePossible; @@ -160,7 +186,7 @@ protected HttpChannelState(HttpChannel channel) _channel = channel; } - public State getState() + public DispatchState getState() { synchronized (this) { @@ -251,7 +277,7 @@ private String getStatusStringLocked() return String.format("s=%s rs=%s os=%s is=%s awp=%b se=%b i=%b al=%d", _state, _requestState, - _outputState, + _responseState, _inputState, _asyncWritePossible, _sendError, @@ -271,10 +297,10 @@ public boolean commitResponse() { synchronized (this) { - switch (_outputState) + switch (_responseState) { case OPEN: - _outputState = OutputState.COMMITTED; + _responseState = ResponseState.COMMITTED; return true; default: @@ -287,10 +313,10 @@ public boolean partialResponse() { synchronized (this) { - switch (_outputState) + switch (_responseState) { case COMMITTED: - _outputState = OutputState.OPEN; + _responseState = ResponseState.OPEN; return true; default: @@ -299,28 +325,63 @@ public boolean partialResponse() } } - public boolean completeResponse() + public void onWriteComplete(boolean responseComplete) { + boolean wake = false; + boolean responseCompleted = false; + synchronized (this) { switch (_outputState) + { + case PENDING: + _outputState = responseComplete ? OutputState.CLOSED : OutputState.ASYNC; + break; + + case UNREADY: + _outputState = responseComplete ? OutputState.CLOSED : OutputState.READY; + _asyncWritePossible = true; + if (_state == DispatchState.WAITING) + { + _state = DispatchState.WOKEN; + wake = true; + } + break; + + case CLOSING: + _outputState = OutputState.CLOSED; + break; + + default: + } + + switch (_responseState) { case OPEN: case COMMITTED: - _outputState = OutputState.COMPLETED; - return true; + if (responseComplete) + { + _responseState = ResponseState.COMPLETED; + responseCompleted = true; + } + break; default: - return false; + break; } } + + if (responseCompleted) + _channel.onResponseComplete(); + if (wake) + _channel.execute(_channel); } public boolean isResponseCommitted() { synchronized (this) { - switch (_outputState) + switch (_responseState) { case OPEN: return false; @@ -334,7 +395,7 @@ public boolean isResponseCompleted() { synchronized (this) { - return _outputState == OutputState.COMPLETED; + return _responseState == ResponseState.COMPLETED; } } @@ -342,18 +403,18 @@ public boolean abortResponse() { synchronized (this) { - switch (_outputState) + switch (_responseState) { case ABORTED: return false; case OPEN: _channel.getResponse().setStatus(500); - _outputState = OutputState.ABORTED; + _responseState = ResponseState.ABORTED; return true; default: - _outputState = OutputState.ABORTED; + _responseState = ResponseState.ABORTED; return true; } } @@ -375,13 +436,13 @@ public Action handling() if (_requestState != RequestState.BLOCKING) throw new IllegalStateException(getStatusStringLocked()); _initial = true; - _state = State.HANDLING; + _state = DispatchState.HANDLING; return Action.DISPATCH; case WOKEN: if (_event != null && _event.getThrowable() != null && !_sendError) { - _state = State.HANDLING; + _state = DispatchState.HANDLING; return Action.ASYNC_ERROR; } @@ -413,7 +474,7 @@ protected Action unhandle() if (LOG.isDebugEnabled()) LOG.debug("unhandle {}", toStringLocked()); - if (_state != State.HANDLING) + if (_state != DispatchState.HANDLING) throw new IllegalStateException(this.getStatusStringLocked()); _initial = false; @@ -428,7 +489,7 @@ protected Action unhandle() private Action nextAction(boolean handling) { // Assume we can keep going, but exceptions are below - _state = State.HANDLING; + _state = DispatchState.HANDLING; if (_sendError) { @@ -485,7 +546,7 @@ private Action nextAction(boolean handling) Scheduler scheduler = _channel.getScheduler(); if (scheduler != null && _timeoutMs > 0 && !_event.hasTimeoutTask()) _event.setTimeoutTask(scheduler.schedule(_event, _timeoutMs, TimeUnit.MILLISECONDS)); - _state = State.WAITING; + _state = DispatchState.WAITING; return Action.WAIT; case DISPATCH: @@ -510,11 +571,11 @@ private Action nextAction(boolean handling) return Action.COMPLETE; case COMPLETING: - _state = State.WAITING; + _state = DispatchState.WAITING; return Action.WAIT; case COMPLETED: - _state = State.IDLE; + _state = DispatchState.IDLE; return Action.TERMINATED; default: @@ -530,7 +591,7 @@ public void startAsync(AsyncContextEvent event) { if (LOG.isDebugEnabled()) LOG.debug("startAsync {}", toStringLocked()); - if (_state != State.HANDLING || _requestState != RequestState.BLOCKING) + if (_state != DispatchState.HANDLING || _requestState != RequestState.BLOCKING) throw new IllegalStateException(this.getStatusStringLocked()); _requestState = RequestState.ASYNC; @@ -594,9 +655,9 @@ public void dispatch(ServletContext context, String path) if (path != null) _event.setDispatchPath(path); - if (_requestState == RequestState.ASYNC && _state == State.WAITING) + if (_requestState == RequestState.ASYNC && _state == DispatchState.WAITING) { - _state = State.WOKEN; + _state = DispatchState.WOKEN; dispatch = true; } _requestState = RequestState.DISPATCH; @@ -620,9 +681,9 @@ protected void timeout() return; _requestState = RequestState.EXPIRE; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { - _state = State.WOKEN; + _state = DispatchState.WOKEN; dispatch = true; } } @@ -643,7 +704,7 @@ protected void onTimeout() { if (LOG.isDebugEnabled()) LOG.debug("onTimeout {}", toStringLocked()); - if (_requestState != RequestState.EXPIRING || _state != State.HANDLING) + if (_requestState != RequestState.EXPIRING || _state != DispatchState.HANDLING) throw new IllegalStateException(toStringLocked()); event = _event; listeners = _asyncListeners; @@ -703,10 +764,10 @@ public void complete() default: throw new IllegalStateException(this.getStatusStringLocked()); } - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { handle = true; - _state = State.WOKEN; + _state = DispatchState.WOKEN; } } @@ -728,9 +789,9 @@ public void asyncError(Throwable failure) if (LOG.isDebugEnabled()) LOG.debug("asyncError " + toStringLocked(), failure); - if (_state == State.WAITING && _requestState == RequestState.ASYNC) + if (_state == DispatchState.WAITING && _requestState == RequestState.ASYNC) { - _state = State.WOKEN; + _state = DispatchState.WOKEN; _event.addThrowable(failure); event = _event; } @@ -758,7 +819,7 @@ protected void onError(Throwable th) LOG.debug("thrownException " + getStatusStringLocked(), th); // This can only be called from within the handle loop - if (_state != State.HANDLING) + if (_state != DispatchState.HANDLING) throw new IllegalStateException(getStatusStringLocked()); // If sendError has already been called, we can only handle one failure at a time! @@ -897,12 +958,25 @@ public void sendError(int code, String message) default: throw new IllegalStateException(getStatusStringLocked()); } - if (_outputState != OutputState.OPEN) - throw new IllegalStateException("Response is " + _outputState); + if (_responseState != ResponseState.OPEN) + throw new IllegalStateException("Response is " + _responseState); + + switch (_outputState) + { + case OPEN: + case READY: + case ASYNC: + _outputState = OutputState.CLOSED; + break; + + default: + throw new IllegalStateException(_outputState.toString()); + } response.setStatus(code); response.closedBySendError(); + request.setAttribute(ErrorHandler.ERROR_CONTEXT, request.getErrorContext()); request.setAttribute(ERROR_REQUEST_URI, request.getRequestURI()); request.setAttribute(ERROR_SERVLET_NAME, request.getServletName()); @@ -955,9 +1029,9 @@ protected void completed() _requestState = RequestState.COMPLETED; aListeners = null; event = null; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { - _state = State.WOKEN; + _state = DispatchState.WOKEN; handle = true; } } @@ -997,9 +1071,9 @@ protected void completed() synchronized (this) { _requestState = RequestState.COMPLETED; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { - _state = State.WOKEN; + _state = DispatchState.WOKEN; handle = true; } } @@ -1027,9 +1101,9 @@ protected void recycle() break; } _asyncListeners = null; - _state = State.IDLE; + _state = DispatchState.IDLE; _requestState = RequestState.BLOCKING; - _outputState = OutputState.OPEN; + _responseState = ResponseState.OPEN; _initial = true; _inputState = InputState.IDLE; _asyncWritePossible = false; @@ -1054,7 +1128,7 @@ public void upgrade() throw new IllegalStateException(getStatusStringLocked()); } _asyncListeners = null; - _state = State.UPGRADED; + _state = DispatchState.UPGRADED; _requestState = RequestState.BLOCKING; _initial = true; _inputState = InputState.IDLE; @@ -1089,7 +1163,7 @@ public boolean isIdle() { synchronized (this) { - return _state == State.IDLE; + return _state == DispatchState.IDLE; } } @@ -1114,7 +1188,7 @@ public boolean isSuspended() { synchronized (this) { - return _state == State.WAITING || _state == State.HANDLING && _requestState == RequestState.ASYNC; + return _state == DispatchState.WAITING || _state == DispatchState.HANDLING && _requestState == RequestState.ASYNC; } } @@ -1130,7 +1204,7 @@ public boolean isAsyncStarted() { synchronized (this) { - if (_state == State.HANDLING) + if (_state == DispatchState.HANDLING) return _requestState != RequestState.BLOCKING; return _requestState == RequestState.ASYNC || _requestState == RequestState.EXPIRING; } @@ -1235,7 +1309,7 @@ public void onReadUnready() { case IDLE: case READY: - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { interested = true; _inputState = InputState.REGISTERED; @@ -1287,10 +1361,10 @@ public boolean onContentAdded() case REGISTER: case REGISTERED: _inputState = InputState.READY; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { woken = true; - _state = State.WOKEN; + _state = DispatchState.WOKEN; } break; @@ -1321,10 +1395,10 @@ public boolean onReadReady() { case IDLE: _inputState = InputState.READY; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { woken = true; - _state = State.WOKEN; + _state = DispatchState.WOKEN; } break; @@ -1354,10 +1428,10 @@ public boolean onReadPossible() { case REGISTERED: _inputState = InputState.POSSIBLE; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { woken = true; - _state = State.WOKEN; + _state = DispatchState.WOKEN; } break; @@ -1384,32 +1458,199 @@ public boolean onReadEof() // Force read ready so onAllDataRead can be called _inputState = InputState.READY; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) { woken = true; - _state = State.WOKEN; + _state = DispatchState.WOKEN; } } return woken; } - public boolean onWritePossible() + void reopenOutput() { - boolean wake = false; + synchronized (this) + { + _outputState = OutputState.OPEN; + } + } + + OutputState closeOutput() + { + OutputState state; + synchronized (this) + { + switch (_outputState) + { + case CLOSING: + return OutputState.CLOSED; + + case CLOSED: + _outputState = OutputState.CLOSED; + break; + + case UNREADY: + case PENDING: + // TODO + throw new IllegalStateException(); + + default: + _outputState = OutputState.CLOSING; + break; + } + + state = _outputState; + } + return state; + } + + void closedOutput() + { + synchronized (this) + { + _outputState = OutputState.CLOSED; + } + } + + boolean isOutputClosed() + { + synchronized (this) + { + switch (_outputState) + { + case CLOSING: + case CLOSED: + return true; + default: + return false; + } + } + } + + boolean isOutputAsync() + { + synchronized (this) + { + switch (_outputState) + { + case ASYNC: + case READY: + case PENDING: + case UNREADY: + return true; + default: + return false; + } + } + } + boolean setWriteListener() + { synchronized (this) { if (LOG.isDebugEnabled()) - LOG.debug("onWritePossible {}", toStringLocked()); + LOG.debug("onSetWriteListener {}", toStringLocked()); + + if (_requestState == RequestState.BLOCKING) + throw new IllegalStateException("!ASYNC"); + + if (_outputState != OutputState.OPEN) + throw new IllegalStateException(); + + _outputState = OutputState.READY; _asyncWritePossible = true; - if (_state == State.WAITING) + if (_state == DispatchState.WAITING) + { + _state = DispatchState.WOKEN; + return true; + } + } + return false; + } + + OutputState onWrite(boolean flush) + { + synchronized (this) + { + switch (_outputState) + { + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + return _outputState = flush ? OutputState.PENDING : OutputState.ASYNC; + + case UNREADY: + throw new WritePendingException(); + + case ERROR: + case OPEN: + case PENDING: + case CLOSING: + case CLOSED: + return _outputState; + + default: + throw new IllegalStateException(_outputState.toString()); + } + } + } + + OutputState onWriteContent() throws IOException + { + synchronized (this) + { + if (_responseState != ResponseState.OPEN) + throw new IOException("cannot sendContent(), output already committed"); + + switch (_outputState) { - _state = State.WOKEN; - wake = true; + case OPEN: + _outputState = OutputState.PENDING; + break; + + case ERROR: + break; + + case CLOSING: + case CLOSED: + throw new EofException("Closed"); + + default: + throw new IllegalStateException(_outputState.toString()); } + return _outputState; } + } - return wake; + boolean isOutputReady() + { + synchronized (this) + { + switch (_outputState) + { + case OPEN: + case READY: + case ERROR: + case CLOSING: + case CLOSED: + return true; + + case ASYNC: + _outputState = OutputState.READY; + return true; + + case PENDING: + _outputState = OutputState.UNREADY; + return false; + + case UNREADY: + return false; + + default: + throw new IllegalStateException(); + } + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f651179862ab..618d1f1193e5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -24,14 +24,12 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritePendingException; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; @@ -66,28 +64,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable private static final Callback BLOCKING_CLOSE_CALLBACK = new Callback() {}; private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE); - /* - ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED - -------------------------------------------------------------------------------------------------- - setWriteListener() READY->owp ise ise ise ise ise ise - write() OPEN ise PENDING wpe wpe eof eof - flush() OPEN ise PENDING wpe wpe eof eof - close() CLOSING CLOSING CLOSING CLOSED CLOSED CLOSING CLOSED - isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true CLOSED:true - write completed - - - ASYNC READY->owp CLOSED - - */ - enum State - { - OPEN, // Open in blocking mode - ASYNC, // Open in async mode - READY, // isReady() has returned true - PENDING, // write operating in progress - UNREADY, // write operating in progress, isReady has returned false - ERROR, // An error has occured - CLOSING, // Asynchronous close in progress - CLOSED // Closed - } - /** * The HttpOutput.Interceptor is a single intercept point for all * output written to the HttpOutput: via writer; via output stream; @@ -153,11 +129,11 @@ default void resetBuffer() throws IllegalStateException private static Logger LOG = Log.getLogger(HttpOutput.class); private static final ThreadLocal _encoder = new ThreadLocal<>(); - private final AtomicReference _state = new AtomicReference<>(State.OPEN); private final HttpChannel _channel; + private final HttpChannelState _state; private final SharedBlockingCallback _writeBlocker; private Interceptor _interceptor; - private long _written; + private long _written; // Bytes written before interception (eg. before compression) private long _flushed; private long _firstByteTimeStamp = -1; private ByteBuffer _aggregate; @@ -170,6 +146,7 @@ default void resetBuffer() throws IllegalStateException public HttpOutput(HttpChannel channel) { _channel = channel; + _state = _channel.getState(); _interceptor = channel; _writeBlocker = new WriteBlocker(channel); HttpConfiguration config = channel.getHttpConfiguration(); @@ -209,13 +186,7 @@ public long getWritten() public void reopen() { - _state.set(State.OPEN); - } - - private boolean isLastContentToWrite(int len) - { - _written += len; - return _channel.getResponse().isAllContentWritten(_written); + _state.reopenOutput(); } public boolean isAllContentWritten() @@ -228,14 +199,12 @@ protected Blocker acquireWriteBlockingCallback() throws IOException return _writeBlocker.acquire(); } - private void write(ByteBuffer content, boolean complete) throws IOException + private void blockingWrite(ByteBuffer content, boolean complete) throws IOException { try (Blocker blocker = _writeBlocker.acquire()) { write(content, complete, blocker); blocker.block(); - if (complete) - closed(); } catch (Exception failure) { @@ -267,26 +236,6 @@ private void abort(Throwable failure) _channel.abort(failure); } - public void closedBySendError() - { - while (true) - { - State state = _state.get(); - switch (state) - { - case OPEN: - case READY: - case ASYNC: - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - return; - - default: - throw new IllegalStateException(state.toString()); - } - } - } - public void close(Closeable wrapper, Callback callback) { _closeCallback = callback; @@ -318,79 +267,46 @@ public void close() { Callback closeCallback = _closeCallback == null ? BLOCKING_CLOSE_CALLBACK : _closeCallback; - while (true) + switch (_state.closeOutput()) { - State state = _state.get(); - switch (state) + case CLOSED: { - case CLOSING: - case CLOSED: - { - _closeCallback = null; - closeCallback.succeeded(); - return; - } - case ASYNC: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Thus we simulate a call to isReady here, by - // trying to move to READY state. Either way we continue. - _state.compareAndSet(state, State.READY); - continue; - } - case UNREADY: - case PENDING: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Because the prior write has not yet completed - // and/or isReady has not been called, this close is allowed, but will - // abort the response. - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - IOException ex = new IOException("Closed while Pending/Unready"); - LOG.warn(ex.toString()); - LOG.debug(ex); - abort(ex); - _closeCallback = null; - closeCallback.failed(ex); - return; - } - default: + _closeCallback = null; + closeCallback.succeeded(); + return; + } + case CLOSING: + { + // Do a normal close by writing the aggregate buffer or an empty buffer. If we are + // not including, then indicate this is the last write. + try { - if (!_state.compareAndSet(state, State.CLOSING)) - continue; - - // Do a normal close by writing the aggregate buffer or an empty buffer. If we are - // not including, then indicate this is the last write. - try + ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + if (closeCallback == BLOCKING_CLOSE_CALLBACK) { - ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - if (closeCallback == BLOCKING_CLOSE_CALLBACK) - { - // Do a blocking close - write(content, !_channel.getResponse().isIncluding()); - _closeCallback = null; - closeCallback.succeeded(); - } - else - { - _closeCallback = null; - write(content, !_channel.getResponse().isIncluding(), closeCallback); - } + // Do a blocking close + blockingWrite(content, !_channel.getResponse().isIncluding()); + _state.onWriteComplete(true); + _closeCallback = null; + closeCallback.succeeded(); } - catch (IOException x) + else { - LOG.ignore(x); // Ignore it, it's been already logged in write(). _closeCallback = null; - closeCallback.failed(x); + write(content, !_channel.getResponse().isIncluding(), closeCallback); } - return; } + catch (IOException x) + { + LOG.ignore(x); // Ignore it, it's been already logged in write(). + _closeCallback = null; + closeCallback.failed(x); + } + break; } + + default: + throw new IllegalStateException(); } } @@ -400,36 +316,8 @@ public void close() */ public void closed() { - while (true) - { - State state = _state.get(); - switch (state) - { - case CLOSED: - { - return; - } - case UNREADY: - { - if (_state.compareAndSet(state, State.ERROR)) - { - if (_onError == null) - _onError = new EofException("Async closed"); - releaseBuffer(); - return; - } - break; - } - default: - { - if (!_state.compareAndSet(state, State.CLOSED)) - break; - - releaseBuffer(); - return; - } - } - } + _state.closedOutput(); + releaseBuffer(); } public ByteBuffer getBuffer() @@ -455,138 +343,56 @@ private void releaseBuffer() public boolean isClosed() { - switch (_state.get()) - { - case CLOSING: - case CLOSED: - return true; - default: - return false; - } + return _state.isOutputClosed(); } public boolean isAsync() { - switch (_state.get()) - { - case ASYNC: - case READY: - case PENDING: - case UNREADY: - return true; - default: - return false; - } + return _state.isOutputAsync(); } @Override public void flush() throws IOException { - while (true) + switch (_state.onWrite(true)) { - State state = _state.get(); - switch (state) - { - case OPEN: - write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); - return; - - case ASYNC: - throw new IllegalStateException("isReady() not called"); - - case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - new AsyncFlush().iterate(); - return; + case OPEN: + blockingWrite(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); + _state.onWriteComplete(false); + break; - case UNREADY: - throw new WritePendingException(); + case PENDING: + new AsyncFlush().iterate(); + break; - case ERROR: - throw new EofException(_onError); + case ERROR: + throw new IOException(_onError); - case PENDING: - case CLOSING: - case CLOSED: - return; + case CLOSING: + case CLOSED: + return; - default: - throw new IllegalStateException(state.toString()); - } + default: + throw new IllegalStateException(); } } @Override public void write(byte[] b, int off, int len) throws IOException { - // Async or Blocking ? - while (true) - { - State state = _state.get(); - switch (state) - { - case OPEN: - // process blocking below - break; - - case ASYNC: - throw new IllegalStateException("isReady() not called"); - - case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Should we aggregate? - boolean last = isLastContentToWrite(len); - if (!last && len <= _commitSize) - { - acquireBuffer(); - - // YES - fill the aggregate with content from the buffer - int filled = BufferUtil.fill(_aggregate, b, off, len); - - // return if we are not complete, not full and filled all the content - if (filled == len && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(_state.get().toString()); - return; - } - - // adjust offset/length - off += filled; - len -= filled; - } - - // Do the asynchronous writing from the callback - new AsyncWrite(b, off, len, last).iterate(); - return; - - case PENDING: - case UNREADY: - throw new WritePendingException(); - - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + boolean last = _channel.getResponse().isAllContentWritten(_written + len); + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + // Should we aggregate? + // Yes - if the write is smaller than the commitSize + // and the write is not the last one, or is last but will fit in an already allocated aggregate buffer. + boolean aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); + boolean flush = !aggregate || len >= space; - default: - throw new IllegalStateException(state.toString()); - } - break; - } + HttpChannelState.OutputState state = _state.onWrite(flush); - // handle blocking write + _written += len; - // Should we aggregate? - // Yes - if the write is smaller than the commitSize (==aggregate buffer size) - // and the write is not the last one, or is last but will fit in an already allocated aggregate buffer. - boolean last = isLastContentToWrite(len); - if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate))) + if (aggregate) { acquireBuffer(); @@ -594,7 +400,7 @@ public void write(byte[] b, int off, int len) throws IOException int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not the last write and have aggregated all of the content - if (!last && filled == len && !BufferUtil.isFull(_aggregate)) + if (!flush) return; // adjust offset/length @@ -602,158 +408,149 @@ public void write(byte[] b, int off, int len) throws IOException len -= filled; } - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) + switch (state) { - write(_aggregate, last && len == 0); - - // should we fill aggregate again from the buffer? - if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + case OPEN: { - BufferUtil.append(_aggregate, b, off, len); - return; + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + { + blockingWrite(_aggregate, last && len == 0); + + // should we fill aggregate again from the buffer? + if (len > 0 && !last && len <= BufferUtil.space(_aggregate)) + { + BufferUtil.append(_aggregate, b, off, len); + _state.onWriteComplete(false); + return; + } + } + + // write any remaining content in the buffer directly + if (len > 0) + { + // write a buffer capacity at a time to avoid JVM pooling large direct buffers + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 + ByteBuffer view = ByteBuffer.wrap(b, off, len); + while (len > getBufferSize()) + { + int p = view.position(); + int l = p + getBufferSize(); + view.limit(p + getBufferSize()); + blockingWrite(view, false); + len -= getBufferSize(); + view.limit(l + Math.min(len, getBufferSize())); + view.position(l); + } + blockingWrite(view, last); + } + else if (last) + { + blockingWrite(BufferUtil.EMPTY_BUFFER, true); + } + _state.onWriteComplete(last); + break; } - } - // write any remaining content in the buffer directly - if (len > 0) - { - // write a buffer capacity at a time to avoid JVM pooling large direct buffers - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 - ByteBuffer view = ByteBuffer.wrap(b, off, len); - while (len > getBufferSize()) + case PENDING: { - int p = view.position(); - int l = p + getBufferSize(); - view.limit(p + getBufferSize()); - write(view, false); - len -= getBufferSize(); - view.limit(l + Math.min(len, getBufferSize())); - view.position(l); + // Do the asynchronous writing from the callback + new AsyncWrite(b, off, len, last).iterate(); + return; } - write(view, last); - } - else if (last) - { - write(BufferUtil.EMPTY_BUFFER, true); + + case ERROR: + throw new IOException(_onError); + + case CLOSING: + case CLOSED: + return; + + default: + throw new IllegalStateException(); } } public void write(ByteBuffer buffer) throws IOException { - // This write always bypasses aggregate buffer + int len = BufferUtil.length(buffer); + boolean last = _channel.getResponse().isAllContentWritten(_written + len); - // Async or Blocking ? - while (true) + // This write always bypasses aggregate buffer + switch (_state.onWrite(true)) { - State state = _state.get(); - switch (state) + case OPEN: { - case OPEN: - // process blocking below - break; + // handle blocking write + _written += len; - case ASYNC: - throw new IllegalStateException("isReady() not called"); - - case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Do the asynchronous writing from the callback - boolean last = isLastContentToWrite(buffer.remaining()); - new AsyncWrite(buffer, last).iterate(); - return; - - case PENDING: - case UNREADY: - throw new WritePendingException(); - - case ERROR: - throw new EofException(_onError); + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + blockingWrite(_aggregate, last && len == 0); - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + // write any remaining content in the buffer directly + if (len > 0) + blockingWrite(buffer, last); + else if (last) + blockingWrite(BufferUtil.EMPTY_BUFFER, true); - default: - throw new IllegalStateException(state.toString()); + _state.onWriteComplete(last); + return; } - break; - } - // handle blocking write - int len = BufferUtil.length(buffer); - boolean last = isLastContentToWrite(len); + case PENDING: + { + // Do the asynchronous writing from the callback + _written += len; + new AsyncWrite(buffer, last).iterate(); + return; + } - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - write(_aggregate, last && len == 0); + case ERROR: + throw new EofException(_onError); - // write any remaining content in the buffer directly - if (len > 0) - write(buffer, last); - else if (last) - write(BufferUtil.EMPTY_BUFFER, true); + default: + throw new IllegalStateException(); + } } @Override public void write(int b) throws IOException { - _written += 1; - boolean complete = _channel.getResponse().isAllContentWritten(_written); - - // Async or Blocking ? - while (true) - { - switch (_state.get()) - { - case OPEN: - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); + boolean last = _channel.getResponse().isAllContentWritten(_written + 1); + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + boolean flush = last || space == 1; - // Check if all written or full - if (complete || BufferUtil.isFull(_aggregate)) - write(_aggregate, complete); - break; + HttpChannelState.OutputState state = _state.onWrite(flush); - case ASYNC: - throw new IllegalStateException("isReady() not called"); + _written++; - case READY: - if (!_state.compareAndSet(State.READY, State.PENDING)) - continue; + acquireBuffer(); + BufferUtil.append(_aggregate, (byte)b); - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); - - // Check if all written or full - if (!complete && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(); - return; - } - - // Do the asynchronous writing from the callback - new AsyncFlush().iterate(); - return; - - case PENDING: - case UNREADY: - throw new WritePendingException(); - - case ERROR: - throw new EofException(_onError); + if (!flush) + return; - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + switch (state) + { + case OPEN: + { + blockingWrite(_aggregate, last); + _state.onWriteComplete(last); + return; + } - default: - throw new IllegalStateException(); + case PENDING: + { + // Do the asynchronous writing from the callback + new AsyncFlush().iterate(); + return; } - break; + + case ERROR: + throw new EofException(_onError); + default: + throw new IllegalStateException(); } } @@ -883,7 +680,8 @@ public void sendContent(ByteBuffer content) throws IOException LOG.debug("sendContent({})", BufferUtil.toDetailString(content)); _written += content.remaining(); - write(content, true); + blockingWrite(content, true); + _state.onWriteComplete(true); } /** @@ -1028,34 +826,29 @@ public void sendContent(HttpContent httpContent, Callback callback) callback.failed(new IOException("cannot sendContent() after write()")); return; } - if (_channel.isCommitted()) - { - callback.failed(new IOException("cannot sendContent(), output already committed")); - return; - } - while (true) + try { - switch (_state.get()) + switch (_state.onWriteContent()) { - case OPEN: - if (!_state.compareAndSet(State.OPEN, State.PENDING)) - continue; + case PENDING: break; - case ERROR: - callback.failed(new EofException(_onError)); - return; - - case CLOSING: - case CLOSED: - callback.failed(new EofException("Closed")); + callback.failed(_onError == null ? new IllegalStateException() : _onError); return; - default: throw new IllegalStateException(); } - break; + } + catch (IOException e) + { + if (_onError != null && e != _onError) + { + _onError.addSuppressed(e); + callback.failed(_onError); + } + else + callback.failed(e); } ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; @@ -1162,50 +955,16 @@ public void resetBuffer() @Override public void setWriteListener(WriteListener writeListener) { - if (!_channel.getState().isAsync()) - throw new IllegalStateException("!ASYNC"); - - if (_state.compareAndSet(State.OPEN, State.READY)) - { - _writeListener = writeListener; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); - } - else - throw new IllegalStateException(); + boolean execute = _state.setWriteListener(); + _writeListener = writeListener; + if (execute) + _channel.execute(_channel); } @Override public boolean isReady() { - while (true) - { - switch (_state.get()) - { - case OPEN: - case READY: - case ERROR: - case CLOSING: - case CLOSED: - return true; - - case ASYNC: - if (!_state.compareAndSet(State.ASYNC, State.READY)) - continue; - return true; - - case PENDING: - if (!_state.compareAndSet(State.PENDING, State.UNREADY)) - continue; - return false; - - case UNREADY: - return false; - - default: - throw new IllegalStateException(); - } - } + return _state.isOutputReady(); } @Override @@ -1213,42 +972,25 @@ public void run() { while (true) { - State state = _state.get(); + // TODO ERROR state? if (_onError != null) { - switch (state) + Throwable th = _onError; + _onError = null; + if (LOG.isDebugEnabled()) + LOG.debug("onError", th); + + try { - case CLOSING: - case CLOSED: - case ERROR: - { - _onError = null; - return; - } - default: - { - if (_state.compareAndSet(state, State.ERROR)) - { - Throwable th = _onError; - _onError = null; - if (LOG.isDebugEnabled()) - LOG.debug("onError", th); - - try - { - _writeListener.onError(th); - } - finally - { - IO.close(this); - } - - return; - } - } + _writeListener.onError(th); } - continue; + finally + { + IO.close(this); + } + + return; } // We do not check the state here. Strictly speaking the state should @@ -1269,7 +1011,7 @@ public void run() try { _writeListener.onWritePossible(); - break; + return; } catch (Throwable e) { @@ -1281,7 +1023,7 @@ public void run() @Override public String toString() { - return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get()); + return String.format("%s@%x", this.getClass().getSimpleName(), hashCode()); } private abstract class AsyncICB extends IteratingCallback @@ -1302,41 +1044,14 @@ public InvocationType getInvocationType() @Override protected void onCompleteSuccess() { - while (true) - { - State last = _state.get(); - switch (last) - { - case PENDING: - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - continue; - break; - - case UNREADY: - if (!_state.compareAndSet(State.UNREADY, State.READY)) - continue; - if (_last) - closed(); - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); - break; - - case CLOSED: - break; - - default: - throw new IllegalStateException(); - } - break; - } + _state.onWriteComplete(_last); } @Override public void onCompleteFailure(Throwable e) { _onError = e == null ? new IOException() : e; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + _state.onWriteComplete(_last); // TODO pass error? } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 60b5b10e8b68..f435ef016a24 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -148,7 +148,6 @@ public void reopen() public void closedBySendError() { setErrorSent(true); - _out.closedBySendError(); } /** diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 8ceac625e271..02bd0b3478df 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -214,7 +214,11 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr while(_threadPool.getBusyThreads() != base) { if (System.nanoTime() > end) + { + _threadPool.setDetailedDump(true); + _threadPool.dumpStdErr(); throw new TimeoutException(); + } Thread.sleep(10); } @@ -247,6 +251,7 @@ public void onWritePossible() throws IOException { while (out.isReady()) { + System.err.println("isReady "+ (bytes!=null)); if (bytes != null) { response.setContentType("text/plain"); @@ -260,6 +265,7 @@ public void onWritePossible() throws IOException return; } } + System.err.println("!isReady "); } @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/NcsaRequestLogTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/NcsaRequestLogTest.java index dc66ea72fc5e..1cfc5aebbcb0 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/NcsaRequestLogTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/NcsaRequestLogTest.java @@ -582,7 +582,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques { try { - while (baseRequest.getHttpChannel().getState().getState() != HttpChannelState.State.WAITING) + while (baseRequest.getHttpChannel().getState().getState() != HttpChannelState.DispatchState.WAITING) { Thread.sleep(10); } diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/ErrorPageTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/ErrorPageTest.java index 5e589464a05b..b2d3aca73b20 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/ErrorPageTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/ErrorPageTest.java @@ -527,7 +527,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t hold.countDown(); // Wait until request async waiting - while (Request.getBaseRequest(request).getHttpChannelState().getState() == HttpChannelState.State.HANDLING) + while (Request.getBaseRequest(request).getHttpChannelState().getState() == HttpChannelState.DispatchState.HANDLING) { try { diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java index f476a49940d9..e57b970939e7 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java @@ -257,7 +257,7 @@ public void run() case ASYNC_OTHER_WAIT: { final CountDownLatch latch = new CountDownLatch(1); - final HttpChannelState.State S = request.getHttpChannelState().getState(); + final HttpChannelState.DispatchState S = request.getHttpChannelState().getState(); new Thread() { @Override @@ -269,7 +269,7 @@ public void run() fail("latch expired"); // Spin until state change - HttpChannelState.State s = request.getHttpChannelState().getState(); + HttpChannelState.DispatchState s = request.getHttpChannelState().getState(); while (request.getHttpChannelState().getState() == S) { Thread.yield();