Skip to content

Commit

Permalink
!test Migrate multi node testkit to Netty 4.
Browse files Browse the repository at this point in the history
Signed-off-by: He-Pin <[email protected]>
  • Loading branch information
He-Pin committed Aug 5, 2023
1 parent 5b97885 commit 2b6ccc6
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 156 deletions.
13 changes: 13 additions & 0 deletions cluster/src/multi-jvm/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT" />
</root>
<logger name="io.netty.util.Recycler" level="ERROR" />
<logger name="io.netty.buffer.PoolThreadCache" level="ERROR" />
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Migrate to netty 4
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.pekko.remote.testconductor.RemoteConnection")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.PlayerHandler")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelOpen")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelClosed")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelBound")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelUnbound")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.writeComplete")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.exceptionCaught")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelConnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelDisconnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.messageReceived")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ConductorHandler")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.clients")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelConnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelDisconnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.messageReceived")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder.decode")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder.encode")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgEncoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgEncoder.encode")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.apply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.getAddrString")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.shutdown")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.getPipeline")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgDecoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgDecoder.decode")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller.connection")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.unapply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.unapply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy$default$1")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.channel")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy$default$1")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy$default$1")

# For Scala 3 these are also needed
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected._1")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data._1")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM._1")
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,21 @@ package org.apache.pekko.remote.testconductor
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.classTag
import scala.util.control.NoStackTrace

import RemoteConnection.getAddrString
import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter }
import io.netty.channel.ChannelHandler.Sharable
import language.postfixOps
import org.jboss.netty.channel.{
Channel,
ChannelHandlerContext,
ChannelStateEvent,
MessageEvent,
SimpleChannelUpstreamHandler
}

import org.apache.pekko
import pekko.PekkoException
import pekko.ConfigurationException
import pekko.PekkoException
import pekko.actor.{
Actor,
ActorRef,
Expand Down Expand Up @@ -286,32 +282,33 @@ trait Conductor { this: TestConductorExt =>
*
* INTERNAL API.
*/
@Sharable
private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: ActorRef, log: LoggingAdapter)
extends SimpleChannelUpstreamHandler {
extends ChannelInboundHandlerAdapter {

implicit val createTimeout: Timeout = _createTimeout
val clients = new ConcurrentHashMap[Channel, ActorRef]()

override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelActive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("connection from {}", getAddrString(channel))
val fsm: ActorRef =
Await.result((controller ? Controller.CreateServerFSM(channel)).mapTo(classTag[ActorRef]), Duration.Inf)
clients.put(channel, fsm)
}

override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("disconnect from {}", getAddrString(channel))
val fsm = clients.get(channel)
fsm ! Controller.ClientDisconnected
clients.remove(channel)
}

override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val channel = event.getChannel
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
event.getMessage match {
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
val channel = ctx.channel()
log.debug("message from {}: {}", getAddrString(channel), msg)
msg match {
case msg: NetworkOp =>
clients.get(channel) ! msg
case msg =>
Expand All @@ -320,6 +317,11 @@ private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: Actor
}
}

@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
log.error("channel {} exception {}", ctx.channel(), cause)
ctx.close()
}
}

/**
Expand Down Expand Up @@ -398,10 +400,10 @@ private[pekko] class ServerFSM(val controller: ActorRef, val channel: Channel)
log.warning("client {} sent unsupported message {}", getAddrString(channel), msg)
stop()
case Event(ToClient(msg: UnconfirmedClientOp), _) =>
channel.write(msg)
channel.writeAndFlush(msg)
stay()
case Event(ToClient(msg), None) =>
channel.write(msg)
channel.writeAndFlush(msg)
stay().using(Some(sender()))
case Event(ToClient(msg), _) =>
log.warning("cannot send {} while waiting for previous ACK", msg)
Expand Down Expand Up @@ -436,7 +438,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
import Controller._

val settings = TestConductor().Settings
val connection = RemoteConnection(
val connection: RemoteConnection = RemoteConnection(
Server,
controllerPort,
settings.ServerSocketWorkerPoolSize,
Expand Down Expand Up @@ -472,7 +474,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller

override def receive = LoggingReceive {
case CreateServerFSM(channel) =>
val (ip, port) = channel.getRemoteAddress match {
val (ip, port) = channel.remoteAddress() match {
case s: InetSocketAddress => (s.getAddress.getHostAddress, s.getPort)
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
Expand Down Expand Up @@ -525,12 +527,13 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
case Remove(node) =>
barrier ! BarrierCoordinator.RemoveClient(node)
}
case GetNodes => sender() ! nodes.keys
case GetSockAddr => sender() ! connection.getLocalAddress
case GetNodes => sender() ! nodes.keys
case GetSockAddr =>
sender() ! connection.channelFuture.sync().channel().localAddress()
}

override def postStop(): Unit = {
RemoteConnection.shutdown(connection)
connection.shutdown()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ package org.apache.pekko.remote.testconductor

import scala.concurrent.duration._

import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder }
import language.implicitConversions
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder

import org.apache.pekko
import pekko.actor.Address
import pekko.remote.testconductor.{ TestConductorProtocol => TCP }
Expand Down Expand Up @@ -74,7 +71,7 @@ private[pekko] case object Done extends Done {

private[pekko] final case class Remove(node: RoleName) extends CommandOp

private[pekko] class MsgEncoder extends OneToOneEncoder {
private[pekko] class MsgEncoder extends MessageToMessageEncoder[AnyRef] {

implicit def address2proto(addr: Address): TCP.Address =
TCP.Address.newBuilder
Expand All @@ -90,7 +87,11 @@ private[pekko] class MsgEncoder extends OneToOneEncoder {
case Direction.Both => TCP.Direction.Both
}

def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
override def encode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = {
out.add(encode0(msg))
}

private def encode0(msg: AnyRef): AnyRef = msg match {
case x: NetworkOp =>
val w = TCP.Wrapper.newBuilder
x match {
Expand Down Expand Up @@ -136,7 +137,7 @@ private[pekko] class MsgEncoder extends OneToOneEncoder {
}
}

private[pekko] class MsgDecoder extends OneToOneDecoder {
private[pekko] class MsgDecoder extends MessageToMessageDecoder[AnyRef] {

implicit def address2scala(addr: TCP.Address): Address =
Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort)
Expand All @@ -147,7 +148,11 @@ private[pekko] class MsgDecoder extends OneToOneDecoder {
case TCP.Direction.Both => Direction.Both
}

def decode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
override def decode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = {
out.add(decode0(msg))
}

private def decode0(msg: AnyRef): AnyRef = msg match {
case w: TCP.Wrapper if w.getAllFields.size == 1 =>
if (w.hasHello) {
val h = w.getHello
Expand Down
Loading

0 comments on commit 2b6ccc6

Please sign in to comment.