From 446cae338f3ab2ceab424462d16f63e8b60014f6 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Sun, 27 Oct 2024 21:41:14 +0100 Subject: [PATCH] Fixes #5888 - Limit usage of HTTP/2 connections. * Now using a placeholder HTTP2Stream for streams opened with a PRIORITY, but not yet with a HEADERS. Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/http2/HTTP2Session.java | 85 +++++++++++++------ .../org/eclipse/jetty/http2/HTTP2Stream.java | 8 +- 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 94e2d1923a0c..2eadca494988 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -836,13 +836,21 @@ protected HTTP2Stream createLocalStream(int streamId, MetaData.Request request, } HTTP2Stream stream = newStream(streamId, request, true); - if (streams.putIfAbsent(streamId, stream) == null) + + HTTP2Stream newStream = streams.compute(streamId, (k, v) -> { - stream.setIdleTimeout(getStreamIdleTimeout()); - flowControl.onStreamCreated(stream); + if (v == null || v.isPlaceHolder()) + return stream; + return null; + }); + + if (newStream != null) + { + newStream.setIdleTimeout(getStreamIdleTimeout()); + flowControl.onStreamCreated(newStream); if (LOG.isDebugEnabled()) - LOG.debug("Created local {} for {}", stream, this); - return stream; + LOG.debug("Created local {} for {}", newStream, this); + return newStream; } else { @@ -2111,20 +2119,30 @@ private int priority(PriorityFrame frame, Callback callback) int streamId = reserveSlot(slot, currentStreamId, callback::failed); if (streamId > 0) { - if (currentStreamId <= 0) + HTTP2Stream stream; + if (currentStreamId > 0) + { + stream = streams.get(streamId); + } + else + { frame = frame.withStreamId(streamId); - slot.entries = List.of(newEntry(frame, null, Callback.from(callback::succeeded, x -> + // Create a placeholder stream, replaced when a follow-up HEADERS frame will be sent. + stream = HTTP2Session.this.createLocalStream(streamId, null, callback::failed); + } + + if (stream != null) { - HTTP2Session.this.onStreamDestroyed(streamId); - callback.failed(x); - }))); - flush(); - } - else if (streamId < 0) - { - close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP); + slot.entries = List.of(newEntry(frame, stream, Callback.from(callback::succeeded, x -> + { + HTTP2Session.this.onStreamDestroyed(streamId); + callback.failed(x); + }))); + flush(); + return streamId; + } } - return streamId; + return 0; } private void newLocalStream(HTTP2Stream.FrameList frameList, Promise promise, Stream.Listener listener) @@ -2145,10 +2163,6 @@ private void newLocalStream(HTTP2Stream.FrameList frameList, Promise pro return; freeSlot(slot, streamId); } - else if (streamId < 0) - { - close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP); - } } private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer failFn) @@ -2199,10 +2213,6 @@ private void push(PushPromiseFrame frame, Promise promise, Stream.Listen return; freeSlot(slot, streamId); } - else if (streamId < 0) - { - close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP); - } } private boolean createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) @@ -2255,6 +2265,7 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) return 0; } + boolean created = false; int reservedStreamId = 0; Throwable failure = null; try (AutoLock ignored = lock.lock()) @@ -2268,9 +2279,14 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) reservedStreamId = localStreamIds.getAndAdd(2); // Check for overflow. if (reservedStreamId > 0) + { slots.offer(slot); + created = true; + } else + { failure = new IllegalStateException("max streams exceeded"); + } } else { @@ -2287,13 +2303,21 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) { reservedStreamId = streamId; slots.offer(slot); + created = true; break; } } else { - reservedStreamId = streamId; - slots.offer(slot); + if (streams.containsKey(streamId)) + { + reservedStreamId = streamId; + slots.offer(slot); + } + else + { + failure = new IllegalArgumentException("invalid stream id " + streamId); + } break; } } @@ -2314,9 +2338,16 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) } } if (failure == null) - HTTP2Session.this.onStreamCreated(streamId); + { + if (created) + HTTP2Session.this.onStreamCreated(streamId); + } else + { fail.accept(failure); + if (reservedStreamId < 0) + close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP); + } return reservedStreamId; } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index aa91f4d37c4b..fe235e9337c9 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -110,6 +110,11 @@ public int hashCode() return streamId; } + boolean isPlaceHolder() + { + return request == null; + } + @Override public Object getAttachment() { @@ -959,7 +964,7 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,attachment=%s}", + return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,request=%s,attachment=%s}", getClass().getSimpleName(), getId(), session.hashCode(), @@ -971,6 +976,7 @@ public String toString() remoteReset, closeState, NanoTime.millisSince(creationNanoTime), + request, attachment); }