diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index af5558139a4..9339817d6c4 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -568,6 +568,11 @@ pekko { # set to 0b for platform default receive-buffer-size = 256000b + # Flush operations are generally speaking expensive, consolidating flushes will improve throughput + # but with the downside of a bit higher latency. + # The number of flushes after which an explicit flush will be done, set to 0 to turn this feature off. + explicit-flush-after-flushes = 0 + # Maximum message size the transport will accept, but at least # 32000 bytes. # Please note that UDP does not support arbitrary large datagrams, diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index d7ff5dca8fe..3d9411d2941 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -52,6 +52,7 @@ import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } +import io.netty.handler.flush.FlushConsolidationHandler import io.netty.handler.ssl.SslHandler import io.netty.util.concurrent.GlobalEventExecutor @@ -132,6 +133,9 @@ class NettyTransportSettings(config: Config) { val SendBufferSize: Option[Int] = optionSize("send-buffer-size") + val ExplicitFlushAfterFlushes: Int = getInt("explicit-flush-after-flushes") + .requiring(_ >= 0, "Setting 'explicit-flush-after-flushes' must >= 0") + val ReceiveBufferSize: Option[Int] = optionSize("receive-buffer-size") val MaxFrameSize: Int = getBytes("maximum-frame-size").toInt @@ -357,6 +361,9 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private def newPipeline(channel: Channel): ChannelPipeline = { val pipeline = channel.pipeline() + if (ExplicitFlushAfterFlushes > 0) { + pipeline.addLast("FlushConsolidationHandler", new FlushConsolidationHandler(ExplicitFlushAfterFlushes, true)) + } pipeline.addLast( "FrameDecoder", new LengthFieldBasedFrameDecoder(