Skip to content

Commit

Permalink
Use ChannelFutureListener in Netty code to reduce capturing lambdas (e…
Browse files Browse the repository at this point in the history
…lastic#112967)

Mainly motivated by simplifying the reference chains for Netty buffers
and have easier to analyze heap dumps in some spots but also a small
performance win in and of itself.
  • Loading branch information
original-brownbear authored Sep 18, 2024
1 parent f437f13 commit 90e343c
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 22 deletions.
4 changes: 4 additions & 0 deletions modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,7 @@ tasks.named("thirdPartyAudit").configure {
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5'
)
}

tasks.named('forbiddenApisMain').configure {
signaturesFiles += files('forbidden/netty-signatures.txt')
}
9 changes: 9 additions & 0 deletions modules/transport-netty4/forbidden/netty-signatures.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".

@defaultMessage Use org.elasticsearch.transport.netty4.Netty4Utils.addListener(io.netty.channel.ChannelFuture, io.netty.channel.ChannelFutureListener) instead
io.netty.channel.ChannelFuture#addListener(io.netty.util.concurrent.GenericFutureListener)
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ public void onFailure(Exception e) {
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
Netty4Utils.addListener(channel.close(), f -> {
finishingWrite.combiner().add(f.channel().newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
Expand Down Expand Up @@ -417,7 +417,7 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite)
final boolean isPartComplete = bodyPart.isPartComplete();
final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart();
final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
Netty4Utils.addListener(f, ignored -> bytes.close());
combiner.add(f);
return isPartComplete;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
}

private static void addClosedExceptionLogger(Channel channel) {
channel.closeFuture().addListener(f -> {
if (f.isSuccess() == false) {
logger.debug(() -> format("exception while closing channel: %s", channel), f.cause());
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
Expand All @@ -28,6 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
Expand Down Expand Up @@ -141,7 +145,7 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList
// can only be completed by some network event from this point on. However...
final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
addListener(promise, listener);
assert assertCorrectPromiseListenerThreading(channel, promise);
assert assertCorrectPromiseListenerThreading(promise);
channel.writeAndFlush(message, promise);
if (channel.eventLoop().isShuttingDown()) {
// ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that
Expand All @@ -156,10 +160,10 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList
}
}

private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future<?> promise) {
final var eventLoop = channel.eventLoop();
promise.addListener(future -> {
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated()
private static boolean assertCorrectPromiseListenerThreading(ChannelPromise promise) {
addListener(promise, future -> {
var eventLoop = future.channel().eventLoop();
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || eventLoop.isTerminated()
: future.cause();
});
return true;
Expand All @@ -183,4 +187,9 @@ public static void addListener(Future<Void> future, ActionListener<Void> listene
}
});
}

@SuppressForbidden(reason = "single point for adding listeners that enforces use of ChannelFutureListener")
public static void addListener(ChannelFuture channelFuture, ChannelFutureListener listener) {
channelFuture.addListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.PromiseCombiner;

import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -94,13 +94,13 @@ private void writeInSlices(ChannelHandlerContext ctx, ChannelPromise promise, By
final int bufferSize = Math.min(readableBytes, MAX_BYTES_PER_WRITE);
if (readableBytes == bufferSize) {
// last write for this chunk we're done
ctx.write(buf).addListener(forwardResultListener(ctx, promise));
Netty4Utils.addListener(ctx.write(buf), forwardResultListener(promise));
return;
}
final int readerIndex = buf.readerIndex();
final ByteBuf writeBuffer = buf.retainedSlice(readerIndex, bufferSize);
buf.readerIndex(readerIndex + bufferSize);
ctx.write(writeBuffer).addListener(forwardFailureListener(ctx, promise));
Netty4Utils.addListener(ctx.write(writeBuffer), forwardFailureListener(promise));
if (ctx.channel().isWritable() == false) {
// channel isn't writable any longer -> move to queuing
queueWrite(buf, promise);
Expand Down Expand Up @@ -164,9 +164,9 @@ private boolean doFlush(ChannelHandlerContext ctx) {
final ChannelFuture writeFuture = ctx.write(writeBuffer);
if (sliced == false) {
currentWrite = null;
writeFuture.addListener(forwardResultListener(ctx, write.promise));
Netty4Utils.addListener(writeFuture, forwardResultListener(write.promise));
} else {
writeFuture.addListener(forwardFailureListener(ctx, write.promise));
Netty4Utils.addListener(writeFuture, forwardFailureListener(write.promise));
}
}
ctx.flush();
Expand All @@ -176,18 +176,18 @@ private boolean doFlush(ChannelHandlerContext ctx) {
return true;
}

private static GenericFutureListener<Future<Void>> forwardFailureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
private static ChannelFutureListener forwardFailureListener(ChannelPromise promise) {
return future -> {
assert ctx.executor().inEventLoop();
assert future.channel().eventLoop().inEventLoop();
if (future.isSuccess() == false) {
promise.tryFailure(future.cause());
}
};
}

private static GenericFutureListener<Future<Void>> forwardResultListener(ChannelHandlerContext ctx, ChannelPromise promise) {
private static ChannelFutureListener forwardResultListener(ChannelPromise promise) {
return future -> {
assert ctx.executor().inEventLoop();
assert future.channel().eventLoop().inEventLoop();
if (future.isSuccess()) {
promise.trySuccess();
} else {
Expand Down
7 changes: 6 additions & 1 deletion x-pack/plugin/security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ tasks.named("forbiddenPatterns").configure {
}

tasks.named('forbiddenApisMain').configure {
signaturesFiles += files('forbidden/ldap-signatures.txt', 'forbidden/xml-signatures.txt', 'forbidden/oidc-signatures.txt')
signaturesFiles += files(
'forbidden/ldap-signatures.txt',
'forbidden/xml-signatures.txt',
'forbidden/oidc-signatures.txt',
project(':modules:transport-netty4').file('forbidden/netty-signatures.txt')
)
}

tasks.named('forbiddenApisTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.transport.ProfileConfigurations;
Expand Down Expand Up @@ -341,7 +342,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
final SslHandler sslHandler = new SslHandler(sslEngine);
ctx.pipeline().replace(this, "ssl", sslHandler);
final Future<?> handshakePromise = sslHandler.handshakeFuture();
connectPromise.addListener(result -> {
Netty4Utils.addListener(connectPromise, result -> {
if (result.isSuccess() == false) {
promise.tryFailure(result.cause());
} else {
Expand Down

0 comments on commit 90e343c

Please sign in to comment.