From 2517d0ac102bea02e408833eddff057193d82ef0 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 9 Sep 2023 14:35:40 +0800 Subject: [PATCH] !remoting Update classic transport from Netty 3 to netty4 --- .scalafix.conf | 1 + project/Dependencies.scala | 4 +- project/Paradox.scala | 2 +- .../classic/RemotingFailedToBindSpec.scala | 8 +- .../netty4.backwards.excludes | 2 + .../pekko/remote/RemoteActorRefProvider.scala | 2 +- .../remote/transport/netty/NettyHelpers.scala | 79 +++--- .../transport/netty/NettySSLSupport.scala | 15 +- .../transport/netty/NettyTransport.scala | 255 +++++++++--------- .../remote/transport/netty/TcpSupport.scala | 56 ++-- 10 files changed, 212 insertions(+), 212 deletions(-) create mode 100644 remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes diff --git a/.scalafix.conf b/.scalafix.conf index 815bdb7e362..3009ea9bb12 100644 --- a/.scalafix.conf +++ b/.scalafix.conf @@ -48,4 +48,5 @@ SortImports.blocks = [ "com.sun." "org.apache.pekko." "org.reactivestreams." + "io.netty." ] diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 50462e27eb2..45ac25f0d3a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,6 @@ object Dependencies { // needs to be inline with the aeron version, check // https://github.com/real-logic/aeron/blob/1.x.y/build.gradle val agronaVersion = "1.19.2" - val nettyVersion = "3.10.6.Final" val netty4Version = "4.1.96.Final" val protobufJavaVersion = "3.19.6" val logbackVersion = "1.2.11" @@ -60,7 +59,6 @@ object Dependencies { // Compile val config = "com.typesafe" % "config" % "1.4.2" - val netty = "io.netty" % "netty" % nettyVersion val `netty-transport` = "io.netty" % "netty-transport" % netty4Version val `netty-handler` = "io.netty" % "netty-handler" % netty4Version @@ -278,7 +276,7 @@ object Dependencies { Compile.slf4jApi, TestDependencies.scalatest.value) - val remoteDependencies = Seq(netty, aeronDriver, aeronClient) + val remoteDependencies = Seq(`netty-transport`, `netty-handler`, aeronDriver, aeronClient) val remoteOptionalDependencies = remoteDependencies.map(_ % "optional") val remote = l ++= Seq( diff --git a/project/Paradox.scala b/project/Paradox.scala index 2de4a049fd5..4dc3a2372da 100644 --- a/project/Paradox.scala +++ b/project/Paradox.scala @@ -66,7 +66,7 @@ object Paradox { "scalatest.version" -> Dependencies.scalaTestVersion, "sigar_loader.version" -> "1.6.6-rev002", "aeron_version" -> Dependencies.aeronVersion, - "netty_version" -> Dependencies.nettyVersion, + "netty_version" -> Dependencies.netty4Version, "logback_version" -> Dependencies.logbackVersion)) val rootsSettings = Seq( diff --git a/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala b/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala index eb2151c2893..b610c708e9a 100644 --- a/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala +++ b/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala @@ -14,14 +14,14 @@ package org.apache.pekko.remote.classic import com.typesafe.config.ConfigFactory -import org.jboss.netty.channel.ChannelException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - import org.apache.pekko import pekko.actor.ActorSystem import pekko.testkit.SocketUtil +import java.net.BindException + class RemotingFailedToBindSpec extends AnyWordSpec with Matchers { "an ActorSystem" must { @@ -43,10 +43,10 @@ class RemotingFailedToBindSpec extends AnyWordSpec with Matchers { """.stripMargin) val as = ActorSystem("RemotingFailedToBindSpec", config) try { - val ex = intercept[ChannelException] { + val ex = intercept[BindException] { ActorSystem("BindTest2", config) } - ex.getMessage should startWith("Failed to bind") + ex.getMessage should startWith("Address already in use") } finally { as.terminate() } diff --git a/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes b/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes new file mode 100644 index 00000000000..59de2d77521 --- /dev/null +++ b/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes @@ -0,0 +1,2 @@ +#migrate the classic transport to Netty4 +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.transport.netty.NettyFutureBridge.apply") diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala index edd9cc8561d..238429b7000 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala @@ -281,7 +281,7 @@ private[pekko] class RemoteActorRefProvider( private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = { checkClassOrThrow( system, - "org.jboss.netty.channel.Channel", + "io.netty.channel.Channel", "Classic", "Netty", "https://pekko.apache.org/docs/pekko/current/remoting.html") diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala index 58edc016feb..8e3a7c55a02 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala @@ -13,36 +13,37 @@ package org.apache.pekko.remote.transport.netty -import java.nio.channels.ClosedChannelException +import io.netty.buffer.ByteBuf +import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler } +import java.nio.channels.ClosedChannelException import scala.util.control.NonFatal - -import org.jboss.netty.channel._ - import org.apache.pekko import pekko.PekkoException import pekko.util.unused +import scala.annotation.nowarn + /** * INTERNAL API */ private[netty] trait NettyHelpers { - protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = () + protected def onConnect(@unused ctx: ChannelHandlerContext): Unit = () - protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = () + protected def onDisconnect(@unused ctx: ChannelHandlerContext): Unit = () - protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = () + protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = () - protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = () + protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = () - protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = () + protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = () - final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = { - val cause = if (ev.getCause ne null) ev.getCause else new PekkoException("Unknown cause") + final protected def transformException(ctx: ChannelHandlerContext, ex: Throwable): Unit = { + val cause = if (ex ne null) ex else new PekkoException("Unknown cause") cause match { case _: ClosedChannelException => // Ignore - case null | NonFatal(_) => onException(ctx, ev) + case null | NonFatal(_) => onException(ctx, ex) case e: Throwable => throw e // Rethrow fatals } } @@ -51,54 +52,46 @@ private[netty] trait NettyHelpers { /** * INTERNAL API */ -private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers { +private[netty] trait NettyServerHelpers extends SimpleChannelInboundHandler[ByteBuf] with NettyHelpers { - final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { - super.messageReceived(ctx, e) - onMessage(ctx, e) + final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { + onMessage(ctx, msg) } - final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e) + @nowarn("msg=deprecated") + final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = + transformException(ctx, cause) - final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelConnected(ctx, e) - onConnect(ctx, e) + final override def channelActive(ctx: ChannelHandlerContext): Unit = { + onOpen(ctx) + onConnect(ctx) } - final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelOpen(ctx, e) - onOpen(ctx, e) - } - - final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelDisconnected(ctx, e) - onDisconnect(ctx, e) + final override def channelInactive(ctx: ChannelHandlerContext): Unit = { + onDisconnect(ctx) } } /** * INTERNAL API */ -private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers { - final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { - super.messageReceived(ctx, e) - onMessage(ctx, e) - } +private[netty] trait NettyClientHelpers extends SimpleChannelInboundHandler[ByteBuf] with NettyHelpers { - final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e) + final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { + onMessage(ctx, msg) + } - final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelConnected(ctx, e) - onConnect(ctx, e) + @nowarn("msg=deprecated") + final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + transformException(ctx, cause) } - final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelOpen(ctx, e) - onOpen(ctx, e) + final override def channelActive(ctx: ChannelHandlerContext): Unit = { + onOpen(ctx) + onConnect(ctx) } - final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelDisconnected(ctx, e) - onDisconnect(ctx, e) + final override def channelInactive(ctx: ChannelHandlerContext): Unit = { + onDisconnect(ctx) } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala index c6a7fe7c824..ec9b7ca40ce 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala @@ -15,8 +15,9 @@ package org.apache.pekko.remote.transport.netty import scala.annotation.nowarn import com.typesafe.config.Config -import org.jboss.netty.handler.ssl.SslHandler - +import io.netty.channel.Channel +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.{ Future, GenericFutureListener } import org.apache.pekko import pekko.japi.Util._ import pekko.util.ccompat._ @@ -64,6 +65,14 @@ private[pekko] object NettySSLSupport { val sslEngine = if (isClient) sslEngineProvider.createClientSSLEngine() else sslEngineProvider.createServerSSLEngine() - new SslHandler(sslEngine) + val handler = new SslHandler(sslEngine) + handler.handshakeFuture().addListener(new GenericFutureListener[Future[Channel]] { + override def operationComplete(future: Future[Channel]): Unit = { + if (!future.isSuccess) { + handler.closeOutbound().channel().close() + } + } + }) + handler } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 6e0300ca134..e6848c798dd 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -14,33 +14,14 @@ package org.apache.pekko.remote.transport.netty import java.net.{ InetAddress, InetSocketAddress, SocketAddress } -import java.util.concurrent.{ CancellationException, Executors } +import java.util.concurrent.CancellationException import java.util.concurrent.atomic.AtomicInteger - import scala.annotation.nowarn import scala.concurrent.{ blocking, ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.util.Try import scala.util.control.{ NoStackTrace, NonFatal } - import com.typesafe.config.Config -import org.jboss.netty.bootstrap.{ Bootstrap, ClientBootstrap, ServerBootstrap } -import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } -import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.{ - ChannelGroup, - ChannelGroupFuture, - ChannelGroupFutureListener, - DefaultChannelGroup -} -import org.jboss.netty.channel.socket.nio.{ - NioClientSocketChannelFactory, - NioServerSocketChannelFactory, - NioWorkerPool -} -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.ssl.SslHandler -import org.jboss.netty.util.HashedWheelTimer import org.apache.pekko import pekko.ConfigurationException import pekko.OnlyCauseStackTrace @@ -53,6 +34,30 @@ import pekko.util.{ Helpers, OptionVal } import AssociationHandle.HandleEventListener import Transport._ import Helpers.Requiring +import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap } +import io.netty.channel.{ + Channel, + ChannelFuture, + ChannelFutureListener, + ChannelHandlerContext, + ChannelInitializer, + ChannelOption, + ChannelPipeline +} +import io.netty.channel.group.{ + ChannelGroup, + ChannelGroupFuture, + ChannelGroupFutureListener, + ChannelMatchers, + DefaultChannelGroup +} +import io.netty.buffer.Unpooled +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } +import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.GlobalEventExecutor @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") object NettyFutureBridge { @@ -62,9 +67,9 @@ object NettyFutureBridge { def operationComplete(future: ChannelFuture): Unit = p.complete( Try( - if (future.isSuccess) future.getChannel + if (future.isSuccess) future.channel() else if (future.isCancelled) throw new CancellationException - else throw future.getCause)) + else throw future.cause())) }) p.future } @@ -76,12 +81,12 @@ object NettyFutureBridge { def operationComplete(future: ChannelGroupFuture): Unit = p.complete( Try( - if (future.isCompleteSuccess) future.getGroup + if (future.isSuccess) future.group() else throw future.iterator.asScala .collectFirst { case f if f.isCancelled => new CancellationException - case f if !f.isSuccess => f.getCause + case f if !f.isSuccess => f.cause() } .getOrElse(new IllegalStateException( "Error reported in ChannelGroupFuture, but no error found in individual futures.")))) @@ -182,27 +187,6 @@ class NettyTransportSettings(config: Config) { config.getInt("pool-size-min"), config.getDouble("pool-size-factor"), config.getInt("pool-size-max")) - - // Check Netty version >= 3.10.6 - { - val nettyVersion = org.jboss.netty.util.Version.ID - def throwInvalidNettyVersion(): Nothing = { - throw new IllegalArgumentException( - "pekko-remote with the Netty transport requires Netty version 3.10.6 or " + - s"later. Version [$nettyVersion] is on the class path. Issue https://github.com/netty/netty/pull/4739 " + - "may cause messages to not be delivered.") - } - - try { - val segments: Array[String] = nettyVersion.split("[.-]") - if (segments.length < 3 || segments(0).toInt != 3 || segments(1).toInt != 10 || segments(2).toInt < 6) - throwInvalidNettyVersion() - } catch { - case _: NumberFormatException => - throwInvalidNettyVersion() - } - } - } /** @@ -212,25 +196,24 @@ class NettyTransportSettings(config: Config) { private[netty] trait CommonHandlers extends NettyHelpers { protected val transport: NettyTransport - final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - transport.channelGroup.add(e.getChannel) + override protected def onOpen(ctx: ChannelHandlerContext): Unit = { + transport.channelGroup.add(ctx.channel()) + } protected def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle protected def registerListener( channel: Channel, listener: HandleEventListener, - msg: ChannelBuffer, remoteSocketAddress: InetSocketAddress): Unit final protected def init( channel: Channel, remoteSocketAddress: SocketAddress, - remoteAddress: Address, - msg: ChannelBuffer)(op: AssociationHandle => Any): Unit = { + remoteAddress: Address)(op: AssociationHandle => Any): Unit = { import transport._ NettyTransport.addressFromSocketAddress( - channel.getLocalAddress, + channel.localAddress(), schemeIdentifier, system.name, Some(settings.Hostname), @@ -238,8 +221,8 @@ private[netty] trait CommonHandlers extends NettyHelpers { case Some(localAddress) => val handle = createHandle(channel, localAddress, remoteAddress) handle.readHandlerPromise.future.foreach { listener => - registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress]) - channel.setReadable(true) + registerListener(channel, listener, remoteSocketAddress.asInstanceOf[InetSocketAddress]) + channel.config().setAutoRead(true) } op(handle) @@ -260,8 +243,8 @@ private[netty] abstract class ServerHandler( import transport.executionContext - final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { - channel.setReadable(false) + final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress): Unit = { + channel.config.setAutoRead(false) associationListenerFuture.foreach { listener => val remoteAddress = NettyTransport .addressFromSocketAddress( @@ -272,7 +255,7 @@ private[netty] abstract class ServerHandler( port = None) .getOrElse(throw new NettyTransportException( s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]")) - init(channel, remoteSocketAddress, remoteAddress, msg) { a => + init(channel, remoteSocketAddress, remoteAddress) { a => listener.notify(InboundAssociation(a)) } } @@ -290,8 +273,8 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty final protected val statusPromise = Promise[AssociationHandle]() def statusFuture = statusPromise.future - final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { - init(channel, remoteSocketAddress, remoteAddress, msg)(statusPromise.success) + final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress): Unit = { + init(channel, remoteSocketAddress, remoteAddress)(statusPromise.success) } } @@ -304,9 +287,9 @@ private[transport] object NettyTransport { val FrameLengthFieldLength = 4 def gracefulClose(channel: Channel)(implicit ec: ExecutionContext): Unit = { @nowarn("msg=deprecated") - def always(c: ChannelFuture) = NettyFutureBridge(c).recover { case _ => c.getChannel } + def always(c: ChannelFuture) = NettyFutureBridge(c).recover { case _ => c.channel() } for { - _ <- always { channel.write(ChannelBuffers.buffer(0)) } // Force flush by waiting on a final dummy write + _ <- always { channel.writeAndFlush(Unpooled.EMPTY_BUFFER) } // Force flush by waiting on a final dummy write _ <- always { channel.disconnect() } } channel.close() } @@ -358,8 +341,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private val log = Logging.withMarker(system, classOf[NettyTransport]) - private def createExecutorService() = - UseDispatcherForIo.map(system.dispatchers.lookup).getOrElse(Executors.newCachedThreadPool(system.threadFactory)) + private def createEventLoopGroup(nThreadCount: Int): NioEventLoopGroup = + UseDispatcherForIo.map(system.dispatchers.lookup) + .map(executor => new NioEventLoopGroup(0, executor)) + .getOrElse(new NioEventLoopGroup(nThreadCount, system.threadFactory)) /* * Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap. @@ -369,25 +354,17 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA */ val channelGroup = new DefaultChannelGroup( "pekko-netty-transport-driver-channelgroup-" + - uniqueIdCounter.getAndIncrement) - - private val clientChannelFactory: ChannelFactory = { - val boss, worker = createExecutorService() - new NioClientSocketChannelFactory( - boss, - 1, - new NioWorkerPool(worker, ClientSocketWorkerPoolSize), - new HashedWheelTimer(system.threadFactory)) - } + uniqueIdCounter.getAndIncrement, + GlobalEventExecutor.INSTANCE) - private val serverChannelFactory: ChannelFactory = { - val boss, worker = createExecutorService() - // This does not create a HashedWheelTimer internally - new NioServerSocketChannelFactory(boss, worker, ServerSocketWorkerPoolSize) - } + private val clientEventLoopGroup = createEventLoopGroup(ClientSocketWorkerPoolSize + 1) - private def newPipeline: DefaultChannelPipeline = { - val pipeline = new DefaultChannelPipeline + private val serverEventLoopParentGroup = createEventLoopGroup(0) + + private val serverEventLoopChildGroup = createEventLoopGroup(ServerSocketWorkerPoolSize) + + private def newPipeline(channel: Channel): ChannelPipeline = { + val pipeline = channel.pipeline() pipeline.addLast( "FrameDecoder", new LengthFieldBasedFrameDecoder( @@ -420,62 +397,78 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private def sslHandler(isClient: Boolean): SslHandler = { sslEngineProvider match { case OptionVal.Some(sslProvider) => - val handler = NettySSLSupport(sslProvider, isClient) - handler.setCloseOnSSLException(true) - handler + NettySSLSupport(sslProvider, isClient) case _ => throw new IllegalStateException("Expected enable-ssl=on") } } - private val serverPipelineFactory: ChannelPipelineFactory = new ChannelPipelineFactory { - override def getPipeline: ChannelPipeline = { - val pipeline = newPipeline + private val serverPipelineInitializer: ChannelInitializer[SocketChannel] = new ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel): Unit = { + val pipeline = newPipeline(ch) if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false)) val handler = new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log) pipeline.addLast("ServerHandler", handler) - pipeline } } - private def clientPipelineFactory(remoteAddress: Address): ChannelPipelineFactory = - new ChannelPipelineFactory { - override def getPipeline: ChannelPipeline = { - val pipeline = newPipeline + private def clientPipelineInitializer(remoteAddress: Address): ChannelInitializer[SocketChannel] = + new ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel): Unit = { + val pipeline = newPipeline(ch) if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true)) val handler = new TcpClientHandler(NettyTransport.this, remoteAddress, log) pipeline.addLast("clienthandler", handler) - pipeline } } - private def setupBootstrap[B <: Bootstrap](bootstrap: B, pipelineFactory: ChannelPipelineFactory): B = { - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", settings.Backlog) - bootstrap.setOption("child.tcpNoDelay", settings.TcpNodelay) - bootstrap.setOption("child.keepAlive", settings.TcpKeepalive) - bootstrap.setOption("reuseAddress", settings.TcpReuseAddr) - settings.ReceiveBufferSize.foreach(sz => bootstrap.setOption("receiveBufferSize", sz)) - settings.SendBufferSize.foreach(sz => bootstrap.setOption("sendBufferSize", sz)) - settings.WriteBufferHighWaterMark.foreach(sz => bootstrap.setOption("writeBufferHighWaterMark", sz)) - settings.WriteBufferLowWaterMark.foreach(sz => bootstrap.setOption("writeBufferLowWaterMark", sz)) - bootstrap - } - private val inboundBootstrap: ServerBootstrap = { - setupBootstrap(new ServerBootstrap(serverChannelFactory), serverPipelineFactory) + val bootstrap = new ServerBootstrap() + bootstrap.group(serverEventLoopParentGroup, serverEventLoopChildGroup) + + bootstrap.channel(classOf[NioServerSocketChannel]) + bootstrap.childHandler(serverPipelineInitializer) + // DO NOT AUTO READ + bootstrap.option[java.lang.Boolean](ChannelOption.AUTO_READ, false) + + bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, settings.Backlog) + bootstrap.option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, settings.TcpReuseAddr) + + // DO NOT AUTO READ + bootstrap.childOption[java.lang.Boolean](ChannelOption.AUTO_READ, false) + bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay) + bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive) + + settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz)) + settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz)) + settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz => + bootstrap.childOption[java.lang.Integer](ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, sz)) + settings.WriteBufferLowWaterMark.filter(_ > 0).foreach(sz => + bootstrap.childOption[java.lang.Integer](ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, sz)) + bootstrap } private def outboundBootstrap(remoteAddress: Address): ClientBootstrap = { - val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory(remoteAddress)) - bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) - bootstrap.setOption("tcpNoDelay", settings.TcpNodelay) - bootstrap.setOption("keepAlive", settings.TcpKeepalive) - settings.ReceiveBufferSize.foreach(sz => bootstrap.setOption("receiveBufferSize", sz)) - settings.SendBufferSize.foreach(sz => bootstrap.setOption("sendBufferSize", sz)) - settings.WriteBufferHighWaterMark.foreach(sz => bootstrap.setOption("writeBufferHighWaterMark", sz)) - settings.WriteBufferLowWaterMark.foreach(sz => bootstrap.setOption("writeBufferLowWaterMark", sz)) + val bootstrap = new ClientBootstrap() + bootstrap.group(clientEventLoopGroup) + bootstrap.handler(clientPipelineInitializer(remoteAddress)) + bootstrap.channel(classOf[NioSocketChannel]) + // DO NOT AUTO READ + bootstrap.option[java.lang.Boolean](ChannelOption.AUTO_READ, false) + + bootstrap.option[java.lang.Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.ConnectionTimeout.toMillis.toInt) + bootstrap.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay) + bootstrap.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive) + + settings.ReceiveBufferSize.foreach(sz => bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, sz)) + settings.SendBufferSize.foreach(sz => bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sz)) + + settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz => + bootstrap.option[java.lang.Integer](ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, sz)) + settings.WriteBufferLowWaterMark.filter(_ > 0).foreach(sz => + bootstrap.option[java.lang.Integer](ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, sz)) + // bootstrap } @@ -496,10 +489,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort)) } yield { try { - val newServerChannel = inboundBootstrap.bind(address) + val newServerChannel = inboundBootstrap.bind(address).sync().channel() // Block reads until a handler actor is registered - newServerChannel.setReadable(false) + newServerChannel.config().setAutoRead(false) channelGroup.add(newServerChannel) serverChannel = newServerChannel @@ -508,26 +501,26 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector) addressFromSocketAddress( - newServerChannel.getLocalAddress, + newServerChannel.localAddress(), schemeIdentifier, system.name, Some(settings.Hostname), port) match { case Some(address) => - addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, None, + addressFromSocketAddress(newServerChannel.localAddress(), schemeIdentifier, system.name, None, None) match { case Some(address) => boundTo = address case None => throw new NettyTransportException( - s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") + s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") } associationListenerPromise.future.foreach { _ => - newServerChannel.setReadable(true) + newServerChannel.config().setAutoRead(true) } (address, associationListenerPromise) case None => throw new NettyTransportException( - s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") + s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") } } catch { case NonFatal(e) => { @@ -545,7 +538,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private[pekko] def boundAddress = boundTo override def associate(remoteAddress: Address): Future[AssociationHandle] = { - if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound")) + if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound")) else { val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress) @@ -554,12 +547,12 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel => if (EnableSsl) blocking { - channel.getPipeline.get(classOf[SslHandler]).handshake().awaitUninterruptibly() + channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly() } - channel.setReadable(false) + channel.config.setAutoRead(false) channel } - handle <- readyChannel.getPipeline.get(classOf[ClientHandler]).statusFuture + handle <- readyChannel.pipeline().get(classOf[ClientHandler]).statusFuture } yield handle).recover { case _: CancellationException => throw new NettyTransportExceptionNoStack("Connection was cancelled") case NonFatal(t) => @@ -579,19 +572,15 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ => true).recover { case _ => false } for { // Force flush by trying to write an empty buffer and wait for success - unbindStatus <- always(channelGroup.unbind()) - lastWriteStatus <- always(channelGroup.write(ChannelBuffers.buffer(0))) + unbindStatus <- always(channelGroup.close(ChannelMatchers.isServerChannel)) + lastWriteStatus <- always(channelGroup.writeAndFlush(Unpooled.EMPTY_BUFFER)) disconnectStatus <- always(channelGroup.disconnect()) closeStatus <- always(channelGroup.close()) } yield { // Release the selectors, but don't try to kill the dispatcher - if (UseDispatcherForIo.isDefined) { - clientChannelFactory.shutdown() - serverChannelFactory.shutdown() - } else { - clientChannelFactory.releaseExternalResources() - serverChannelFactory.releaseExternalResources() - } + clientEventLoopGroup.shutdownGracefully() + serverEventLoopParentGroup.shutdownGracefully() + serverEventLoopChildGroup.shutdownGracefully() lastWriteStatus && unbindStatus && disconnectStatus && closeStatus } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala index b7504ee6c60..facdeee1cc9 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala @@ -13,14 +13,11 @@ package org.apache.pekko.remote.transport.netty -import java.net.InetSocketAddress +import io.netty.channel.{ Channel, ChannelHandlerContext } +import java.net.InetSocketAddress import scala.concurrent.{ Future, Promise } - import scala.annotation.nowarn -import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } -import org.jboss.netty.channel._ - import org.apache.pekko import pekko.actor.Address import pekko.event.LoggingAdapter @@ -28,13 +25,25 @@ import pekko.remote.transport.AssociationHandle import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, HandleEventListener, InboundPayload } import pekko.remote.transport.Transport.AssociationEventListener import pekko.util.ByteString +import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled } + +import java.util.concurrent.ConcurrentHashMap /** * INTERNAL API */ -private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] { - override def initialValue(channel: Channel): Option[HandleEventListener] = None - def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel).foreach { _.notify(msg) } +private[remote] object ChannelLocalActor { + private val map = new ConcurrentHashMap[Channel, Option[HandleEventListener]]() + + def notifyListener(channel: Channel, msg: HandleEvent): Unit = { + map.getOrDefault(channel, None).foreach { + _.notify(msg) + } + } + + def set(channel: Channel, listener: Option[HandleEventListener]): Unit = { + map.put(channel, listener) + } } /** @@ -49,27 +58,26 @@ private[remote] trait TcpHandlers extends CommonHandlers { override def registerListener( channel: Channel, listener: HandleEventListener, - msg: ChannelBuffer, remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener)) override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new TcpAssociationHandle(localAddress, remoteAddress, transport, channel) - override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) - log.debug("Remote connection to [{}] was disconnected because of {}", e.getChannel.getRemoteAddress, e) + override def onDisconnect(ctx: ChannelHandlerContext): Unit = { + notifyListener(ctx.channel(), Disassociated(AssociationHandle.Unknown)) + log.debug("Remote connection to [{}] was disconnected because of {}", ctx.channel().remoteAddress()) } - override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { - val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() - if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes))) + override def onMessage(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { + val bytes: Array[Byte] = ByteBufUtil.getBytes(msg) + if (bytes.length > 0) notifyListener(ctx.channel(), InboundPayload(ByteString(bytes))) } - override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { - notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) - log.warning("Remote connection to [{}] failed with {}", e.getChannel.getRemoteAddress, e.getCause) - e.getChannel.close() // No graceful close here + override def onException(ctx: ChannelHandlerContext, e: Throwable): Unit = { + notifyListener(ctx.channel(), Disassociated(AssociationHandle.Unknown)) + log.warning("Remote connection to [{}] failed with {}", ctx.channel().remoteAddress(), e.getCause) + ctx.channel().close() // No graceful close here } } @@ -84,8 +92,8 @@ private[remote] class TcpServerHandler( extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers { - override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - initInbound(e.getChannel, e.getChannel.getRemoteAddress, null) + override def onConnect(ctx: ChannelHandlerContext): Unit = + initInbound(ctx.channel(), ctx.channel().remoteAddress()) } @@ -97,8 +105,8 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress extends ClientHandler(_transport, remoteAddress) with TcpHandlers { - override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - initOutbound(e.getChannel, e.getChannel.getRemoteAddress, null) + override def onConnect(ctx: ChannelHandlerContext): Unit = + initOutbound(ctx.channel(), ctx.channel().remoteAddress()) } @@ -118,7 +126,7 @@ private[remote] class TcpAssociationHandle( override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) { - channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer)) + channel.writeAndFlush(Unpooled.wrappedBuffer(payload.asByteBuffer)) true } else false