Skip to content

Commit

Permalink
expose ConnectionManager (#9073)
Browse files Browse the repository at this point in the history
  • Loading branch information
yawkat authored Apr 10, 2023
1 parent beddff7 commit 4b590ff
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.micronaut.scheduling.instrument.InvocationInstrumenter;
import io.micronaut.websocket.exceptions.WebSocketSessionException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
Expand Down Expand Up @@ -123,10 +124,11 @@

/**
* Connection manager for {@link DefaultHttpClient}. This class manages the lifecycle of netty
* channels (wrapped in {@link PoolHandle}s), including pooling and timeouts.
* channels (wrapped in {@link PoolHandle}s), including pooling and timeouts.<br>
* Note: This class is public for use in micronaut-oracle-cloud.
*/
@Internal
class ConnectionManager {
public class ConnectionManager {
final InvocationInstrumenter instrumenter;

private final HttpVersionSelection httpVersion;
Expand Down Expand Up @@ -259,6 +261,15 @@ private static NioEventLoopGroup createEventLoopGroup(HttpClientConfiguration co
return group;
}

/**
* Allocator for this connection manager. Used by micronaut-oracle-cloud.
*
* @return The configured allocator
*/
public final ByteBufAllocator alloc() {
return (ByteBufAllocator) bootstrap.config().options().getOrDefault(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
}

/**
* For testing.
*
Expand All @@ -267,7 +278,7 @@ private static NioEventLoopGroup createEventLoopGroup(HttpClientConfiguration co
*/
@NonNull
@SuppressWarnings("unused")
List<Channel> getChannels() {
final List<Channel> getChannels() {
List<Channel> channels = new ArrayList<>();
for (Pool pool : pools.values()) {
pool.forEachConnection(c -> channels.add(((Pool.ConnectionHolder) c).channel));
Expand All @@ -282,7 +293,7 @@ List<Channel> getChannels() {
* @since 4.0.0
*/
@SuppressWarnings("unused")
int liveRequestCount() {
final int liveRequestCount() {
AtomicInteger count = new AtomicInteger();
for (Pool pool : pools.values()) {
pool.forEachConnection(c -> {
Expand All @@ -301,7 +312,7 @@ int liveRequestCount() {
/**
* @see DefaultHttpClient#start()
*/
public void start() {
public final void start() {
// only need to start new group if it's managed by us
if (shutdownGroup) {
group = createEventLoopGroup(configuration, threadFactory);
Expand All @@ -324,7 +335,7 @@ private void initBootstrap() {
/**
* @see DefaultHttpClient#stop()
*/
public void shutdown() {
public final void shutdown() {
for (Pool pool : pools.values()) {
pool.shutdown();
}
Expand Down Expand Up @@ -353,19 +364,19 @@ public void shutdown() {
*
* @return Whether this connection manager is still running and can serve requests
*/
public boolean isRunning() {
public final boolean isRunning() {
return !group.isShutdown();
}

/**
* Use the bootstrap to connect to the given host. Also does some proxy setup. This method is
* protected: The test suite overrides it to return embedded channels instead.
* not final: The test suite overrides it to return embedded channels instead.
*
* @param requestKey The host to connect to
* @param channelInitializer The initializer to use
* @return Future that terminates when the TCP connection is established.
*/
protected ChannelFuture doConnect(DefaultHttpClient.RequestKey requestKey, ChannelInitializer<?> channelInitializer) {
ChannelFuture doConnect(DefaultHttpClient.RequestKey requestKey, ChannelInitializer<?> channelInitializer) {
String host = requestKey.getHost();
int port = requestKey.getPort();
Bootstrap localBootstrap = bootstrap.clone();
Expand Down Expand Up @@ -404,7 +415,7 @@ private SslContext buildSslContext(DefaultHttpClient.RequestKey requestKey) {
* @param blockHint Optional information about what threads are blocked for this connection request
* @return A mono that will complete once the channel is ready for transmission
*/
Mono<PoolHandle> connect(DefaultHttpClient.RequestKey requestKey, @Nullable BlockHint blockHint) {
public final Mono<PoolHandle> connect(DefaultHttpClient.RequestKey requestKey, @Nullable BlockHint blockHint) {
return pools.computeIfAbsent(requestKey, Pool::new).acquire(blockHint);
}

Expand All @@ -416,7 +427,7 @@ Mono<PoolHandle> connect(DefaultHttpClient.RequestKey requestKey, @Nullable Bloc
* @param handler The websocket message handler
* @return A mono that will complete when the handshakes complete
*/
Mono<?> connectForWebsocket(DefaultHttpClient.RequestKey requestKey, ChannelHandler handler) {
final Mono<?> connectForWebsocket(DefaultHttpClient.RequestKey requestKey, ChannelHandler handler) {
Sinks.Empty<Object> initial = new CancellableMonoSink<>(null);

ChannelFuture connectFuture = doConnect(requestKey, new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -507,7 +518,7 @@ private void configureProxy(ChannelPipeline pipeline, boolean secure, String hos
}
}

<V, C extends Future<V>> void addInstrumentedListener(
final <V, C extends Future<V>> void addInstrumentedListener(
Future<? extends V> channelFuture, GenericFutureListener<C> listener) {
channelFuture.addListener(f -> {
try (Instrumentation ignored = instrumenter.newInstrumentation()) {
Expand Down Expand Up @@ -854,7 +865,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
* once the request and response are done, the handle is {@link #release() released} and a new
* request can claim the same connection.
*/
abstract static class PoolHandle {
public abstract static class PoolHandle {
private static final Supplier<ResourceLeakDetector<PoolHandle>> LEAK_DETECTOR = SupplierUtil.memoized(() ->
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(PoolHandle.class));

Expand All @@ -870,16 +881,24 @@ private PoolHandle(boolean http2, Channel channel) {
this.channel = channel;
}

public final Channel channel() {
return channel;
}

public final boolean http2() {
return http2;
}

/**
* Prevent this connection from being reused, e.g. because garbage was written because of
* an error.
*/
abstract void taint();
public abstract void taint();

/**
* Close this connection or release it back to the pool.
*/
void release() {
public void release() {
if (released) {
throw new IllegalStateException("Already released");
}
Expand All @@ -895,12 +914,12 @@ void release() {
*
* @return Whether this connection may be reused
*/
abstract boolean canReturn();
public abstract boolean canReturn();

/**
* Notify any {@link NettyClientCustomizer} that the request pipeline has been built.
*/
abstract void notifyRequestPipelineBuilt();
public abstract void notifyRequestPipelineBuilt();
}

/**
Expand Down Expand Up @@ -1226,12 +1245,12 @@ void dispatch0(PoolSink<PoolHandle> sink) {
final ChannelHandlerContext lastContext = channel.pipeline().lastContext();

@Override
void taint() {
public void taint() {
windDownConnection = true;
}

@Override
void release() {
public void release() {
super.release();
if (!windDownConnection) {
ChannelHandlerContext newLast = channel.pipeline().lastContext();
Expand All @@ -1249,12 +1268,12 @@ void release() {
}

@Override
boolean canReturn() {
public boolean canReturn() {
return !windDownConnection;
}

@Override
void notifyRequestPipelineBuilt() {
public void notifyRequestPipelineBuilt() {
connectionCustomizer.onRequestPipelineBuilt();
}
};
Expand Down Expand Up @@ -1345,12 +1364,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
NettyClientCustomizer streamCustomizer = connectionCustomizer.specializeForChannel(streamChannel, NettyClientCustomizer.ChannelRole.HTTP2_STREAM);
PoolHandle ph = new PoolHandle(true, streamChannel) {
@Override
void taint() {
public void taint() {
// do nothing, we don't reuse stream channels
}

@Override
void release() {
public void release() {
super.release();
liveStreamChannels.remove(streamChannel);
streamChannel.close();
Expand All @@ -1363,12 +1382,12 @@ void release() {
}

@Override
boolean canReturn() {
public boolean canReturn() {
return true;
}

@Override
void notifyRequestPipelineBuilt() {
public void notifyRequestPipelineBuilt() {
streamCustomizer.onRequestPipelineBuilt();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,15 @@ public Logger getLog() {
return log;
}

/**
* Access to the connection manager, for micronaut-oracle-cloud.
*
* @return The connection manager of this client
*/
public ConnectionManager connectionManager() {
return connectionManager;
}

@Override
public HttpClient start() {
if (!isRunning()) {
Expand Down Expand Up @@ -1855,7 +1864,7 @@ private <E extends HttpClientException> E decorate(E exc) {
/**
* Key used for connection pooling and determining host/port.
*/
static final class RequestKey {
public static final class RequestKey {
private final String host;
private final int port;
private final boolean secure;
Expand Down

0 comments on commit 4b590ff

Please sign in to comment.