diff --git a/.scalafix.conf b/.scalafix.conf index 815bdb7e362..3009ea9bb12 100644 --- a/.scalafix.conf +++ b/.scalafix.conf @@ -48,4 +48,5 @@ SortImports.blocks = [ "com.sun." "org.apache.pekko." "org.reactivestreams." + "io.netty." ] diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1d26b403124..88e91546307 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,6 @@ object Dependencies { // needs to be inline with the aeron version, check // https://github.com/real-logic/aeron/blob/1.x.y/build.gradle val agronaVersion = "1.15.1" - val nettyVersion = "3.10.6.Final" val netty4Version = "4.1.96.Final" val protobufJavaVersion = "3.16.3" val logbackVersion = "1.2.11" @@ -60,7 +59,6 @@ object Dependencies { // Compile val config = "com.typesafe" % "config" % "1.4.2" - val netty = "io.netty" % "netty" % nettyVersion val `netty-transport` = "io.netty" % "netty-transport" % netty4Version val `netty-handler` = "io.netty" % "netty-handler" % netty4Version @@ -278,7 +276,7 @@ object Dependencies { Compile.slf4jApi, TestDependencies.scalatest.value) - val remoteDependencies = Seq(netty, aeronDriver, aeronClient) + val remoteDependencies = Seq(`netty-transport`, `netty-handler`, aeronDriver, aeronClient) val remoteOptionalDependencies = remoteDependencies.map(_ % "optional") val remote = l ++= Seq( diff --git a/project/Paradox.scala b/project/Paradox.scala index 98aa3dac72f..b5dfda4284e 100644 --- a/project/Paradox.scala +++ b/project/Paradox.scala @@ -66,7 +66,7 @@ object Paradox { "scalatest.version" -> Dependencies.scalaTestVersion, "sigar_loader.version" -> "1.6.6-rev002", "aeron_version" -> Dependencies.aeronVersion, - "netty_version" -> Dependencies.nettyVersion, + "netty_version" -> Dependencies.netty4Version, "logback_version" -> Dependencies.logbackVersion)) val rootsSettings = Seq( diff --git a/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala b/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala index eb2151c2893..8284941d3f3 100644 --- a/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala +++ b/remote-tests/src/test/scala/org/apache/pekko/remote/classic/RemotingFailedToBindSpec.scala @@ -14,7 +14,6 @@ package org.apache.pekko.remote.classic import com.typesafe.config.ConfigFactory -import org.jboss.netty.channel.ChannelException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -22,6 +21,8 @@ import org.apache.pekko import pekko.actor.ActorSystem import pekko.testkit.SocketUtil +import java.net.BindException + class RemotingFailedToBindSpec extends AnyWordSpec with Matchers { "an ActorSystem" must { @@ -43,10 +44,10 @@ class RemotingFailedToBindSpec extends AnyWordSpec with Matchers { """.stripMargin) val as = ActorSystem("RemotingFailedToBindSpec", config) try { - val ex = intercept[ChannelException] { + val ex = intercept[BindException] { ActorSystem("BindTest2", config) } - ex.getMessage should startWith("Failed to bind") + ex.getMessage should startWith("Address already in use: bind") } finally { as.terminate() } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala index be74615e97c..7f7918e5060 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala @@ -13,13 +13,12 @@ package org.apache.pekko.remote +import scala.annotation.nowarn import scala.concurrent.Future import scala.util.Failure import scala.util.control.Exception.Catcher import scala.util.control.NonFatal -import scala.annotation.nowarn - import org.apache.pekko import pekko.ConfigurationException import pekko.Done @@ -281,7 +280,7 @@ private[pekko] class RemoteActorRefProvider( private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = { checkClassOrThrow( system, - "org.jboss.netty.channel.Channel", + "io.netty.channel.Channel", "Classic", "Netty", "https://pekko.apache.org/docs/pekko/current/remoting.html") diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala index 58edc016feb..715bc3f6149 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala @@ -15,34 +15,33 @@ package org.apache.pekko.remote.transport.netty import java.nio.channels.ClosedChannelException +import scala.annotation.nowarn import scala.util.control.NonFatal -import org.jboss.netty.channel._ - import org.apache.pekko import pekko.PekkoException import pekko.util.unused +import io.netty.channel.{ ChannelHandlerContext, ChannelInboundHandlerAdapter } + /** * INTERNAL API */ private[netty] trait NettyHelpers { - protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = () - - protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = () + protected def onActive(@unused ctx: ChannelHandlerContext): Unit = () - protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = () + protected def onInactive(@unused ctx: ChannelHandlerContext): Unit = () - protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = () + protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: Any): Unit = () - protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = () + protected def onException(@unused ctx: ChannelHandlerContext, @unused cause: Throwable): Unit = () - final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = { - val cause = if (ev.getCause ne null) ev.getCause else new PekkoException("Unknown cause") + final protected def transformException(ctx: ChannelHandlerContext, exception: Throwable): Unit = { + val cause = if (exception.getCause ne null) exception.getCause else new PekkoException("Unknown cause") cause match { case _: ClosedChannelException => // Ignore - case null | NonFatal(_) => onException(ctx, ev) + case null | NonFatal(_) => onException(ctx, exception) case e: Throwable => throw e // Rethrow fatals } } @@ -51,54 +50,51 @@ private[netty] trait NettyHelpers { /** * INTERNAL API */ -private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers { +private[netty] trait NettyServerHelpers extends ChannelInboundHandlerAdapter with NettyHelpers { - final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { - super.messageReceived(ctx, e) - onMessage(ctx, e) + override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = { + super.channelRead(ctx, msg) + onMessage(ctx, msg) } - final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e) - - final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelConnected(ctx, e) - onConnect(ctx, e) + @nowarn("msg=deprecated") + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + transformException(ctx, cause) } - final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelOpen(ctx, e) - onOpen(ctx, e) + override def channelActive(ctx: ChannelHandlerContext): Unit = { + super.channelActive(ctx) + onActive(ctx) } - final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelDisconnected(ctx, e) - onDisconnect(ctx, e) + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + super.channelInactive(ctx) + onInactive(ctx) } } /** * INTERNAL API */ -private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers { - final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { - super.messageReceived(ctx, e) - onMessage(ctx, e) - } +private[netty] trait NettyClientHelpers extends ChannelInboundHandlerAdapter with NettyHelpers { - final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e) + override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = { + super.channelRead(ctx, msg) + onMessage(ctx, msg) + } - final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelConnected(ctx, e) - onConnect(ctx, e) + @nowarn("msg=deprecated") + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + transformException(ctx, cause) } - final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelOpen(ctx, e) - onOpen(ctx, e) + override def channelActive(ctx: ChannelHandlerContext): Unit = { + super.channelActive(ctx) + onActive(ctx) } - final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - super.channelDisconnected(ctx, e) - onDisconnect(ctx, e) + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + super.channelInactive(ctx) + onInactive(ctx) } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala index c6a7fe7c824..28feabd4c82 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala @@ -14,13 +14,14 @@ package org.apache.pekko.remote.transport.netty import scala.annotation.nowarn -import com.typesafe.config.Config -import org.jboss.netty.handler.ssl.SslHandler +import com.typesafe.config.Config import org.apache.pekko import pekko.japi.Util._ import pekko.util.ccompat._ +import io.netty.handler.ssl.SslHandler + /** * INTERNAL API */ diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 96b37fa9372..e5e1bc59de6 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -13,59 +13,45 @@ package org.apache.pekko.remote.transport.netty -import java.net.InetAddress -import java.net.InetSocketAddress -import java.net.SocketAddress +import java.net.{ InetAddress, InetSocketAddress, SocketAddress } import java.util.concurrent.CancellationException -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.concurrent.blocking +import scala.concurrent.{ blocking, ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.util.Try -import scala.util.control.NoStackTrace -import scala.util.control.NonFatal +import scala.util.control.{ NoStackTrace, NonFatal } import com.typesafe.config.Config import org.apache.pekko -import org.jboss.netty.bootstrap.Bootstrap -import org.jboss.netty.bootstrap.ClientBootstrap -import org.jboss.netty.bootstrap.ConnectionlessBootstrap -import org.jboss.netty.bootstrap.ServerBootstrap -import org.jboss.netty.buffer.ChannelBuffer -import org.jboss.netty.buffer.ChannelBuffers -import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.ChannelGroup -import org.jboss.netty.channel.group.ChannelGroupFuture -import org.jboss.netty.channel.group.ChannelGroupFutureListener -import org.jboss.netty.channel.group.DefaultChannelGroup -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioWorkerPool -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder -import org.jboss.netty.handler.codec.frame.LengthFieldPrepender -import org.jboss.netty.handler.ssl.SslHandler -import org.jboss.netty.util.HashedWheelTimer -import pekko.ConfigurationException -import pekko.OnlyCauseStackTrace -import pekko.actor.ActorSystem -import pekko.actor.Address -import pekko.actor.ExtendedActorSystem -import pekko.dispatch.ThreadPoolConfig -import pekko.event.Logging -import pekko.remote.RARP -import pekko.remote.transport.AssociationHandle -import pekko.remote.transport.AssociationHandle.HandleEventListener -import pekko.remote.transport.Transport -import pekko.remote.transport.Transport._ -import pekko.util.Helpers -import pekko.util.Helpers.Requiring -import pekko.util.OptionVal + +import org.apache.pekko.{ ConfigurationException, OnlyCauseStackTrace } +import org.apache.pekko.actor.{ ActorSystem, Address, ExtendedActorSystem } +import org.apache.pekko.dispatch.ThreadPoolConfig +import org.apache.pekko.event.Logging +import org.apache.pekko.remote.RARP +import org.apache.pekko.remote.transport.{ AssociationHandle, Transport } +import org.apache.pekko.remote.transport.AssociationHandle.HandleEventListener +import org.apache.pekko.remote.transport.Transport._ +import org.apache.pekko.util.{ Helpers, OptionVal } +import org.apache.pekko.util.Helpers.Requiring + +import io.netty.bootstrap.{ Bootstrap, ServerBootstrap } +import io.netty.buffer.{ ByteBuf, Unpooled } +import io.netty.channel._ +import io.netty.channel.group.{ + ChannelGroup, + ChannelGroupFuture, + ChannelGroupFutureListener, + ChannelMatchers, + DefaultChannelGroup +} +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } +import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } +import io.netty.handler.ssl.SslHandler @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") object NettyFutureBridge { @@ -75,9 +61,9 @@ object NettyFutureBridge { def operationComplete(future: ChannelFuture): Unit = p.complete( Try( - if (future.isSuccess) future.getChannel + if (future.isSuccess) future.channel() else if (future.isCancelled) throw new CancellationException - else throw future.getCause)) + else throw future.cause())) }) p.future } @@ -89,12 +75,12 @@ object NettyFutureBridge { def operationComplete(future: ChannelGroupFuture): Unit = p.complete( Try( - if (future.isCompleteSuccess) future.getGroup + if (future.isSuccess) future.group() else throw future.iterator.asScala .collectFirst { case f if f.isCancelled => new CancellationException - case f if !f.isSuccess => f.getCause + case f if !f.isSuccess => f.cause() } .getOrElse(new IllegalStateException( "Error reported in ChannelGroupFuture, but no error found in individual futures.")))) @@ -184,8 +170,6 @@ class NettyTransportSettings(config: Config) { case value => value.toInt } - val SslSettings: Option[SSLSettings] = if (EnableSsl) Some(new SSLSettings(config.getConfig("security"))) else None - val ServerSocketWorkerPoolSize: Int = computeWPS(config.getConfig("server-socket-worker-pool")) val ClientSocketWorkerPoolSize: Int = computeWPS(config.getConfig("client-socket-worker-pool")) @@ -196,19 +180,24 @@ class NettyTransportSettings(config: Config) { config.getDouble("pool-size-factor"), config.getInt("pool-size-max")) - // Check Netty version >= 3.10.6 + // Check Netty version >= 4.1.94 { - val nettyVersion = org.jboss.netty.util.Version.ID + val nettyVersions = io.netty.util.Version.identify() + val nettyVersion = nettyVersions.values().stream().filter(_.artifactId() == "netty-transport") + .findFirst() + .map(_.artifactVersion()) + .orElseThrow(() => throw new IllegalArgumentException("Can not read netty-transport's version.")) + def throwInvalidNettyVersion(): Nothing = { throw new IllegalArgumentException( - "pekko-remote with the Netty transport requires Netty version 3.10.6 or " + + "pekko-remote with the Netty transport requires Netty version 4.1.94 or " + s"later. Version [$nettyVersion] is on the class path. Issue https://github.com/netty/netty/pull/4739 " + "may cause messages to not be delivered.") } try { val segments: Array[String] = nettyVersion.split("[.-]") - if (segments.length < 3 || segments(0).toInt != 3 || segments(1).toInt != 10 || segments(2).toInt < 6) + if (segments.length < 3 || segments(0).toInt != 4 || segments(1).toInt != 1 || segments(2).toInt < 94) throwInvalidNettyVersion() } catch { case _: NumberFormatException => @@ -225,25 +214,26 @@ class NettyTransportSettings(config: Config) { private[netty] trait CommonHandlers extends NettyHelpers { protected val transport: NettyTransport - final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - transport.channelGroup.add(e.getChannel) + override protected def onActive(ctx: ChannelHandlerContext): Unit = { + transport.channelGroup.add(ctx.channel()) + } protected def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle protected def registerListener( channel: Channel, listener: HandleEventListener, - msg: ChannelBuffer, + msg: ByteBuf, remoteSocketAddress: InetSocketAddress): Unit final protected def init( channel: Channel, remoteSocketAddress: SocketAddress, remoteAddress: Address, - msg: ChannelBuffer)(op: AssociationHandle => Any): Unit = { + msg: ByteBuf)(op: AssociationHandle => Any): Unit = { import transport._ NettyTransport.addressFromSocketAddress( - channel.getLocalAddress, + channel.localAddress(), schemeIdentifier, system.name, Some(settings.Hostname), @@ -251,8 +241,9 @@ private[netty] trait CommonHandlers extends NettyHelpers { case Some(localAddress) => val handle = createHandle(channel, localAddress, remoteAddress) handle.readHandlerPromise.future.foreach { listener => + // TODO use channel attr registerListener(channel, listener, msg, remoteSocketAddress.asInstanceOf[InetSocketAddress]) - channel.setReadable(true) + channel.config().setAutoRead(true) } op(handle) @@ -273,8 +264,8 @@ private[netty] abstract class ServerHandler( import transport.executionContext - final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { - channel.setReadable(false) + final protected def initInbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ByteBuf): Unit = { + channel.config().setAutoRead(false) associationListenerFuture.foreach { listener => val remoteAddress = NettyTransport .addressFromSocketAddress( @@ -303,7 +294,7 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty final protected val statusPromise = Promise[AssociationHandle]() def statusFuture = statusPromise.future - final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { + final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ByteBuf): Unit = { init(channel, remoteSocketAddress, remoteAddress, msg)(statusPromise.success) } @@ -315,11 +306,13 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty private[transport] object NettyTransport { // 4 bytes will be used to represent the frame length. Used by netty LengthFieldPrepender downstream handler. val FrameLengthFieldLength = 4 + def gracefulClose(channel: Channel)(implicit ec: ExecutionContext): Unit = { @nowarn("msg=deprecated") - def always(c: ChannelFuture) = NettyFutureBridge(c).recover { case _ => c.getChannel } + def always(c: ChannelFuture) = NettyFutureBridge(c).recover { case _ => c.channel() } + for { - _ <- always { channel.write(ChannelBuffers.buffer(0)) } // Force flush by waiting on a final dummy write + _ <- always { channel.writeAndFlush(Unpooled.EMPTY_BUFFER) } // Force flush by waiting on a final dummy write _ <- always { channel.disconnect() } } channel.close() } @@ -371,13 +364,11 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private val log = Logging.withMarker(system, classOf[NettyTransport]) - /** - * INTERNAL API - */ - private[netty] final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]() + private val clientEventLoopGroup = new NioEventLoopGroup() + + private val serverParentEventLoopGroup = new NioEventLoopGroup() - private def createExecutorService() = - UseDispatcherForIo.map(system.dispatchers.lookup).getOrElse(Executors.newCachedThreadPool(system.threadFactory)) + private val serverChildEventLoopGroup = new NioEventLoopGroup() /* * Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap. @@ -386,26 +377,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA * outbound connections are initiated in the shutdown phase. */ val channelGroup = new DefaultChannelGroup( - "pekko-netty-transport-driver-channelgroup-" + - uniqueIdCounter.getAndIncrement) - - private val clientChannelFactory: ChannelFactory = { - val boss, worker = createExecutorService() - new NioClientSocketChannelFactory( - boss, - 1, - new NioWorkerPool(worker, ClientSocketWorkerPoolSize), - new HashedWheelTimer(system.threadFactory)) - } - - private val serverChannelFactory: ChannelFactory = { - val boss, worker = createExecutorService() - // This does not create a HashedWheelTimer internally - new NioServerSocketChannelFactory(boss, worker, ServerSocketWorkerPoolSize) - } + "pekko-netty-transport-driver-channelgroup-" + uniqueIdCounter.getAndIncrement, + serverChildEventLoopGroup.next()) - private def newPipeline: DefaultChannelPipeline = { - val pipeline = new DefaultChannelPipeline + private def setupPipeline(pipeline: ChannelPipeline): ChannelPipeline = { pipeline.addLast( "FrameDecoder", new LengthFieldBasedFrameDecoder( @@ -416,8 +391,6 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA FrameLengthFieldLength, // Strip the header true)) pipeline.addLast("FrameEncoder", new LengthFieldPrepender(FrameLengthFieldLength)) - - pipeline } private val associationListenerPromise: Promise[AssociationEventListener] = Promise() @@ -439,7 +412,6 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA sslEngineProvider match { case OptionVal.Some(sslProvider) => val handler = NettySSLSupport(sslProvider, isClient) - handler.setCloseOnSSLException(true) handler case _ => throw new IllegalStateException("Expected enable-ssl=on") @@ -447,53 +419,67 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } - private val serverPipelineFactory: ChannelPipelineFactory = new ChannelPipelineFactory { - override def getPipeline: ChannelPipeline = { - val pipeline = newPipeline + private val serverChildChannelInitializer: ChannelInitializer[SocketChannel] = new ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel): Unit = { + val pipeline = ch.pipeline() + setupPipeline(pipeline) if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false)) val handler = new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log) pipeline.addLast("ServerHandler", handler) - pipeline } } - private def clientPipelineFactory(remoteAddress: Address): ChannelPipelineFactory = - new ChannelPipelineFactory { - override def getPipeline: ChannelPipeline = { - val pipeline = newPipeline + private def clientChannelInitializer(remoteAddress: Address): ChannelInitializer[SocketChannel] = + new ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel): Unit = { + val pipeline = ch.pipeline() + setupPipeline(pipeline) if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true)) val handler = new TcpClientHandler(NettyTransport.this, remoteAddress, log) - pipeline.addLast("clienthandler", handler) - pipeline + pipeline.addLast("clientHandler", handler) } } - private def setupBootstrap[B <: Bootstrap](bootstrap: B, pipelineFactory: ChannelPipelineFactory): B = { - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", settings.Backlog) - bootstrap.setOption("child.tcpNoDelay", settings.TcpNodelay) - bootstrap.setOption("child.keepAlive", settings.TcpKeepalive) - bootstrap.setOption("reuseAddress", settings.TcpReuseAddr) - settings.ReceiveBufferSize.foreach(sz => bootstrap.setOption("receiveBufferSize", sz)) - settings.SendBufferSize.foreach(sz => bootstrap.setOption("sendBufferSize", sz)) - settings.WriteBufferHighWaterMark.foreach(sz => bootstrap.setOption("writeBufferHighWaterMark", sz)) - settings.WriteBufferLowWaterMark.foreach(sz => bootstrap.setOption("writeBufferLowWaterMark", sz)) + private def setupBootstrap(bootstrap: Bootstrap, channelInitializer: ChannelInitializer[SocketChannel]): Bootstrap = { + bootstrap.handler(channelInitializer) + bootstrap.group(clientEventLoopGroup) + bootstrap.channel(classOf[NioSocketChannel]) + bootstrap.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay) + bootstrap.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive) + settings.ReceiveBufferSize.foreach(sz => bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, sz)) + settings.SendBufferSize.foreach(sz => bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sz)) + settings.WriteBufferHighWaterMark.foreach(sz => + bootstrap.option[java.lang.Integer](ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, sz)) + settings.WriteBufferLowWaterMark.foreach(sz => + bootstrap.option[java.lang.Integer](ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, sz)) bootstrap } - private val inboundBootstrap: Bootstrap = { - setupBootstrap(new ServerBootstrap(serverChannelFactory), serverPipelineFactory) + private def setupBootstrap( + bootstrap: ServerBootstrap, channelInitializer: ChannelInitializer[SocketChannel]): ServerBootstrap = { + bootstrap.childHandler(channelInitializer) + bootstrap.group(serverParentEventLoopGroup, serverChildEventLoopGroup) + bootstrap.channel(classOf[NioServerSocketChannel]) + bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, settings.Backlog) + bootstrap.option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, settings.TcpReuseAddr) + bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay) + bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive) + settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz)) + settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz)) + settings.WriteBufferHighWaterMark.foreach(sz => + bootstrap.childOption[java.lang.Integer](ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, sz)) + settings.WriteBufferLowWaterMark.foreach(sz => + bootstrap.childOption[java.lang.Integer](ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, sz)) + bootstrap } - private def outboundBootstrap(remoteAddress: Address): ClientBootstrap = { - val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory(remoteAddress)) - bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) - bootstrap.setOption("tcpNoDelay", settings.TcpNodelay) - bootstrap.setOption("keepAlive", settings.TcpKeepalive) - settings.ReceiveBufferSize.foreach(sz => bootstrap.setOption("receiveBufferSize", sz)) - settings.SendBufferSize.foreach(sz => bootstrap.setOption("sendBufferSize", sz)) - settings.WriteBufferHighWaterMark.foreach(sz => bootstrap.setOption("writeBufferHighWaterMark", sz)) - settings.WriteBufferLowWaterMark.foreach(sz => bootstrap.setOption("writeBufferLowWaterMark", sz)) + private val inboundBootstrap: ServerBootstrap = { + setupBootstrap(new ServerBootstrap(), serverChildChannelInitializer) + } + + private def outboundBootstrap(remoteAddress: Address): Bootstrap = { + val bootstrap = setupBootstrap(new Bootstrap(), clientChannelInitializer(remoteAddress)) + bootstrap.option[java.lang.Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.ConnectionTimeout.toMillis.toInt) bootstrap } @@ -515,13 +501,13 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } yield { try { val newServerChannel = inboundBootstrap match { - case b: ServerBootstrap => b.bind(address) - case b: ConnectionlessBootstrap => b.bind(address) - case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser + case b: ServerBootstrap => b.bind(address).sync().channel() + case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser } // Block reads until a handler actor is registered - newServerChannel.setReadable(false) + newServerChannel.config().setAutoRead(false) + channelGroup.add(newServerChannel) serverChannel = newServerChannel @@ -530,26 +516,26 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector) addressFromSocketAddress( - newServerChannel.getLocalAddress, + newServerChannel.localAddress(), schemeIdentifier, system.name, Some(settings.Hostname), port) match { case Some(address) => - addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, None, + addressFromSocketAddress(newServerChannel.localAddress, schemeIdentifier, system.name, None, None) match { case Some(address) => boundTo = address case None => throw new NettyTransportException( - s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") + s"Unknown local address type [${newServerChannel.localAddress.getClass.getName}]") } associationListenerPromise.future.foreach { _ => - newServerChannel.setReadable(true) + newServerChannel.config().setAutoRead(true) } (address, associationListenerPromise) case None => throw new NettyTransportException( - s"Unknown local address type [${newServerChannel.getLocalAddress.getClass.getName}]") + s"Unknown local address type [${newServerChannel.localAddress.getClass.getName}]") } } catch { case NonFatal(e) => { @@ -567,21 +553,21 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA private[pekko] def boundAddress = boundTo override def associate(remoteAddress: Address): Future[AssociationHandle] = { - if (!serverChannel.isBound) Future.failed(new NettyTransportException("Transport is not bound")) + if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound")) else { - val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress) + val bootstrap: Bootstrap = outboundBootstrap(remoteAddress) (for { socketAddress <- addressToSocketAddress(remoteAddress) readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel => if (EnableSsl) blocking { - channel.getPipeline.get(classOf[SslHandler]).handshake().awaitUninterruptibly() + channel.pipeline().get(classOf[SslHandler]).renegotiate().awaitUninterruptibly() } - channel.setReadable(false) + channel.config().setAutoRead(false) channel } - handle <- readyChannel.getPipeline.get(classOf[ClientHandler]).statusFuture + handle <- readyChannel.pipeline().get(classOf[ClientHandler]).statusFuture } yield handle).recover { case _: CancellationException => throw new NettyTransportExceptionNoStack("Connection was cancelled") case NonFatal(t) => @@ -598,22 +584,17 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } override def shutdown(): Future[Boolean] = { - def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ => true).recover { case _ => false } + def always(c: ChannelGroupFuture): Future[Boolean] = NettyFutureBridge(c).map(_ => true).recover { case _ => false } for { // Force flush by trying to write an empty buffer and wait for success - unbindStatus <- always(channelGroup.unbind()) - lastWriteStatus <- always(channelGroup.write(ChannelBuffers.buffer(0))) + unbindStatus <- always { channelGroup.close(ChannelMatchers.isServerChannel) } + lastWriteStatus <- always(channelGroup.writeAndFlush(Unpooled.EMPTY_BUFFER)) disconnectStatus <- always(channelGroup.disconnect()) closeStatus <- always(channelGroup.close()) } yield { - // Release the selectors, but don't try to kill the dispatcher - if (UseDispatcherForIo.isDefined) { - clientChannelFactory.shutdown() - serverChannelFactory.shutdown() - } else { - clientChannelFactory.releaseExternalResources() - serverChannelFactory.releaseExternalResources() - } + serverParentEventLoopGroup.shutdownGracefully() + serverChildEventLoopGroup.shutdownGracefully() + clientEventLoopGroup.shutdownGracefully() lastWriteStatus && unbindStatus && disconnectStatus && closeStatus } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala index b7504ee6c60..f961f2e2c4e 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala @@ -15,11 +15,8 @@ package org.apache.pekko.remote.transport.netty import java.net.InetSocketAddress -import scala.concurrent.{ Future, Promise } - import scala.annotation.nowarn -import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } -import org.jboss.netty.channel._ +import scala.concurrent.{ Future, Promise } import org.apache.pekko import pekko.actor.Address @@ -29,12 +26,15 @@ import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, Ha import pekko.remote.transport.Transport.AssociationEventListener import pekko.util.ByteString +import io.netty.buffer.{ ByteBuf, ByteBufUtil } +import io.netty.channel.{ Channel, ChannelHandlerContext } +import io.netty.util.{ AttributeKey, ReferenceCountUtil } + /** * INTERNAL API */ -private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] { - override def initialValue(channel: Channel): Option[HandleEventListener] = None - def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel).foreach { _.notify(msg) } +private[remote] object TcpHandlers { + private val LISTENER = AttributeKey.valueOf[Option[HandleEventListener]]("HandleEventListener") } /** @@ -43,34 +43,48 @@ private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEvent @nowarn("msg=deprecated") private[remote] trait TcpHandlers extends CommonHandlers { protected def log: LoggingAdapter - - import ChannelLocalActor._ + import TcpHandlers._ override def registerListener( channel: Channel, listener: HandleEventListener, - msg: ChannelBuffer, - remoteSocketAddress: InetSocketAddress): Unit = - ChannelLocalActor.set(channel, Some(listener)) + msg: ByteBuf, + remoteSocketAddress: InetSocketAddress): Unit = { + channel.attr(LISTENER).set(Some(listener)) + } override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new TcpAssociationHandle(localAddress, remoteAddress, transport, channel) - override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { - notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) - log.debug("Remote connection to [{}] was disconnected because of {}", e.getChannel.getRemoteAddress, e) + override protected def onInactive(ctx: ChannelHandlerContext): Unit = { + super.onInactive(ctx) + val channel = ctx.channel() + notifyListener(channel, Disassociated(AssociationHandle.Unknown)) + log.debug("Remote connection to [{}] was disconnected because of channel inactive.", channel.remoteAddress()) } - override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { - val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() - if (bytes.length > 0) notifyListener(e.getChannel, InboundPayload(ByteString(bytes))) + override def onMessage(ctx: ChannelHandlerContext, msg: Any): Unit = { + val byteBuf = msg.asInstanceOf[ByteBuf] + val bytes: Array[Byte] = ByteBufUtil.getBytes(byteBuf) + ReferenceCountUtil.safeRelease(msg) + if (bytes.length > 0) { + notifyListener(ctx.channel(), InboundPayload(ByteString(bytes))) + } } - override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { - notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) - log.warning("Remote connection to [{}] failed with {}", e.getChannel.getRemoteAddress, e.getCause) - e.getChannel.close() // No graceful close here + override protected def onException(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + super.onException(ctx, cause) + val channel = ctx.channel() + notifyListener(channel, Disassociated(AssociationHandle.Unknown)) + log.warning("Remote connection to [{}] failed with {}", channel.remoteAddress(), cause) + channel.close() // No graceful close here } + + private def notifyListener(channel: Channel, msg: HandleEvent): Unit = { + val listener = channel.attr(LISTENER).get() + listener.foreach(_.notify(msg)) + } + } /** @@ -84,9 +98,11 @@ private[remote] class TcpServerHandler( extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers { - override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - initInbound(e.getChannel, e.getChannel.getRemoteAddress, null) - + override def onActive(ctx: ChannelHandlerContext): Unit = { + super.onActive(ctx) + val channel = ctx.channel() + initInbound(channel, channel.remoteAddress(), null) + } } /** @@ -97,9 +113,11 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress extends ClientHandler(_transport, remoteAddress) with TcpHandlers { - override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - initOutbound(e.getChannel, e.getChannel.getRemoteAddress, null) - + override def onActive(ctx: ChannelHandlerContext): Unit = { + super.onActive(ctx) + val channel = ctx.channel() + initOutbound(channel, channel.remoteAddress(), null) + } } /** @@ -118,7 +136,8 @@ private[remote] class TcpAssociationHandle( override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) { - channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer)) + val length = payload.length + channel.writeAndFlush(channel.alloc().buffer(length).writeBytes(payload.asByteBuffer)) true } else false