Skip to content

Commit

Permalink
+remote Make use of FlushConsolidationHandler.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 18, 2023
1 parent 62bf7cb commit 435c47e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
5 changes: 5 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,11 @@ pekko {
# set to 0b for platform default
receive-buffer-size = 256000b

# Flush operations are generally speaking expensive, consolidating flushes will improve throughput
# but with the downside of a bit higher latency.
# The number of flushes after which an explicit flush will be done, set to 0 to turn this feature off.
explicit-flush-after-flushes = 0

# Maximum message size the transport will accept, but at least
# 32000 bytes.
# Please note that UDP does not support arbitrary large datagrams,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import io.netty.handler.flush.FlushConsolidationHandler
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.GlobalEventExecutor

Expand Down Expand Up @@ -132,6 +133,9 @@ class NettyTransportSettings(config: Config) {

val SendBufferSize: Option[Int] = optionSize("send-buffer-size")

val ExplicitFlushAfterFlushes: Int = getInt("explicit-flush-after-flushes")
.requiring(_ >= 0, "Setting 'explicit-flush-after-flushes' must >= 0")

val ReceiveBufferSize: Option[Int] = optionSize("receive-buffer-size")

val MaxFrameSize: Int = getBytes("maximum-frame-size").toInt
Expand Down Expand Up @@ -357,6 +361,9 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA

private def newPipeline(channel: Channel): ChannelPipeline = {
val pipeline = channel.pipeline()
if (ExplicitFlushAfterFlushes > 0) {
pipeline.addLast("FlushConsolidationHandler", new FlushConsolidationHandler(ExplicitFlushAfterFlushes, true))
}
pipeline.addLast(
"FrameDecoder",
new LengthFieldBasedFrameDecoder(
Expand Down

0 comments on commit 435c47e

Please sign in to comment.