Skip to content

Commit

Permalink
Migrate the classic transport to Netty 4 without CVEs (#643)
Browse files Browse the repository at this point in the history
* !remoting Update classic transport from Netty 3 to netty4

* use lambda

* =sbt Update Netty4 version to 4.1.97.final

* Reduce allocation in ChannelLocalActor.

* Remove the duplicated code in NettyHelpers.
  • Loading branch information
He-Pin authored Sep 15, 2023
1 parent d0b9c43 commit 46edc51
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 254 deletions.
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ SortImports.blocks = [
"com.sun."
"org.apache.pekko."
"org.reactivestreams."
"io.netty."
]
10 changes: 4 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ object Dependencies {
// needs to be inline with the aeron version, check
// https://github.com/real-logic/aeron/blob/1.x.y/build.gradle
val agronaVersion = "1.19.2"
val nettyVersion = "3.10.6.Final"
val netty4Version = "4.1.96.Final"
val nettyVersion = "4.1.97.Final"
val protobufJavaVersion = "3.19.6"
val logbackVersion = "1.2.11"

Expand Down Expand Up @@ -60,9 +59,8 @@ object Dependencies {
// Compile

val config = "com.typesafe" % "config" % "1.4.2"
val netty = "io.netty" % "netty" % nettyVersion
val `netty-transport` = "io.netty" % "netty-transport" % netty4Version
val `netty-handler` = "io.netty" % "netty-handler" % netty4Version
val `netty-transport` = "io.netty" % "netty-transport" % nettyVersion
val `netty-handler` = "io.netty" % "netty-handler" % nettyVersion

val scalaReflect: ScalaVersionDependentModuleID =
ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _)
Expand Down Expand Up @@ -278,7 +276,7 @@ object Dependencies {
Compile.slf4jApi,
TestDependencies.scalatest.value)

val remoteDependencies = Seq(netty, aeronDriver, aeronClient)
val remoteDependencies = Seq(`netty-transport`, `netty-handler`, aeronDriver, aeronClient)
val remoteOptionalDependencies = remoteDependencies.map(_ % "optional")

val remote = l ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package org.apache.pekko.remote.classic

import com.typesafe.config.ConfigFactory
import org.jboss.netty.channel.ChannelException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.testkit.SocketUtil

import java.net.BindException

class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {

"an ActorSystem" must {
Expand All @@ -43,10 +43,10 @@ class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {
""".stripMargin)
val as = ActorSystem("RemotingFailedToBindSpec", config)
try {
val ex = intercept[ChannelException] {
val ex = intercept[BindException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should startWith("Failed to bind")
ex.getMessage should startWith("Address already in use")
} finally {
as.terminate()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#migrate the classic transport to Netty4
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.transport.netty.NettyFutureBridge.apply")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.remote.transport.netty.ChannelLocalActor")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.remote.transport.netty.ChannelLocalActor$")
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private[pekko] class RemoteActorRefProvider(
private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = {
checkClassOrThrow(
system,
"org.jboss.netty.channel.Channel",
"io.netty.channel.Channel",
"Classic",
"Netty",
"https://pekko.apache.org/docs/pekko/current/remoting.html")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,36 @@ package org.apache.pekko.remote.transport.netty

import java.nio.channels.ClosedChannelException

import scala.annotation.nowarn
import scala.util.control.NonFatal

import org.jboss.netty.channel._

import org.apache.pekko
import pekko.PekkoException
import pekko.util.unused

import io.netty.buffer.ByteBuf
import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler }

/**
* INTERNAL API
*/
private[netty] trait NettyHelpers {

protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onConnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onDisconnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = ()
protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = ()

protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = ()
protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = ()

final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = {
val cause = if (ev.getCause ne null) ev.getCause else new PekkoException("Unknown cause")
final protected def transformException(ctx: ChannelHandlerContext, ex: Throwable): Unit = {
val cause = if (ex ne null) ex else new PekkoException("Unknown cause")
cause match {
case _: ClosedChannelException => // Ignore
case null | NonFatal(_) => onException(ctx, ev)
case null | NonFatal(_) => onException(ctx, ex)
case e: Throwable => throw e // Rethrow fatals
}
}
Expand All @@ -51,54 +53,33 @@ private[netty] trait NettyHelpers {
/**
* INTERNAL API
*/
private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers {

final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[ByteBuf]
with NettyHelpers {
final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
onMessage(ctx, msg)
}

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
@nowarn("msg=deprecated")
final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
transformException(ctx, cause)
}

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
final override def channelActive(ctx: ChannelHandlerContext): Unit = {
onOpen(ctx)
onConnect(ctx)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
final override def channelInactive(ctx: ChannelHandlerContext): Unit = {
onDisconnect(ctx)
}
}

/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers {
final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
}

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
}
private[netty] trait NettyServerHelpers extends NettyChannelHandlerAdapter

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
}
}
/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends NettyChannelHandlerAdapter
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package org.apache.pekko.remote.transport.netty

import scala.annotation.nowarn
import com.typesafe.config.Config
import org.jboss.netty.handler.ssl.SslHandler

import com.typesafe.config.Config
import org.apache.pekko
import pekko.japi.Util._
import pekko.util.ccompat._

import io.netty.channel.Channel
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.Future

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -64,6 +67,12 @@ private[pekko] object NettySSLSupport {
val sslEngine =
if (isClient) sslEngineProvider.createClientSSLEngine()
else sslEngineProvider.createServerSSLEngine()
new SslHandler(sslEngine)
val handler = new SslHandler(sslEngine)
handler.handshakeFuture().addListener((future: Future[Channel]) => {
if (!future.isSuccess) {
handler.closeOutbound().channel().close()
}
})
handler
}
}
Loading

0 comments on commit 46edc51

Please sign in to comment.