Skip to content

Commit

Permalink
Merge pull request #5953 from eclipse/jetty-10.0.x-5605-wakeup-blocke…
Browse files Browse the repository at this point in the history
…d-threads

Jetty 10.0.x Fix #5605 Unblock non container Threads
  • Loading branch information
lorban authored Feb 17, 2021
2 parents 07c59ce + ace019a commit 4cfe7b9
Show file tree
Hide file tree
Showing 15 changed files with 1,266 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@
package org.eclipse.jetty.server;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Non-blocking {@link ContentProducer} implementation. Calling {@link #nextContent()} will never block
* Non-blocking {@link ContentProducer} implementation. Calling {@link ContentProducer#nextContent()} will never block
* but will return null when there is no available content.
*/
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);

private final AutoLock _lock = new AutoLock();
private final HttpChannel _httpChannel;
private HttpInput.Interceptor _interceptor;
private HttpInput.Content _rawContent;
Expand All @@ -41,9 +44,16 @@ class AsyncContentProducer implements ContentProducer
_httpChannel = httpChannel;
}

@Override
public AutoLock lock()
{
return _lock.lock();
}

@Override
public void recycle()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("recycling {}", this);
_interceptor = null;
Expand All @@ -57,18 +67,21 @@ public void recycle()
@Override
public HttpInput.Interceptor getInterceptor()
{
assertLocked();
return _interceptor;
}

@Override
public void setInterceptor(HttpInput.Interceptor interceptor)
{
assertLocked();
this._interceptor = interceptor;
}

@Override
public int available()
{
assertLocked();
HttpInput.Content content = nextTransformedContent();
int available = content == null ? 0 : content.remaining();
if (LOG.isDebugEnabled())
Expand All @@ -79,6 +92,7 @@ public int available()
@Override
public boolean hasContent()
{
assertLocked();
boolean hasContent = _rawContent != null;
if (LOG.isDebugEnabled())
LOG.debug("hasContent = {} {}", hasContent, this);
Expand All @@ -88,6 +102,7 @@ public boolean hasContent()
@Override
public boolean isError()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("isError = {} {}", _error, this);
return _error;
Expand All @@ -96,6 +111,7 @@ public boolean isError()
@Override
public void checkMinDataRate()
{
assertLocked();
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this);
Expand Down Expand Up @@ -127,6 +143,7 @@ public void checkMinDataRate()
@Override
public long getRawContentArrived()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this);
return _rawContentArrived;
Expand All @@ -135,6 +152,7 @@ public long getRawContentArrived()
@Override
public boolean consumeAll(Throwable x)
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("consumeAll [e={}] {}", x, this);
failCurrentContent(x);
Expand Down Expand Up @@ -177,11 +195,16 @@ private void failCurrentContent(Throwable x)
_rawContent.failed(x);
_rawContent = null;
}

HttpInput.ErrorContent errorContent = new HttpInput.ErrorContent(x);
_transformedContent = errorContent;
_rawContent = errorContent;
}

@Override
public boolean onContentProducible()
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible {}", this);
return _httpChannel.getState().onReadReady();
Expand All @@ -190,6 +213,7 @@ public boolean onContentProducible()
@Override
public HttpInput.Content nextContent()
{
assertLocked();
HttpInput.Content content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent = {} {}", content, this);
Expand All @@ -201,6 +225,7 @@ public HttpInput.Content nextContent()
@Override
public void reclaim(HttpInput.Content content)
{
assertLocked();
if (LOG.isDebugEnabled())
LOG.debug("reclaim {} {}", content, this);
if (_transformedContent == content)
Expand All @@ -215,6 +240,7 @@ public void reclaim(HttpInput.Content content)
@Override
public boolean isReady()
{
assertLocked();
HttpInput.Content content = nextTransformedContent();
if (content != null)
{
Expand Down Expand Up @@ -274,6 +300,13 @@ private HttpInput.Content nextTransformedContent()
{
// TODO does EOF need to be passed to the interceptors?

// In case the _rawContent was set by consumeAll(), check the httpChannel
// to see if it has a more precise error. Otherwise, the exact same
// special content will be returned by the httpChannel.
HttpInput.Content refreshedRawContent = produceRawContent();
if (refreshedRawContent != null)
_rawContent = refreshedRawContent;

_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
Expand Down Expand Up @@ -352,6 +385,12 @@ private HttpInput.Content produceRawContent()
return content;
}

private void assertLocked()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException("ContentProducer must be called within lock scope");
}

@Override
public String toString()
{
Expand All @@ -365,4 +404,53 @@ public String toString()
_httpChannel
);
}

LockedSemaphore newLockedSemaphore()
{
return new LockedSemaphore();
}

/**
* A semaphore that assumes working under {@link AsyncContentProducer#lock()} scope.
*/
class LockedSemaphore
{
private final Condition _condition;
private int _permits;

private LockedSemaphore()
{
this._condition = _lock.newCondition();
}

void assertLocked()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException("LockedSemaphore must be called within lock scope");
}

void drainPermits()
{
_permits = 0;
}

void acquire() throws InterruptedException
{
while (_permits == 0)
_condition.await();
_permits--;
}

void release()
{
_permits++;
_condition.signal();
}

@Override
public String toString()
{
return getClass().getSimpleName() + " permits=" + _permits;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,31 @@

package org.eclipse.jetty.server;

import java.util.concurrent.Semaphore;

import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when
* Blocking implementation of {@link ContentProducer}. Calling {@link ContentProducer#nextContent()} will block when
* there is no available content but will never return null.
*/
class BlockingContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class);

private final Semaphore _semaphore = new Semaphore(0);
private final AsyncContentProducer _asyncContentProducer;
private final AsyncContentProducer.LockedSemaphore _semaphore;

BlockingContentProducer(AsyncContentProducer delegate)
{
_asyncContentProducer = delegate;
_semaphore = _asyncContentProducer.newLockedSemaphore();
}

@Override
public AutoLock lock()
{
return _asyncContentProducer.lock();
}

@Override
Expand Down Expand Up @@ -76,7 +82,9 @@ public long getRawContentArrived()
@Override
public boolean consumeAll(Throwable x)
{
return _asyncContentProducer.consumeAll(x);
boolean eof = _asyncContentProducer.consumeAll(x);
_semaphore.release();
return eof;
}

@Override
Expand Down Expand Up @@ -142,6 +150,7 @@ public void setInterceptor(HttpInput.Interceptor interceptor)
@Override
public boolean onContentProducible()
{
_semaphore.assertLocked();
// In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state
// DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why
// this method always returns false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.eclipse.jetty.server;

import org.eclipse.jetty.util.thread.AutoLock;

/**
* ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}.
* It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()},
Expand All @@ -24,6 +26,13 @@
*/
public interface ContentProducer
{
/**
* Lock this instance. The lock must be held before any method of this instance's
* method be called, and must be manually released afterward.
* @return the lock that is guarding this instance.
*/
AutoLock lock();

/**
* Reset all internal state and clear any held resources.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,9 +701,20 @@ else if (noStack != null)
}

if (isCommitted())
{
abort(failure);
}
else
_state.onError(failure);
{
try
{
_state.onError(failure);
}
catch (IllegalStateException e)
{
abort(failure);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,19 +309,24 @@ else if (filled < 0)
}

/**
* Parse and fill data, looking for content
* Parse and fill data, looking for content.
* We do parse first, and only fill if we're out of bytes to avoid unnecessary system calls.
*/
void parseAndFillForContent()
{
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown()
int filled = Integer.MAX_VALUE;
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown().

// This loop was designed by a committee and voted by a majority.
while (_parser.inContentState())
{
boolean handled = parseRequestBuffer();
if (handled || filled <= 0)
if (parseRequestBuffer())
break;
// Re-check the parser state after parsing to avoid filling,
// otherwise fillRequestBuffer() would acquire a ByteBuffer
// that may be leaked.
if (_parser.inContentState() && fillRequestBuffer() <= 0)
break;
filled = fillRequestBuffer();
}
}

Expand Down Expand Up @@ -412,9 +417,21 @@ private boolean upgrade()
@Override
public void onCompleted()
{
// Handle connection upgrades.
if (upgrade())
return;
// If we are fill interested, then a read is pending and we must abort
if (isFillInterested())
{
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
abort(new IllegalStateException());
}
else
{
// Handle connection upgrades.
if (upgrade())
return;
}

// Drive to EOF, EarlyEOF or Error
boolean complete = _input.consumeAll();

// Finish consuming the request
// If we are still expecting
Expand All @@ -424,7 +441,7 @@ public void onCompleted()
_parser.close();
}
// else abort if we can't consume all
else if (_generator.isPersistent() && !_input.consumeAll())
else if (_generator.isPersistent() && !complete)
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser);
Expand Down
Loading

0 comments on commit 4cfe7b9

Please sign in to comment.