Skip to content

Commit

Permalink
!remoting Update classic transport from Netty 3 to netty4
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 9, 2023
1 parent d0099a1 commit 2517d0a
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 212 deletions.
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ SortImports.blocks = [
"com.sun."
"org.apache.pekko."
"org.reactivestreams."
"io.netty."
]
4 changes: 1 addition & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ object Dependencies {
// needs to be inline with the aeron version, check
// https://github.com/real-logic/aeron/blob/1.x.y/build.gradle
val agronaVersion = "1.19.2"
val nettyVersion = "3.10.6.Final"
val netty4Version = "4.1.96.Final"
val protobufJavaVersion = "3.19.6"
val logbackVersion = "1.2.11"
Expand Down Expand Up @@ -60,7 +59,6 @@ object Dependencies {
// Compile

val config = "com.typesafe" % "config" % "1.4.2"
val netty = "io.netty" % "netty" % nettyVersion
val `netty-transport` = "io.netty" % "netty-transport" % netty4Version
val `netty-handler` = "io.netty" % "netty-handler" % netty4Version

Expand Down Expand Up @@ -278,7 +276,7 @@ object Dependencies {
Compile.slf4jApi,
TestDependencies.scalatest.value)

val remoteDependencies = Seq(netty, aeronDriver, aeronClient)
val remoteDependencies = Seq(`netty-transport`, `netty-handler`, aeronDriver, aeronClient)
val remoteOptionalDependencies = remoteDependencies.map(_ % "optional")

val remote = l ++= Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/Paradox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object Paradox {
"scalatest.version" -> Dependencies.scalaTestVersion,
"sigar_loader.version" -> "1.6.6-rev002",
"aeron_version" -> Dependencies.aeronVersion,
"netty_version" -> Dependencies.nettyVersion,
"netty_version" -> Dependencies.netty4Version,
"logback_version" -> Dependencies.logbackVersion))

val rootsSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package org.apache.pekko.remote.classic

import com.typesafe.config.ConfigFactory
import org.jboss.netty.channel.ChannelException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.testkit.SocketUtil

import java.net.BindException

class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {

"an ActorSystem" must {
Expand All @@ -43,10 +43,10 @@ class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {
""".stripMargin)
val as = ActorSystem("RemotingFailedToBindSpec", config)
try {
val ex = intercept[ChannelException] {
val ex = intercept[BindException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should startWith("Failed to bind")
ex.getMessage should startWith("Address already in use")
} finally {
as.terminate()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#migrate the classic transport to Netty4
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.transport.netty.NettyFutureBridge.apply")
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private[pekko] class RemoteActorRefProvider(
private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = {
checkClassOrThrow(
system,
"org.jboss.netty.channel.Channel",
"io.netty.channel.Channel",
"Classic",
"Netty",
"https://pekko.apache.org/docs/pekko/current/remoting.html")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,37 @@

package org.apache.pekko.remote.transport.netty

import java.nio.channels.ClosedChannelException
import io.netty.buffer.ByteBuf
import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler }

import java.nio.channels.ClosedChannelException
import scala.util.control.NonFatal

import org.jboss.netty.channel._

import org.apache.pekko
import pekko.PekkoException
import pekko.util.unused

import scala.annotation.nowarn

/**
* INTERNAL API
*/
private[netty] trait NettyHelpers {

protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onConnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onDisconnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = ()
protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = ()

protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = ()
protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = ()

final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = {
val cause = if (ev.getCause ne null) ev.getCause else new PekkoException("Unknown cause")
final protected def transformException(ctx: ChannelHandlerContext, ex: Throwable): Unit = {
val cause = if (ex ne null) ex else new PekkoException("Unknown cause")
cause match {
case _: ClosedChannelException => // Ignore
case null | NonFatal(_) => onException(ctx, ev)
case null | NonFatal(_) => onException(ctx, ex)
case e: Throwable => throw e // Rethrow fatals
}
}
Expand All @@ -51,54 +52,46 @@ private[netty] trait NettyHelpers {
/**
* INTERNAL API
*/
private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers {
private[netty] trait NettyServerHelpers extends SimpleChannelInboundHandler[ByteBuf] with NettyHelpers {

final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
onMessage(ctx, msg)
}

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)
@nowarn("msg=deprecated")
final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit =
transformException(ctx, cause)

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
final override def channelActive(ctx: ChannelHandlerContext): Unit = {
onOpen(ctx)
onConnect(ctx)
}

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
final override def channelInactive(ctx: ChannelHandlerContext): Unit = {
onDisconnect(ctx)
}
}

/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers {
final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
}
private[netty] trait NettyClientHelpers extends SimpleChannelInboundHandler[ByteBuf] with NettyHelpers {

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)
final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
onMessage(ctx, msg)
}

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
@nowarn("msg=deprecated")
final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
transformException(ctx, cause)
}

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
final override def channelActive(ctx: ChannelHandlerContext): Unit = {
onOpen(ctx)
onConnect(ctx)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
final override def channelInactive(ctx: ChannelHandlerContext): Unit = {
onDisconnect(ctx)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ package org.apache.pekko.remote.transport.netty

import scala.annotation.nowarn
import com.typesafe.config.Config
import org.jboss.netty.handler.ssl.SslHandler

import io.netty.channel.Channel
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.{ Future, GenericFutureListener }
import org.apache.pekko
import pekko.japi.Util._
import pekko.util.ccompat._
Expand Down Expand Up @@ -64,6 +65,14 @@ private[pekko] object NettySSLSupport {
val sslEngine =
if (isClient) sslEngineProvider.createClientSSLEngine()
else sslEngineProvider.createServerSSLEngine()
new SslHandler(sslEngine)
val handler = new SslHandler(sslEngine)
handler.handshakeFuture().addListener(new GenericFutureListener[Future[Channel]] {
override def operationComplete(future: Future[Channel]): Unit = {
if (!future.isSuccess) {
handler.closeOutbound().channel().close()
}
}
})
handler
}
}
Loading

0 comments on commit 2517d0a

Please sign in to comment.