Skip to content

Commit

Permalink
#7157 add missing callback calls in H2 reset codepath
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban committed Nov 25, 2021
1 parent f86a719 commit cbea7eb
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,23 @@ public void data(DataFrame frame, Callback callback)
@Override
public void reset(ResetFrame frame, Callback callback)
{
Throwable resetFailure = null;
synchronized (this)
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
{
resetFailure = failure;
}
else
{
localReset = true;
failure = new EOFException("reset");
}
}
((HTTP2Session)session).reset(this, frame, callback);
if (resetFailure != null)
callback.failed(resetFailure);
else
((HTTP2Session)session).reset(this, frame, callback);
}

private boolean startWrite(Callback callback)
Expand Down Expand Up @@ -397,6 +406,8 @@ private void onReset(ResetFrame frame, Callback callback)
close();
if (session.removeStream(this))
notifyReset(this, frame, callback);
else
callback.succeeded();
}

private void onPush(PushPromiseFrame frame, Callback callback)
Expand All @@ -421,6 +432,8 @@ private void onFailure(FailureFrame frame, Callback callback)
close();
if (session.removeStream(this))
notifyFailure(this, frame, callback);
else
callback.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,26 @@ public void send(HttpExchange exchange)
public void release()
{
setStream(null);
if (connection.release(this))
boolean released = connection.release(this);
if (LOG.isDebugEnabled())
LOG.debug("released channel? {} {}", released, this);
if (released)
getHttpDestination().release(getHttpConnection());
}

@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
Stream stream = getStream();
if (LOG.isDebugEnabled())
LOG.debug("exchange terminated {} {}", result, stream);
if (result.isSucceeded())
{
release();
}
else
{
Stream stream = getStream();
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback());
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.eclipse.jetty.http2.client.http;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -47,6 +50,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -62,10 +66,17 @@ public class MultiplexedConnectionPoolTest
private HttpClient client;

private void startServer(Handler handler) throws Exception
{
startServer(handler, MAX_MULTIPLEX, -1L);
}

private void startServer(Handler handler, int maxConcurrentStreams, long streamIdleTimeout) throws Exception
{
server = new Server();
HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX);
http2ServerConnectionFactory.setMaxConcurrentStreams(maxConcurrentStreams);
if (streamIdleTimeout > 0)
http2ServerConnectionFactory.setStreamIdleTimeout(streamIdleTimeout);
connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory);
server.addConnector(connector);
server.setHandler(handler);
Expand Down Expand Up @@ -208,6 +219,91 @@ private void sendRequest(CountDownLatch[] reqClientDoneLatches, int i)
});
}

@Test
public void testStreamIdleTimeout() throws Exception
{
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
ConnectionPoolFactory factory = new ConnectionPoolFactory("StreamIdleTimeout", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, 10)
{
@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(pool.getBean(Pool.class));
return pool;
});

startServer(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
int req = Integer.parseInt(target.substring(1));
try
{
response.getWriter().println("req " + req + " executed");
response.getWriter().flush();
}
catch (Exception e)
{
throw new ServletException(e);
}
}
}, 64, 1L);

HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport);
client.start();

List<CompletableFuture<Void>> futures = new ArrayList<>();
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 100; i++)
{
CompletableFuture<Void> cf = new CompletableFuture<>();
client.newRequest("localhost", connector.getLocalPort())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
counter.incrementAndGet();
cf.complete(null);
});
futures.add(cf);
}

// Wait for all requests to complete.
for (CompletableFuture<Void> cf : futures)
{
cf.get(5, TimeUnit.SECONDS);
}
assertThat(counter.get(), is(100));

// All remaining pooled connections should be in IDLE state.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
for (Pool<Connection>.Entry value : poolRef.get().values())
{
if (!value.isIdle())
return false;
}
return true;
});
}

@Test
public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
{
Expand Down

0 comments on commit cbea7eb

Please sign in to comment.