Skip to content

Commit

Permalink
Fixes #5888 - Limit usage of HTTP/2 connections.
Browse files Browse the repository at this point in the history
* Now using a placeholder HTTP2Stream for streams opened with a PRIORITY, but not yet with a HEADERS.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Oct 27, 2024
1 parent eccf90c commit 446cae3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -836,13 +836,21 @@ protected HTTP2Stream createLocalStream(int streamId, MetaData.Request request,
}

HTTP2Stream stream = newStream(streamId, request, true);
if (streams.putIfAbsent(streamId, stream) == null)

HTTP2Stream newStream = streams.compute(streamId, (k, v) ->
{
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (v == null || v.isPlaceHolder())
return stream;
return null;
});

if (newStream != null)
{
newStream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(newStream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {} for {}", stream, this);
return stream;
LOG.debug("Created local {} for {}", newStream, this);
return newStream;
}
else
{
Expand Down Expand Up @@ -2111,20 +2119,30 @@ private int priority(PriorityFrame frame, Callback callback)
int streamId = reserveSlot(slot, currentStreamId, callback::failed);
if (streamId > 0)
{
if (currentStreamId <= 0)
HTTP2Stream stream;
if (currentStreamId > 0)
{
stream = streams.get(streamId);
}
else
{
frame = frame.withStreamId(streamId);
slot.entries = List.of(newEntry(frame, null, Callback.from(callback::succeeded, x ->
// Create a placeholder stream, replaced when a follow-up HEADERS frame will be sent.
stream = HTTP2Session.this.createLocalStream(streamId, null, callback::failed);
}

if (stream != null)
{
HTTP2Session.this.onStreamDestroyed(streamId);
callback.failed(x);
})));
flush();
}
else if (streamId < 0)
{
close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP);
slot.entries = List.of(newEntry(frame, stream, Callback.from(callback::succeeded, x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
callback.failed(x);
})));
flush();
return streamId;
}
}
return streamId;
return 0;
}

private void newLocalStream(HTTP2Stream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
Expand All @@ -2145,10 +2163,6 @@ private void newLocalStream(HTTP2Stream.FrameList frameList, Promise<Stream> pro
return;
freeSlot(slot, streamId);
}
else if (streamId < 0)
{
close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP);
}
}

private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
Expand Down Expand Up @@ -2199,10 +2213,6 @@ private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listen
return;
freeSlot(slot, streamId);
}
else if (streamId < 0)
{
close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP);
}
}

private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<Stream> promise, Stream.Listener listener, int streamId)
Expand Down Expand Up @@ -2255,6 +2265,7 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
return 0;
}

boolean created = false;
int reservedStreamId = 0;
Throwable failure = null;
try (AutoLock ignored = lock.lock())
Expand All @@ -2268,9 +2279,14 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
reservedStreamId = localStreamIds.getAndAdd(2);
// Check for overflow.
if (reservedStreamId > 0)
{
slots.offer(slot);
created = true;
}
else
{
failure = new IllegalStateException("max streams exceeded");
}
}
else
{
Expand All @@ -2287,13 +2303,21 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
{
reservedStreamId = streamId;
slots.offer(slot);
created = true;
break;
}
}
else
{
reservedStreamId = streamId;
slots.offer(slot);
if (streams.containsKey(streamId))
{
reservedStreamId = streamId;
slots.offer(slot);
}
else
{
failure = new IllegalArgumentException("invalid stream id " + streamId);
}
break;
}
}
Expand All @@ -2314,9 +2338,16 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
}
}
if (failure == null)
HTTP2Session.this.onStreamCreated(streamId);
{
if (created)
HTTP2Session.this.onStreamCreated(streamId);
}
else
{
fail.accept(failure);
if (reservedStreamId < 0)
close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP);
}
return reservedStreamId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public int hashCode()
return streamId;
}

boolean isPlaceHolder()
{
return request == null;
}

@Override
public Object getAttachment()
{
Expand Down Expand Up @@ -959,7 +964,7 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,request=%s,attachment=%s}",
getClass().getSimpleName(),
getId(),
session.hashCode(),
Expand All @@ -971,6 +976,7 @@ public String toString()
remoteReset,
closeState,
NanoTime.millisSince(creationNanoTime),
request,
attachment);
}

Expand Down

0 comments on commit 446cae3

Please sign in to comment.