Skip to content

Commit

Permalink
Migrate the classic transport to Netty 4.
Browse files Browse the repository at this point in the history
Signed-off-by: He-Pin <[email protected]>
  • Loading branch information
He-Pin committed Sep 8, 2023
1 parent 68dd099 commit 739f0fd
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 231 deletions.
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ SortImports.blocks = [
"com.sun."
"org.apache.pekko."
"org.reactivestreams."
"io.netty."
]
4 changes: 1 addition & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ object Dependencies {
// needs to be inline with the aeron version, check
// https://github.com/real-logic/aeron/blob/1.x.y/build.gradle
val agronaVersion = "1.15.1"
val nettyVersion = "3.10.6.Final"
val netty4Version = "4.1.96.Final"
val protobufJavaVersion = "3.19.6"
val logbackVersion = "1.2.11"
Expand Down Expand Up @@ -60,7 +59,6 @@ object Dependencies {
// Compile

val config = "com.typesafe" % "config" % "1.4.2"
val netty = "io.netty" % "netty" % nettyVersion
val `netty-transport` = "io.netty" % "netty-transport" % netty4Version
val `netty-handler` = "io.netty" % "netty-handler" % netty4Version

Expand Down Expand Up @@ -278,7 +276,7 @@ object Dependencies {
Compile.slf4jApi,
TestDependencies.scalatest.value)

val remoteDependencies = Seq(netty, aeronDriver, aeronClient)
val remoteDependencies = Seq(`netty-transport`, `netty-handler`, aeronDriver, aeronClient)
val remoteOptionalDependencies = remoteDependencies.map(_ % "optional")

val remote = l ++= Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/Paradox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object Paradox {
"scalatest.version" -> Dependencies.scalaTestVersion,
"sigar_loader.version" -> "1.6.6-rev002",
"aeron_version" -> Dependencies.aeronVersion,
"netty_version" -> Dependencies.nettyVersion,
"netty_version" -> Dependencies.netty4Version,
"logback_version" -> Dependencies.logbackVersion))

val rootsSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
package org.apache.pekko.remote.classic

import com.typesafe.config.ConfigFactory
import org.jboss.netty.channel.ChannelException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.testkit.SocketUtil

import java.net.BindException

class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {

"an ActorSystem" must {
Expand All @@ -43,10 +44,10 @@ class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {
""".stripMargin)
val as = ActorSystem("RemotingFailedToBindSpec", config)
try {
val ex = intercept[ChannelException] {
val ex = intercept[BindException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should startWith("Failed to bind")
ex.getMessage should startWith("Address already in use: bind")
} finally {
as.terminate()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@

package org.apache.pekko.remote

import scala.annotation.nowarn
import scala.concurrent.Future
import scala.util.Failure
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal

import scala.annotation.nowarn

import org.apache.pekko
import pekko.ConfigurationException
import pekko.Done
Expand Down Expand Up @@ -281,7 +280,7 @@ private[pekko] class RemoteActorRefProvider(
private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = {
checkClassOrThrow(
system,
"org.jboss.netty.channel.Channel",
"io.netty.channel.Channel",
"Classic",
"Netty",
"https://pekko.apache.org/docs/pekko/current/remoting.html")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,33 @@ package org.apache.pekko.remote.transport.netty

import java.nio.channels.ClosedChannelException

import scala.annotation.nowarn
import scala.util.control.NonFatal

import org.jboss.netty.channel._

import org.apache.pekko
import pekko.PekkoException
import pekko.util.unused

import io.netty.channel.{ ChannelHandlerContext, ChannelInboundHandlerAdapter }

/**
* INTERNAL API
*/
private[netty] trait NettyHelpers {

protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()

protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onActive(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onInactive(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = ()
protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: Any): Unit = ()

protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = ()
protected def onException(@unused ctx: ChannelHandlerContext, @unused cause: Throwable): Unit = ()

final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = {
val cause = if (ev.getCause ne null) ev.getCause else new PekkoException("Unknown cause")
final protected def transformException(ctx: ChannelHandlerContext, exception: Throwable): Unit = {
val cause = if (exception.getCause ne null) exception.getCause else new PekkoException("Unknown cause")
cause match {
case _: ClosedChannelException => // Ignore
case null | NonFatal(_) => onException(ctx, ev)
case null | NonFatal(_) => onException(ctx, exception)
case e: Throwable => throw e // Rethrow fatals
}
}
Expand All @@ -51,54 +50,51 @@ private[netty] trait NettyHelpers {
/**
* INTERNAL API
*/
private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers {
private[netty] trait NettyServerHelpers extends ChannelInboundHandlerAdapter with NettyHelpers {

final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
super.channelRead(ctx, msg)
onMessage(ctx, msg)
}

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
transformException(ctx, cause)
}

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
override def channelActive(ctx: ChannelHandlerContext): Unit = {
super.channelActive(ctx)
onActive(ctx)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
super.channelInactive(ctx)
onInactive(ctx)
}
}

/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers {
final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
}
private[netty] trait NettyClientHelpers extends ChannelInboundHandlerAdapter with NettyHelpers {

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
super.channelRead(ctx, msg)
onMessage(ctx, msg)
}

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
transformException(ctx, cause)
}

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
override def channelActive(ctx: ChannelHandlerContext): Unit = {
super.channelActive(ctx)
onActive(ctx)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
super.channelInactive(ctx)
onInactive(ctx)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
package org.apache.pekko.remote.transport.netty

import scala.annotation.nowarn
import com.typesafe.config.Config
import org.jboss.netty.handler.ssl.SslHandler

import com.typesafe.config.Config
import org.apache.pekko
import pekko.japi.Util._
import pekko.util.ccompat._

import io.netty.handler.ssl.SslHandler

/**
* INTERNAL API
*/
Expand Down
Loading

0 comments on commit 739f0fd

Please sign in to comment.