Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate the classic transport to Netty 4 without CVEs #643

Merged
merged 5 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ SortImports.blocks = [
"com.sun."
"org.apache.pekko."
"org.reactivestreams."
"io.netty."
]
10 changes: 4 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ object Dependencies {
// needs to be inline with the aeron version, check
// https://github.com/real-logic/aeron/blob/1.x.y/build.gradle
val agronaVersion = "1.19.2"
val nettyVersion = "3.10.6.Final"
val netty4Version = "4.1.96.Final"
val nettyVersion = "4.1.97.Final"
val protobufJavaVersion = "3.19.6"
val logbackVersion = "1.2.11"

Expand Down Expand Up @@ -60,9 +59,8 @@ object Dependencies {
// Compile

val config = "com.typesafe" % "config" % "1.4.2"
val netty = "io.netty" % "netty" % nettyVersion
val `netty-transport` = "io.netty" % "netty-transport" % netty4Version
val `netty-handler` = "io.netty" % "netty-handler" % netty4Version
val `netty-transport` = "io.netty" % "netty-transport" % nettyVersion
val `netty-handler` = "io.netty" % "netty-handler" % nettyVersion

val scalaReflect: ScalaVersionDependentModuleID =
ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _)
Expand Down Expand Up @@ -278,7 +276,7 @@ object Dependencies {
Compile.slf4jApi,
TestDependencies.scalatest.value)

val remoteDependencies = Seq(netty, aeronDriver, aeronClient)
val remoteDependencies = Seq(`netty-transport`, `netty-handler`, aeronDriver, aeronClient)
val remoteOptionalDependencies = remoteDependencies.map(_ % "optional")

val remote = l ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package org.apache.pekko.remote.classic

import com.typesafe.config.ConfigFactory
import org.jboss.netty.channel.ChannelException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.testkit.SocketUtil

import java.net.BindException

class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {

"an ActorSystem" must {
Expand All @@ -43,10 +43,10 @@ class RemotingFailedToBindSpec extends AnyWordSpec with Matchers {
""".stripMargin)
val as = ActorSystem("RemotingFailedToBindSpec", config)
try {
val ex = intercept[ChannelException] {
val ex = intercept[BindException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should startWith("Failed to bind")
ex.getMessage should startWith("Address already in use")
} finally {
as.terminate()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#migrate the classic transport to Netty4
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.transport.netty.NettyFutureBridge.apply")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.remote.transport.netty.ChannelLocalActor")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.remote.transport.netty.ChannelLocalActor$")
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private[pekko] class RemoteActorRefProvider(
private def checkNettyOnClassPath(system: ActorSystemImpl): Unit = {
checkClassOrThrow(
system,
"org.jboss.netty.channel.Channel",
"io.netty.channel.Channel",
"Classic",
"Netty",
"https://pekko.apache.org/docs/pekko/current/remoting.html")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,36 @@ package org.apache.pekko.remote.transport.netty

import java.nio.channels.ClosedChannelException

import scala.annotation.nowarn
import scala.util.control.NonFatal

import org.jboss.netty.channel._

import org.apache.pekko
import pekko.PekkoException
import pekko.util.unused

import io.netty.buffer.ByteBuf
import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler }

/**
* INTERNAL API
*/
private[netty] trait NettyHelpers {

protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onConnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onDisconnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = ()
protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = ()

protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = ()
protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = ()

final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = {
val cause = if (ev.getCause ne null) ev.getCause else new PekkoException("Unknown cause")
final protected def transformException(ctx: ChannelHandlerContext, ex: Throwable): Unit = {
val cause = if (ex ne null) ex else new PekkoException("Unknown cause")
cause match {
case _: ClosedChannelException => // Ignore
case null | NonFatal(_) => onException(ctx, ev)
case null | NonFatal(_) => onException(ctx, ex)
case e: Throwable => throw e // Rethrow fatals
}
}
Expand All @@ -51,54 +53,33 @@ private[netty] trait NettyHelpers {
/**
* INTERNAL API
*/
private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers {

final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[ByteBuf]
with NettyHelpers {
final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
onMessage(ctx, msg)
}

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
@nowarn("msg=deprecated")
final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
transformException(ctx, cause)
}

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
final override def channelActive(ctx: ChannelHandlerContext): Unit = {
onOpen(ctx)
onConnect(ctx)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
final override def channelInactive(ctx: ChannelHandlerContext): Unit = {
onDisconnect(ctx)
}
}

/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers {
final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)
onMessage(ctx, e)
}

final override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = transformException(ctx, e)

final override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelConnected(ctx, e)
onConnect(ctx, e)
}
private[netty] trait NettyServerHelpers extends NettyChannelHandlerAdapter

final override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelOpen(ctx, e)
onOpen(ctx, e)
}

final override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
super.channelDisconnected(ctx, e)
onDisconnect(ctx, e)
}
}
/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends NettyChannelHandlerAdapter
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package org.apache.pekko.remote.transport.netty

import scala.annotation.nowarn
import com.typesafe.config.Config
import org.jboss.netty.handler.ssl.SslHandler

import com.typesafe.config.Config
import org.apache.pekko
import pekko.japi.Util._
import pekko.util.ccompat._

import io.netty.channel.Channel
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.Future

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -64,6 +67,12 @@ private[pekko] object NettySSLSupport {
val sslEngine =
if (isClient) sslEngineProvider.createClientSSLEngine()
else sslEngineProvider.createServerSSLEngine()
new SslHandler(sslEngine)
val handler = new SslHandler(sslEngine)
handler.handshakeFuture().addListener((future: Future[Channel]) => {
if (!future.isSuccess) {
handler.closeOutbound().channel().close()
}
})
handler
}
}
Loading