Skip to content

Commit

Permalink
Fixes #10786 - TLS handshake failures leak HttpConnection.RequestTime…
Browse files Browse the repository at this point in the history
…outs tasks

Moved the call to destroy the CyclicTimeouts to a close() call that is always called.

Fixed NPE in ManagedSelector.getTotalKeys().

Fixed exception handling to avoid infinite recursion in SslConnection.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Oct 26, 2023
1 parent e98c0ae commit fa166d0
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,7 @@ protected void doStop() throws Exception
decoderFactories.clear();
handlers.clear();

for (HttpDestination destination : destinations.values())
{
destination.close();
}
destinations.values().forEach(HttpDestination::close);
destinations.clear();

requestListeners.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public Object getAttachment()
return attachment;
}

protected void destroy()
public void destroy()
{
requestTimeouts.destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ protected void close(Throwable failure)
getEndPoint().close();
if (LOG.isDebugEnabled())
LOG.debug("Closed {}", this);
delegate.destroy();
}
}

Expand Down Expand Up @@ -339,7 +340,6 @@ protected void normalizeRequest(HttpRequest request)
public void close()
{
HttpConnectionOverHTTP.this.close();
destroy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ protected void close(Throwable failure)
getEndPoint().close();
if (LOG.isDebugEnabled())
LOG.debug("Closed {}", this);

delegate.destroy();
}
}

Expand Down Expand Up @@ -387,7 +389,6 @@ public SendFailure send(HttpExchange exchange)
public void close()
{
HttpConnectionOverFCGI.this.close();
destroy();
}

protected void close(Throwable failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ void remove()
public void close()
{
close(new AsynchronousCloseException());
destroy();
}

protected void close(Throwable failure)
Expand All @@ -221,6 +220,8 @@ protected void close(Throwable failure)
channel.destroy();
channel = idleChannels.poll();
}

destroy();
}
}

Expand Down
12 changes: 7 additions & 5 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ protected void doStop() throws Exception
@ManagedAttribute(value = "Total number of keys", readonly = true)
public int getTotalKeys()
{
return _selector.keys().size();
Selector selector = _selector;
return selector == null ? 0 : selector.keys().size();
}

@ManagedAttribute(value = "Average number of selected keys", readonly = true)
Expand Down Expand Up @@ -555,15 +556,16 @@ private void processUpdates()
if (LOG.isDebugEnabled())
LOG.debug("updateable {}", _updateable.size());

Selector selector = _selector;
for (SelectorUpdate update : _updateable)
{
if (_selector == null)
if (selector == null)
break;
try
{
if (LOG.isDebugEnabled())
LOG.debug("update {}", update);
update.update(_selector);
update.update(selector);
}
catch (Throwable x)
{
Expand All @@ -572,13 +574,13 @@ private void processUpdates()
}
_updateable.clear();

Selector selector;
int updates;
try (AutoLock l = _lock.lock())
{
updates = _updates.size();
_selecting = updates == 0;
selector = _selecting ? null : _selector;
if (_selecting)
selector = null;
}

if (LOG.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,7 @@ private Throwable handleException(Throwable x, String context)
if (LOG.isDebugEnabled())
LOG.debug("{} stored {} exception", this, context, x);
}
else if (x != _failure)
else if (x != _failure && x.getCause() != _failure)
{
_failure.addSuppressed(x);
if (LOG.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.http.client;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class TLSHandshakeFailureTest extends AbstractTest<TransportScenario>
{
@Override
public void init(Transport transport) throws IOException
{
setScenario(new TransportScenario(transport));
assumeTrue(scenario.transport.isTlsBased());
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testTLSWrapAbruptSSLEngineClose(Transport transport) throws Exception
{
TLSHandshakeAction action = SSLEngine::closeOutbound;
testTLSWrapFailure(transport, action, 1);
testTLSWrapFailure(transport, action, 2);
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testTLSWrapAbruptSSLEngineFailure(Transport transport) throws Exception
{
TLSHandshakeAction action = sslEngine ->
{
throw new SSLException("test");
};
testTLSWrapFailure(transport, action, 1);
testTLSWrapFailure(transport, action, 2);
}

private void testTLSWrapFailure(Transport transport, TLSHandshakeAction action, int wrapCount) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
scenario.client.stop();
scenario.client = new HttpClient(scenario.client.getTransport())
{
@Override
protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory.Client sslContextFactory, ClientConnectionFactory connectionFactory)
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
private final AtomicInteger wraps = new AtomicInteger();

@Override
protected SSLEngineResult wrap(SSLEngine sslEngine, ByteBuffer[] input, ByteBuffer output) throws SSLException
{
if (wraps.incrementAndGet() == wrapCount)
action.accept(sslEngine);
return super.wrap(sslEngine, input, output);
}
};
}
};
}
};
ScheduledThreadPoolExecutor schedulerService = new ScheduledThreadPoolExecutor(1);
schedulerService.setRemoveOnCancelPolicy(true);
scenario.client.setScheduler(new ScheduledExecutorScheduler(schedulerService));
scenario.client.start();

int count = 10;
for (int i = 0; i < count; ++i)
{
assertThrows(ExecutionException.class, () -> scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send()
);
}

// There should be a task scheduled by HttpDestination
// to expire HttpExchanges while they are queued.
// There may be also a task for the idle timeout of the connection.
List<Runnable> tasks = schedulerService.shutdownNow();
assertThat(tasks.toString(), tasks.size(), allOf(greaterThan(0), lessThan(count + 1)));
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testTLSUnwrapAbruptSSLEngineClose(Transport transport) throws Exception
{
TLSHandshakeAction action = SSLEngine::closeInbound;
testTLSUnwrapFailure(transport, action, 1);
testTLSUnwrapFailure(transport, action, 2);
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testTLSUnwrapAbruptSSLEngineFailure(Transport transport) throws Exception
{
TLSHandshakeAction action = sslEngine ->
{
throw new SSLException("test");
};
testTLSUnwrapFailure(transport, action, 1);
testTLSUnwrapFailure(transport, action, 2);
}

private void testTLSUnwrapFailure(Transport transport, TLSHandshakeAction action, int unwrapCount) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
scenario.client.stop();
scenario.client = new HttpClient(scenario.client.getTransport())
{
@Override
protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory.Client sslContextFactory, ClientConnectionFactory connectionFactory)
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
private final AtomicInteger unwraps = new AtomicInteger();

@Override
protected SSLEngineResult unwrap(SSLEngine sslEngine, ByteBuffer input, ByteBuffer output) throws SSLException
{
if (unwraps.incrementAndGet() == unwrapCount)
action.accept(sslEngine);
return super.unwrap(sslEngine, input, output);
}
};
}
};
}
};
ScheduledThreadPoolExecutor schedulerService = new ScheduledThreadPoolExecutor(1);
schedulerService.setRemoveOnCancelPolicy(true);
scenario.client.setScheduler(new ScheduledExecutorScheduler(schedulerService));
scenario.client.start();

int count = 10;
for (int i = 0; i < count; ++i)
{
assertThrows(ExecutionException.class, () -> scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send()
);
}

// There should be a task scheduled by HttpDestination
// to expire HttpExchanges while they are queued.
// There may be also a task for the idle timeout of the connection.
List<Runnable> tasks = schedulerService.shutdownNow();
assertThat(tasks.toString(), tasks.size(), allOf(greaterThan(0), lessThan(count + 1)));
}

@FunctionalInterface
private interface TLSHandshakeAction
{
void accept(SSLEngine sslEngine) throws SSLException;
}
}

0 comments on commit fa166d0

Please sign in to comment.