Skip to content

Commit

Permalink
Issue #4331 Close Complete
Browse files Browse the repository at this point in the history
Work in progress

Signed-off-by: Greg Wilkins <[email protected]>
  • Loading branch information
gregw committed Dec 4, 2019
1 parent 85eaa1f commit a699aa0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 44 deletions.
130 changes: 86 additions & 44 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
Expand Down Expand Up @@ -63,10 +62,9 @@
public class HttpOutput extends ServletOutputStream implements Runnable
{
private static final String LSTRING_FILE = "javax.servlet.LocalStrings";
private static final Callback BLOCKING_CLOSE_CALLBACK = new Callback() {};
private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE);

/*
/* TODO UPDATE!!!
ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED
--------------------------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise ise ise
Expand Down Expand Up @@ -157,6 +155,7 @@ default void resetBuffer() throws IllegalStateException
private final HttpChannelState _channelState;
private final SharedBlockingCallback _writeBlocker;
private State _state = State.OPEN;
private boolean _completing = false;
private Interceptor _interceptor;
private long _written;
private long _flushed;
Expand All @@ -166,7 +165,6 @@ default void resetBuffer() throws IllegalStateException
private int _commitSize;
private WriteListener _writeListener;
private volatile Throwable _onError;
private Callback _completeCallback;
private Callback _closeCallback;

public HttpOutput(HttpChannel channel)
Expand Down Expand Up @@ -299,32 +297,31 @@ public void complete(Closeable wrapper, Callback callback)
return;
}

// otherwise we must remember the callback and call the wrappers close.
// otherwise we must close the wrapper, but all calls to close() will now
// be treated as async anyway.
synchronized (_channelState)
{
if (_completeCallback != null)
throw new IllegalStateException();
_completeCallback = callback;
_completing = true;
_closeCallback = Callback.combine(_closeCallback, callback);
}

try
{
if (wrapper != null)
wrapper.close();
wrapper.close();
}
catch (Throwable th)
{
LOG.ignore(th);
}

// Was our close method wasn't actually called, then do a normal async close
// If the wrapper intercepted the close, then initiate directly
boolean closed;
synchronized (_channelState)
{
if (_completeCallback != null)
callback = null;
closed = _state == State.CLOSED || _state == State.CLOSING;
}
if (callback != null)
close(callback);
if (!closed)
close(null);
}

public void close(Callback callback)
Expand All @@ -338,21 +335,59 @@ public void close(Callback callback)
return;

case CLOSING:
// TODO merge the callbacks
// Close already initiated, so just add the callback to those
// executed when it is complete.
_closeCallback = Callback.combine(_closeCallback, callback);
return;

case ERROR:
callback.failed(_onError);
// TODO State change???
// TODO is this right?
Callback cb = Callback.combine(_closeCallback, callback);
_closeCallback = null;
cb.failed(_onError);
_state = State.CLOSED;
return;

case PENDING:
case UNREADY:
// Let's just add the callback so it get's noticed once write is possible.
_closeCallback = Callback.combine(_closeCallback, callback);
break;

default:
_state = State.CLOSING;
_closeCallback = Callback.combine(_closeCallback, callback);
break;
}
}

ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
channelWrite(content, !_channel.getResponse().isIncluding(), callback);
channelWrite(content, !_channel.getResponse().isIncluding(), new Callback()
{
@Override
public void succeeded()
{
callback().succeeded();
}

@Override
public void failed(Throwable x)
{
callback().failed(x);
}

public Callback callback()
{
Callback closeCallback;
synchronized (_channelState)
{
_state = State.CLOSED;
closeCallback = _closeCallback;
_closeCallback = null;
}
return closeCallback == null ? Callback.NOOP : closeCallback;
}
});
}

@Override
Expand All @@ -361,12 +396,9 @@ public void close() throws IOException
Callback callback = null;
synchronized (_channelState)
{
if (_completeCallback != null)
{
// This is a completion close,
callback = _completeCallback;
_completeCallback = null;
}
if (_completing)
// Completion has started so all closes are async
close(null);
// Else handle with blocking unless already closed.
else if (_state == State.CLOSED)
return;
Expand Down Expand Up @@ -1083,7 +1115,7 @@ public void sendContent(HttpContent httpContent, Callback callback)
InputStream in = null;
try
{
in = httpContent.getInputStream();
in = httpContent.getInputStream();
}
catch (Throwable x)
{
Expand Down Expand Up @@ -1141,20 +1173,24 @@ public void onFlushed(long bytes) throws IOException

public void recycle()
{
_interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
_written = 0;
_writeListener = null;
_onError = null;
_firstByteTimeStamp = -1;
_flushed = 0;
_closeCallback = null;
reopen();
synchronized (_channelState)
{
_state = State.OPEN;
_completing = false;
_interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
_written = 0;
_writeListener = null;
_onError = null;
_firstByteTimeStamp = -1;
_flushed = 0;
_closeCallback = null;
}
}

public void resetBuffer()
Expand Down Expand Up @@ -1224,7 +1260,6 @@ public void run()
{
error = _onError;
_onError = null;
// TODO change state?
}
}

Expand All @@ -1251,15 +1286,14 @@ public void run()
LOG.debug("onError", error);
_writeListener.onError(error);
}
catch(Throwable t)
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug(t);
}
finally
{
// TODO is this needed?
IO.close(this);
closed();
}
}

Expand Down Expand Up @@ -1289,17 +1323,22 @@ public InvocationType getInvocationType()
@Override
protected void onCompleteSuccess()
{
boolean close = false;
boolean wake = false;
synchronized (_channelState)
{
switch (_state)
{
case PENDING:
_state = State.ASYNC;
if (_closeCallback != null)
close = true;
break;

case UNREADY:
_state = _last ? State.CLOSED : State.READY;
// TODO should we close first and then call OWP?
close = true;
wake = _channel.getState().onWritePossible();
break;

Expand All @@ -1311,6 +1350,9 @@ protected void onCompleteSuccess()
}
}

if (close)
HttpOutput.this.close(null);

if (wake)
_channel.execute(_channel); // TODO can we call directly? Why execute?
}
Expand Down
52 changes: 52 additions & 0 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,58 @@ public InvocationType getInvocationType()
}
}

interface InvocableCallback extends Invocable, Callback
{
}

static Callback combine(Callback cb1, Callback cb2)
{
if (cb1 == null || cb1 == cb2)
return cb2;
if (cb2 == null)
return cb1;

return new InvocableCallback()
{
@Override
public void succeeded()
{
try
{
cb1.succeeded();
}
finally
{
cb2.succeeded();
}
}

@Override
public void failed(Throwable x)
{
try
{
cb1.failed(x);
}
catch (Throwable t)
{
if (x != t)
x.addSuppressed(t);
}
finally
{
cb2.failed(x);
}
}

@Override
public InvocationType getInvocationType()
{
return Invocable.combine(Invocable.getInvocationType(cb1), Invocable.getInvocationType(cb2));
}
};
}

/**
* <p>A CompletableFuture that is also a Callback.</p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ static void invokeNonBlocking(Runnable task)
}
}

static InvocationType combine(InvocationType it1, InvocationType it2)
{
if (it1 != null && it2 != null)
{
if (it1 == it2)
return it1;
if (it1 == InvocationType.EITHER)
return it2;
if (it2 == InvocationType.EITHER)
return it1;
}
return InvocationType.BLOCKING;
}

/**
* Get the invocation type of an Object.
*
Expand Down

0 comments on commit a699aa0

Please sign in to comment.