Skip to content

Commit

Permalink
Fixes #6251 - Use CyclicTimeout for HTTP2Streams.
Browse files Browse the repository at this point in the history
Introduced CyclicTimeouts to manage many entities that may timeout.
Rewritten HttpDestination request timeouts using CyclicTimeouts.
HTTP2Stream does not inherit from IdleTimeout anymore; now a
CyclicTimeouts in HTTP2Session manages the stream timeouts.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 12, 2021
1 parent 4204526 commit 37f7634
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,11 @@ public void send()
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
long timeoutAt = request.getTimeoutAt();
if (timeoutAt != -1)
long timeoutAt = exchange.getExpireNanoTime();
if (timeoutAt != Long.MAX_VALUE)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(request, timeoutAt);
_totalTimeout.schedule(exchange.getRequest(), timeoutAt);
}
send(exchange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
Expand All @@ -31,7 +30,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
Expand Down Expand Up @@ -255,10 +254,7 @@ public void send(HttpExchange exchange)
{
if (enqueue(exchanges, exchange))
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
requestTimeouts.schedule(expiresAt);

requestTimeouts.schedule(exchange);
if (!client.isRunning() && exchanges.remove(exchange))
{
request.abort(new RejectedExecutionException(client + " is stopping"));
Expand Down Expand Up @@ -531,61 +527,25 @@ public interface Multiplexed
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
*/
private class RequestTimeouts extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);

private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
public void onTimeoutExpired()
public Iterator<HttpExchange> iterator()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);

long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(earliest);

// Scan the message queue to abort expired exchanges
// and to find the exchange that expire the earliest.
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
long expiresAt = request.getTimeoutAt();
if (expiresAt == -1)
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < earliest)
earliest = expiresAt;
}

if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
return exchanges.iterator();
}

private void schedule(long expiresAt)
@Override
protected boolean onExpired(HttpExchange exchange)
{
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
{
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpExchange
public class HttpExchange implements CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpExchange.class);

Expand Down Expand Up @@ -89,6 +90,12 @@ public Throwable getResponseFailure()
}
}

@Override
public long getExpireNanoTime()
{
return request.getTimeoutAt();
}

/**
* <p>Associates the given {@code channel} to this exchange.</p>
* <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class HttpRequest implements Request
private boolean versionExplicit;
private long idleTimeout = -1;
private long timeout;
private long timeoutAt;
private long timeoutAt = Long.MAX_VALUE;
private Content content;
private boolean followRedirects;
private List<HttpCookie> cookies;
Expand Down Expand Up @@ -821,11 +821,12 @@ private void sendAsync(BiConsumer<HttpRequest, List<Response.ResponseListener>>
void sent()
{
long timeout = getTimeout();
timeoutAt = timeout > 0 ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) : -1;
if (timeout > 0)
timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
}

/**
* @return The nanoTime at which the timeout expires or -1 if there is no timeout.
* @return The nanoTime at which the timeout expires or {@link Long#MAX_VALUE} if there is no timeout.
* @see #timeout(long, TimeUnit)
*/
long getTimeoutAt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.AtomicBiInteger;
Expand Down Expand Up @@ -89,12 +91,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicLong bytesWritten = new AtomicLong();
private final Scheduler scheduler;
private final EndPoint endPoint;
private final Generator generator;
private final Session.Listener listener;
private final FlowControlStrategy flowControl;
private final HTTP2Flusher flusher;
private final StreamTimeouts streamTimeouts;
private int maxLocalStreams;
private int maxRemoteStreams;
private long streamIdleTimeout;
Expand All @@ -105,12 +107,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio

public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{
this.scheduler = scheduler;
this.endPoint = endPoint;
this.generator = generator;
this.listener = listener;
this.flowControl = flowControl;
this.flusher = new HTTP2Flusher(this);
this.streamTimeouts = new StreamTimeouts(scheduler);
this.maxLocalStreams = -1;
this.maxRemoteStreams = -1;
this.localStreamIds.set(initialStreamId);
Expand Down Expand Up @@ -613,7 +615,7 @@ public Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Con

protected IStream newStream(int streamId, MetaData.Request request, boolean local)
{
return new HTTP2Stream(scheduler, this, streamId, request, local);
return new HTTP2Stream(this, streamId, request, local);
}

@Override
Expand Down Expand Up @@ -989,6 +991,11 @@ public void onFrame(Frame frame)
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
}

void scheduleTimeout(HTTP2Stream stream)
{
streamTimeouts.schedule(stream);
}

private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -1026,6 +1033,7 @@ public void onFlushed(long bytes) throws IOException
private void terminate(Throwable cause)
{
flusher.terminate(cause);
streamTimeouts.destroy();
disconnect();
}

Expand Down Expand Up @@ -2303,4 +2311,25 @@ private class Slot
private volatile List<HTTP2Flusher.Entry> entries;
}
}

private class StreamTimeouts extends CyclicTimeouts<HTTP2Stream>
{
private StreamTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
public Iterator<HTTP2Stream> iterator()
{
return streams.values().stream().map(HTTP2Stream.class::cast).iterator();
}

@Override
protected boolean onExpired(HTTP2Stream stream)
{
stream.onIdleExpired(new TimeoutException("Idle timeout " + stream.getIdleTimeout() + " ms elapsed"));
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.Promise;
Expand All @@ -48,7 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpable
public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Stream.class);

Expand All @@ -74,10 +74,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private boolean dataInitial;
private boolean dataProcess;
private boolean committed;
private long idleTimeout;
private long expireNanoTime = Long.MAX_VALUE;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
public HTTP2Stream(ISession session, int streamId, MetaData.Request request, boolean local)
{
super(scheduler);
this.session = session;
this.streamId = streamId;
this.request = request;
Expand All @@ -86,6 +87,12 @@ public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData
this.dataInitial = true;
}

@Deprecated
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
{
this(session, streamId, request, local);
}

@Override
public int getId()
{
Expand Down Expand Up @@ -268,13 +275,39 @@ public boolean isCommitted()
return committed;
}

@Override
public boolean isOpen()
{
return !isClosed();
}

@Override
public void notIdle()
{
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0)
expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout);
}

@Override
public long getExpireNanoTime()
{
return expireNanoTime;
}

@Override
public long getIdleTimeout()
{
return idleTimeout;
}

@Override
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
notIdle();
((HTTP2Session)session).scheduleTimeout(this);
}

protected void onIdleExpired(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -683,10 +716,8 @@ public void close()
}
}

@Override
public void onClose()
{
super.onClose();
notifyClosed(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
* session there is a CyclicTimeout and at the beginning of the
* request processing the timeout is canceled (via cancel()), but at
* the end of the request processing the timeout is re-scheduled.</p>
* <p>Another typical scenario is for a parent entity to manage
* the timeouts of many children entities; the timeout is scheduled
* for the child entity that expires the earlier; when the timeout
* expires, the implementation scans the children entities to find
* the expired child entities and to find the next child entity
* that expires the earlier. </p>
* <p>This implementation has a {@link Timeout} holding the time
* at which the scheduled task should fire, and a linked list of
* {@link Wakeup}, each holding the actual scheduled task.</p>
Expand All @@ -59,6 +53,8 @@
* When the Wakeup task fires, it will see that the Timeout is now
* in the future and will attach a new Wakeup with the future time
* to the Timeout, and submit a scheduler task for the new Wakeup.</p>
*
* @see CyclicTimeouts
*/
public abstract class CyclicTimeout implements Destroyable
{
Expand Down
Loading

0 comments on commit 37f7634

Please sign in to comment.