From b9cdfd3a9dfc9c3878386a733856f8569fcb451f Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 31 Oct 2024 17:54:18 +0200 Subject: [PATCH] Fixes #5888 - Limit usage of HTTP/2 connections. (#12401) * Made the high-level HttpConnectionOverHTTP2 implement MaxUsable, so it cannot be used to open more streams than allowed. * Introduced HTTP2Session.maxTotalLocalStreams to limit the max number of streams that might be created in a connection. Linked this new property with the high-level AbstractConnectionPool.maxUsage. * Implemented low-level handling of explicit stream ids provided by applications. * Implemented low-level handling of stream id overflow. * Added test cases. Signed-off-by: Simone Bordet Signed-off-by: Ludovic Orban Co-authored-by: Ludovic Orban --- .../jetty/client/AbstractConnectionPool.java | 2 +- .../internal/HTTPSessionListenerPromise.java | 9 +- .../internal/HttpConnectionOverHTTP2.java | 13 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 221 +++++++++++++++--- .../org/eclipse/jetty/http2/HTTP2Stream.java | 3 +- .../eclipse/jetty/http2/tests/HTTP2Test.java | 155 ++++++++++++ 6 files changed, 361 insertions(+), 42 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 4d9171a23fad..35233d70493d 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -341,7 +341,7 @@ protected Connection activate() int maxUsage = getMaxUsage(); if (connection instanceof MaxUsable maxUsable) - maxUsage = maxUsable.getMaxUsage(); + maxUsage = Math.min(maxUsage, maxUsable.getMaxUsage()); if (maxUsage > 0) { EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment(); diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java index 54d2d5a7d7a6..2d9e50d6a02f 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java @@ -18,7 +18,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicMarkableReference; +import org.eclipse.jetty.client.AbstractConnectionPool; import org.eclipse.jetty.client.Connection; +import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.Destination; import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.http2.HTTP2Connection; @@ -73,10 +75,15 @@ public void onSettings(Session session, SettingsFrame frame) private void onServerPreface(Session session) { + Destination destination = destination(); HTTP2Connection http2Connection = (HTTP2Connection)context.get(HTTP2Connection.class.getName()); - HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session, http2Connection); + HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination, session, http2Connection); if (this.connection.compareAndSet(null, connection, false, true)) { + ConnectionPool connectionPool = destination.getConnectionPool(); + if (connectionPool instanceof AbstractConnectionPool pool) + connection.setMaxUsage(pool.getMaxUsage()); + // The connection promise must be called synchronously // so that the HTTP/1 to HTTP/2 upgrade can create the // HTTP/2 stream that represents the HTTP/1 request. diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java index ade527797cb0..15e77d308b9d 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java @@ -50,7 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.MaxMultiplexable +public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.MaxMultiplexable, ConnectionPool.MaxUsable { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP2.class); @@ -108,6 +108,17 @@ public int getMaxMultiplex() return ((HTTP2Session)session).getMaxLocalStreams(); } + @Override + public int getMaxUsage() + { + return ((HTTP2Session)session).getMaxTotalLocalStreams(); + } + + void setMaxUsage(int maxUsage) + { + ((HTTP2Session)session).setMaxTotalLocalStreams(maxUsage); + } + @Override protected Iterator getHttpChannels() { diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 43e71f9be644..5f622fa09a47 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; @@ -81,8 +82,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session, Parser.Listener { private static final Logger LOG = LoggerFactory.getLogger(HTTP2Session.class); + // SPEC: stream numbers can go up to 2^31-1, but increment by 2. + private static final int MAX_TOTAL_LOCAL_STREAMS = Integer.MAX_VALUE / 2; private final Map streams = new ConcurrentHashMap<>(); + private final Set priorityStreams = ConcurrentHashMap.newKeySet(); private final AtomicLong streamsOpened = new AtomicLong(); private final AtomicLong streamsClosed = new AtomicLong(); private final StreamsState streamsState = new StreamsState(); @@ -93,6 +97,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session private final AtomicInteger sendWindow = new AtomicInteger(); private final AtomicInteger recvWindow = new AtomicInteger(); private final AtomicLong bytesWritten = new AtomicLong(); + private final AtomicInteger totalLocalStreams = new AtomicInteger(); private final EndPoint endPoint; private final Parser parser; private final Generator generator; @@ -102,6 +107,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session private final StreamTimeouts streamTimeouts; private int maxLocalStreams; private int maxRemoteStreams; + private int maxTotalLocalStreams; private long streamIdleTimeout; private int initialSessionRecvWindow; private int writeThreshold; @@ -120,6 +126,7 @@ public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Parser parser, Gener this.streamTimeouts = new StreamTimeouts(scheduler); this.maxLocalStreams = -1; this.maxRemoteStreams = -1; + this.maxTotalLocalStreams = MAX_TOTAL_LOCAL_STREAMS; this.localStreamIds.set(initialStreamId); this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); @@ -165,6 +172,20 @@ public void setMaxLocalStreams(int maxLocalStreams) this.maxLocalStreams = maxLocalStreams; } + @ManagedAttribute("The maximum number of local streams that can be opened") + public int getMaxTotalLocalStreams() + { + return maxTotalLocalStreams; + } + + public void setMaxTotalLocalStreams(int maxTotalLocalStreams) + { + if (maxTotalLocalStreams > MAX_TOTAL_LOCAL_STREAMS) + throw new IllegalArgumentException("Invalid max total local streams " + maxTotalLocalStreams); + if (maxTotalLocalStreams > 0) + this.maxTotalLocalStreams = maxTotalLocalStreams; + } + @ManagedAttribute("The maximum number of concurrent remote streams") public int getMaxRemoteStreams() { @@ -908,9 +929,21 @@ void updateStreamCount(boolean local, int deltaStreams, int deltaClosing) remoteStreamCount.add(deltaStreams, deltaClosing); } + private boolean removeStream(int streamId) + { + HTTP2Stream removed = streams.get(streamId); + if (removed != null) + return removeStream(removed); + priorityStreams.remove(streamId); + onStreamClosed(streamId); + onStreamDestroyed(streamId); + return true; + } + public boolean removeStream(Stream stream) { int streamId = stream.getId(); + priorityStreams.remove(streamId); HTTP2Stream removed = streams.remove(streamId); if (removed == null) return false; @@ -1068,6 +1101,11 @@ private void onStreamClosed(Stream stream) { if (LOG.isDebugEnabled()) LOG.debug("Closed stream {} for {}", stream, this); + onStreamClosed(stream.getId()); + } + + private void onStreamClosed(int streamId) + { streamsClosed.incrementAndGet(); } @@ -2107,17 +2145,23 @@ private int priority(PriorityFrame frame, Callback callback) Slot slot = new Slot(); int currentStreamId = frame.getStreamId(); int streamId = reserveSlot(slot, currentStreamId, callback::failed); - if (streamId > 0) + if (streamId <= 0) + return 0; + + if (!priorityStreams.add(streamId)) { - if (currentStreamId <= 0) - frame = frame.withStreamId(streamId); - slot.entries = List.of(newEntry(frame, null, Callback.from(callback::succeeded, x -> - { - HTTP2Session.this.onStreamDestroyed(streamId); - callback.failed(x); - }))); - flush(); + callback.failed(new IllegalStateException("Duplicate stream " + streamId)); + return 0; } + + if (currentStreamId <= 0) + frame = frame.withStreamId(streamId); + slot.entries = List.of(newEntry(frame, null, Callback.from(callback::succeeded, x -> + { + removeStream(streamId); + callback.failed(x); + }))); + flush(); return streamId; } @@ -2126,19 +2170,19 @@ private void newLocalStream(HTTP2Stream.FrameList frameList, Promise pro Slot slot = new Slot(); int currentStreamId = frameList.getStreamId(); int streamId = reserveSlot(slot, currentStreamId, promise::failed); - if (streamId > 0) + if (streamId <= 0) + return; + + List frames = frameList.getFrames(); + if (currentStreamId <= 0) { - List frames = frameList.getFrames(); - if (currentStreamId <= 0) - { - frames = frames.stream() - .map(frame -> frame.withStreamId(streamId)) - .collect(Collectors.toList()); - } - if (createLocalStream(slot, frames, promise, listener, streamId)) - return; - freeSlot(slot, streamId); + frames = frames.stream() + .map(frame -> frame.withStreamId(streamId)) + .collect(Collectors.toList()); } + if (createLocalStream(slot, frames, promise, listener, streamId)) + return; + freeSlot(slot, streamId); } private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer failFn) @@ -2147,7 +2191,7 @@ private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Co HTTP2Session.this.onStreamCreated(streamId); HTTP2Stream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), x -> { - HTTP2Session.this.onStreamDestroyed(streamId); + removeStream(streamId); failFn.accept(x); }); if (stream != null) @@ -2181,14 +2225,14 @@ private boolean newRemoteStream(int streamId) private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); - int streamId = reserveSlot(slot, 0, promise::failed); - if (streamId > 0) - { - frame = frame.withStreamId(streamId); - if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId)) - return; - freeSlot(slot, streamId); - } + int streamId = reserveSlot(slot, frame.getPromisedStreamId(), promise::failed); + if (streamId <= 0) + return; + + frame = frame.withStreamId(streamId); + if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId)) + return; + freeSlot(slot, streamId); } private boolean createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) @@ -2205,7 +2249,7 @@ private boolean createLocalStream(Slot slot, List frames, Promise promise.succeeded(stream), x -> { - HTTP2Session.this.onStreamDestroyed(streamId); + removeStream(stream); promise.failed(x); }); int count = frames.size(); @@ -2235,19 +2279,105 @@ private MetaData.Request extractMetaDataRequest(StreamFrame frame) private int reserveSlot(Slot slot, int streamId, Consumer fail) { + if (streamId < 0 || (streamId > 0 && !isLocalStream(streamId))) + { + fail.accept(new IllegalArgumentException("invalid stream id " + streamId)); + return 0; + } + + int maxTotal = getMaxTotalLocalStreams(); + + boolean created = false; + int reservedStreamId = 0; Throwable failure = null; - boolean reserved = false; try (AutoLock ignored = lock.lock()) { // SPEC: cannot create new streams after receiving a GOAWAY. if (closed == CloseState.NOT_CLOSED) { - if (streamId <= 0) + if (streamId == 0) + { + int total = incrementTotalLocalStreams(maxTotal); + if (total <= maxTotal) + { + // Stream id generated internally. + reservedStreamId = localStreamIds.getAndUpdate(v -> + { + if (v >= 0) + return v + 2; + return v; + }); + if (reservedStreamId > 0) + { + slots.offer(slot); + created = true; + } + else + { + failure = decrementTotalLocalStreams("max stream id exceeded"); + } + } + else + { + failure = decrementTotalLocalStreams("max total streams exceeded"); + } + } + else { - streamId = localStreamIds.getAndAdd(2); - reserved = true; + // Stream id is given. + while (true) + { + int nextStreamId = localStreamIds.get(); + if (nextStreamId > 0) + { + if (streamId >= nextStreamId) + { + int total = incrementTotalLocalStreams(maxTotal); + if (total <= maxTotal) + { + // This may overflow, but it's ok as the current streamId + // is valid; it is the next streamId that will be invalid. + int newNextStreamId = streamId + 2; + if (localStreamIds.compareAndSet(nextStreamId, newNextStreamId)) + { + reservedStreamId = streamId; + slots.offer(slot); + created = true; + break; + } + else + { + totalLocalStreams.decrementAndGet(); + } + } + else + { + failure = decrementTotalLocalStreams("max total streams exceeded"); + break; + } + } + else + { + if (streams.containsKey(streamId) || priorityStreams.contains(streamId)) + { + reservedStreamId = streamId; + slots.offer(slot); + } + else + { + failure = new IllegalArgumentException("invalid stream id " + streamId); + } + break; + } + } + else + { + reservedStreamId = nextStreamId; + failure = new IllegalStateException("max stream id exceeded"); + break; + } + } } - slots.offer(slot); } else { @@ -2258,15 +2388,14 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) } if (failure == null) { - if (reserved) + if (created) HTTP2Session.this.onStreamCreated(streamId); - return streamId; } else { fail.accept(failure); - return 0; } + return reservedStreamId; } private void freeSlot(Slot slot, int streamId) @@ -2279,6 +2408,22 @@ private void freeSlot(Slot slot, int streamId) flush(); } + private int incrementTotalLocalStreams(int maxTotal) + { + return totalLocalStreams.updateAndGet(v -> + { + if (v <= maxTotal) + return v + 1; + return v; + }); + } + + private Throwable decrementTotalLocalStreams(String message) + { + totalLocalStreams.decrementAndGet(); + return new IllegalStateException(message); + } + /** *

Iterates over the entries of the slot queue to flush them.

*

The flush proceeds until either one of the following two conditions is true:

diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index d6490012a7db..9024805eba1c 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -965,7 +965,7 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,attachment=%s}", + return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,request=%s,attachment=%s}", getClass().getSimpleName(), getId(), session.hashCode(), @@ -977,6 +977,7 @@ public String toString() remoteReset, closeState, NanoTime.millisSince(creationNanoTime), + request, attachment); } diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java index b4a25d8f7710..66f02599b7eb 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.hpack.HpackException; @@ -1139,6 +1140,160 @@ public void onClose(Session session, GoAwayFrame frame, Callback callback) assertThat(failure.getMessage(), containsString("invalid_hpack_block")); } + @Test + public void testClientExceedsConnectionMaxUsage() throws Exception + { + start(new ServerSessionListener() {}); + + Session session = newClientSession(new Session.Listener() {}); + ((HTTP2Session)session).setMaxTotalLocalStreams(1); + + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}); + + // Must not be able to create more streams than allowed. + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Must not be able to create more streams than allowed with an explicit streamId. + int explicitStreamId = Integer.MAX_VALUE; + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Session must still be valid. + assertFalse(session.isClosed()); + } + + @Test + public void testClientExceedsMaxStreamId() throws Exception + { + start(new ServerSessionListener() {}); + + Session session = newClientSession(new Session.Listener() {}); + + // Use the max possible streamId. + int explicitStreamId = Integer.MAX_VALUE; + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}); + + // Must not be able to create more streams. + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Session must still be valid. + assertFalse(session.isClosed()); + } + + @Test + public void testClientCreatesStreamsWithExplicitStreamId() throws Exception + { + start(new ServerSessionListener() {}); + + Session session = newClientSession(new Session.Listener() {}); + + int evenStreamId = 128; + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(evenStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Equivalent to Integer.MAX_VALUE + 2. + int negativeStreamId = Integer.MIN_VALUE + 1; + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(negativeStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + int explicitStreamId = 127; + Stream stream = session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + assertThat(stream.getId(), equalTo(explicitStreamId)); + + stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + assertThat(stream.getId(), equalTo(explicitStreamId + 2)); + + // Cannot create streams with smaller id. + int smallerStreamId = explicitStreamId - 2; + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(smallerStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Should be possible to create the stream with the max id. + explicitStreamId = Integer.MAX_VALUE; + session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + + // After the stream with the max id, cannot create more streams on this connection. + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + // Session must still be valid. + assertFalse(session.isClosed()); + } + + @Test + public void testServerPushesStreamsWithExplicitStreamId() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + start(new ServerSessionListener() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + try + { + int oddStreamId = 129; + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), oddStreamId, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + int negativeStreamId = Integer.MIN_VALUE; + assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), negativeStreamId, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + int explicitStreamId = 128; + Stream pushedStream = stream.push(new PushPromiseFrame(stream.getId(), explicitStreamId, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + assertThat(pushedStream.getId(), equalTo(explicitStreamId)); + + pushedStream = stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + assertThat(pushedStream.getId(), equalTo(explicitStreamId + 2)); + + // Cannot push streams with smaller id. + int smallerStreamId = explicitStreamId - 2; + assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), smallerStreamId, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Should be possible to push the stream with the max id. + explicitStreamId = Integer.MAX_VALUE - 1; + stream.push(new PushPromiseFrame(stream.getId(), explicitStreamId, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + + // After the stream with the max id, cannot push more streams on this connection. + assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + // Session must still be valid. + assertFalse(stream.getSession().isClosed()); + + latch.countDown(); + + return null; + } + catch (Throwable x) + { + throw new RuntimeException(x); + } + } + }); + + Session session = newClientSession(new Session.Listener() {}); + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + private static void sleep(long time) { try