Skip to content

Commit

Permalink
=remote Make use of FlushConsolidationHandler to improve throughput.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 19, 2023
1 parent 9288d4c commit 1121bca
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ package org.apache.pekko.remote.transport.netty
import java.net.{ InetAddress, InetSocketAddress, SocketAddress }
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.nowarn
import scala.concurrent.{ blocking, ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }

import com.typesafe.config.Config
import org.apache.pekko
import pekko.ConfigurationException
Expand All @@ -36,7 +34,6 @@ import pekko.remote.transport.AssociationHandle.HandleEventListener
import pekko.remote.transport.Transport._
import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }

import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.buffer.Unpooled
import io.netty.channel.{
Expand All @@ -52,6 +49,7 @@ import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import io.netty.handler.flush.FlushConsolidationHandler
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.GlobalEventExecutor

Expand Down Expand Up @@ -368,6 +366,8 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA

private def newPipeline(channel: Channel): ChannelPipeline = {
val pipeline = channel.pipeline()
pipeline.addLast("FlushConsolidationHandler",
new FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true))
pipeline.addLast(
"FrameDecoder",
new LengthFieldBasedFrameDecoder(
Expand Down

0 comments on commit 1121bca

Please sign in to comment.