From 2b6ccc6b141308d4c36d32a45be3c5729c97320c Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 15 Jul 2023 21:39:02 +0800 Subject: [PATCH] !test Migrate multi node testkit to Netty 4. Signed-off-by: He-Pin --- cluster/src/multi-jvm/resources/logback.xml | 13 ++ .../migrate-to-netty4.backwards.excludes | 57 ++++++ .../remote/testconductor/Conductor.scala | 51 ++--- .../remote/testconductor/DataTypes.scala | 23 ++- .../pekko/remote/testconductor/Player.scala | 67 +++---- .../testconductor/RemoteConnection.scala | 178 +++++++++++------- .../pekko/remote/testkit/MultiNodeSpec.scala | 5 +- project/Dependencies.scala | 10 +- project/MultiNode.scala | 1 + project/SbtMultiJvm.scala | 56 ++++-- 10 files changed, 305 insertions(+), 156 deletions(-) create mode 100644 cluster/src/multi-jvm/resources/logback.xml create mode 100644 multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes diff --git a/cluster/src/multi-jvm/resources/logback.xml b/cluster/src/multi-jvm/resources/logback.xml new file mode 100644 index 00000000000..8472208da38 --- /dev/null +++ b/cluster/src/multi-jvm/resources/logback.xml @@ -0,0 +1,13 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + diff --git a/multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes b/multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes new file mode 100644 index 00000000000..7b3f57d4965 --- /dev/null +++ b/multi-node-testkit/src/main/mima-filters/1.1.0.backwards.excludes/migrate-to-netty4.backwards.excludes @@ -0,0 +1,57 @@ +# Migrate to netty 4 +ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.pekko.remote.testconductor.RemoteConnection") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.PlayerHandler") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelOpen") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelClosed") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelBound") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelUnbound") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.writeComplete") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.exceptionCaught") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelConnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelDisconnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.messageReceived") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.channel") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.this") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ConductorHandler") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.clients") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelConnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelDisconnected") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.messageReceived") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder.decode") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder.encode") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgEncoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgEncoder.encode") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.apply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.getAddrString") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.shutdown") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.getPipeline") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.this") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgDecoder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgDecoder.decode") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller.connection") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.unapply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.unapply") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.channel") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.channel") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy$default$1") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.this") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.channel") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy$default$1") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy$default$1") + +# For Scala 3 these are also needed +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected._1") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data._1") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM._1") diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala index 3bd160ae16b..653d61d6681 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala @@ -16,6 +16,7 @@ package org.apache.pekko.remote.testconductor import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap +import scala.annotation.nowarn import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ @@ -23,18 +24,13 @@ import scala.reflect.classTag import scala.util.control.NoStackTrace import RemoteConnection.getAddrString +import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter } +import io.netty.channel.ChannelHandler.Sharable import language.postfixOps -import org.jboss.netty.channel.{ - Channel, - ChannelHandlerContext, - ChannelStateEvent, - MessageEvent, - SimpleChannelUpstreamHandler -} import org.apache.pekko -import pekko.PekkoException import pekko.ConfigurationException +import pekko.PekkoException import pekko.actor.{ Actor, ActorRef, @@ -286,32 +282,33 @@ trait Conductor { this: TestConductorExt => * * INTERNAL API. */ +@Sharable private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: ActorRef, log: LoggingAdapter) - extends SimpleChannelUpstreamHandler { + extends ChannelInboundHandlerAdapter { implicit val createTimeout: Timeout = _createTimeout val clients = new ConcurrentHashMap[Channel, ActorRef]() - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val channel = event.getChannel + override def channelActive(ctx: ChannelHandlerContext): Unit = { + val channel = ctx.channel() log.debug("connection from {}", getAddrString(channel)) val fsm: ActorRef = Await.result((controller ? Controller.CreateServerFSM(channel)).mapTo(classTag[ActorRef]), Duration.Inf) clients.put(channel, fsm) } - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val channel = event.getChannel + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + val channel = ctx.channel() log.debug("disconnect from {}", getAddrString(channel)) val fsm = clients.get(channel) fsm ! Controller.ClientDisconnected clients.remove(channel) } - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { - val channel = event.getChannel - log.debug("message from {}: {}", getAddrString(channel), event.getMessage) - event.getMessage match { + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + val channel = ctx.channel() + log.debug("message from {}: {}", getAddrString(channel), msg) + msg match { case msg: NetworkOp => clients.get(channel) ! msg case msg => @@ -320,6 +317,11 @@ private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: Actor } } + @nowarn("msg=deprecated") + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + log.error("channel {} exception {}", ctx.channel(), cause) + ctx.close() + } } /** @@ -398,10 +400,10 @@ private[pekko] class ServerFSM(val controller: ActorRef, val channel: Channel) log.warning("client {} sent unsupported message {}", getAddrString(channel), msg) stop() case Event(ToClient(msg: UnconfirmedClientOp), _) => - channel.write(msg) + channel.writeAndFlush(msg) stay() case Event(ToClient(msg), None) => - channel.write(msg) + channel.writeAndFlush(msg) stay().using(Some(sender())) case Event(ToClient(msg), _) => log.warning("cannot send {} while waiting for previous ACK", msg) @@ -436,7 +438,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller import Controller._ val settings = TestConductor().Settings - val connection = RemoteConnection( + val connection: RemoteConnection = RemoteConnection( Server, controllerPort, settings.ServerSocketWorkerPoolSize, @@ -472,7 +474,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller override def receive = LoggingReceive { case CreateServerFSM(channel) => - val (ip, port) = channel.getRemoteAddress match { + val (ip, port) = channel.remoteAddress() match { case s: InetSocketAddress => (s.getAddress.getHostAddress, s.getPort) case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser } @@ -525,12 +527,13 @@ private[pekko] class Controller(private var initialParticipants: Int, controller case Remove(node) => barrier ! BarrierCoordinator.RemoveClient(node) } - case GetNodes => sender() ! nodes.keys - case GetSockAddr => sender() ! connection.getLocalAddress + case GetNodes => sender() ! nodes.keys + case GetSockAddr => + sender() ! connection.channelFuture.sync().channel().localAddress() } override def postStop(): Unit = { - RemoteConnection.shutdown(connection) + connection.shutdown() } } diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala index 9e33176d8db..2184ed419bd 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/DataTypes.scala @@ -15,12 +15,9 @@ package org.apache.pekko.remote.testconductor import scala.concurrent.duration._ +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder } import language.implicitConversions -import org.jboss.netty.channel.Channel -import org.jboss.netty.channel.ChannelHandlerContext -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder - import org.apache.pekko import pekko.actor.Address import pekko.remote.testconductor.{ TestConductorProtocol => TCP } @@ -74,7 +71,7 @@ private[pekko] case object Done extends Done { private[pekko] final case class Remove(node: RoleName) extends CommandOp -private[pekko] class MsgEncoder extends OneToOneEncoder { +private[pekko] class MsgEncoder extends MessageToMessageEncoder[AnyRef] { implicit def address2proto(addr: Address): TCP.Address = TCP.Address.newBuilder @@ -90,7 +87,11 @@ private[pekko] class MsgEncoder extends OneToOneEncoder { case Direction.Both => TCP.Direction.Both } - def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match { + override def encode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = { + out.add(encode0(msg)) + } + + private def encode0(msg: AnyRef): AnyRef = msg match { case x: NetworkOp => val w = TCP.Wrapper.newBuilder x match { @@ -136,7 +137,7 @@ private[pekko] class MsgEncoder extends OneToOneEncoder { } } -private[pekko] class MsgDecoder extends OneToOneDecoder { +private[pekko] class MsgDecoder extends MessageToMessageDecoder[AnyRef] { implicit def address2scala(addr: TCP.Address): Address = Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort) @@ -147,7 +148,11 @@ private[pekko] class MsgDecoder extends OneToOneDecoder { case TCP.Direction.Both => Direction.Both } - def decode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match { + override def decode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = { + out.add(decode0(msg)) + } + + private def decode0(msg: AnyRef): AnyRef = msg match { case w: TCP.Wrapper if w.getAllFields.size == 1 => if (w.hasHello) { val h = w.getHello diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala index c81371e56f8..843e82ff140 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala @@ -15,7 +15,9 @@ package org.apache.pekko.remote.testconductor import java.net.{ ConnectException, InetSocketAddress } import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future } import scala.concurrent.duration._ @@ -23,16 +25,8 @@ import scala.reflect.classTag import scala.util.control.NoStackTrace import scala.util.control.NonFatal -import org.jboss.netty.channel.{ - Channel, - ChannelHandlerContext, - ChannelStateEvent, - ExceptionEvent, - MessageEvent, - SimpleChannelUpstreamHandler, - WriteCompletionEvent -} - +import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter } +import io.netty.channel.ChannelHandler.Sharable import org.apache.pekko import pekko.actor._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } @@ -202,7 +196,7 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress case Event(_: ClientOp, _) => stay().replying(Status.Failure(new IllegalStateException("not connected yet"))) case Event(Connected(channel), _) => - channel.write(Hello(name.name, TestConductor().address)) + channel.writeAndFlush(Hello(name.name, TestConductor().address)) goto(AwaitDone).using(Data(Some(channel), None)) case Event(e: ConnectionFailure, _) => log.error(e, "ConnectionFailure") @@ -229,12 +223,12 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress when(Connected) { case Event(Disconnected, _) => log.info("disconnected from TestConductor") - throw new ConnectionFailure("disconnect") + throw ConnectionFailure("disconnect") case Event(ToServer(_: Done), Data(Some(channel), _)) => - channel.write(Done) + channel.writeAndFlush(Done) stay() case Event(ToServer(msg), d @ Data(Some(channel), None)) => - channel.write(msg) + channel.writeAndFlush(msg) val token = msg match { case EnterBarrier(barrier, _) => Some(barrier -> sender()) case GetAddress(node) => Some(node.name -> sender()) @@ -331,6 +325,7 @@ private[pekko] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress * * INTERNAL API. */ +@Sharable private[pekko] class PlayerHandler( server: InetSocketAddress, private var reconnects: Int, @@ -339,57 +334,47 @@ private[pekko] class PlayerHandler( fsm: ActorRef, log: LoggingAdapter, scheduler: Scheduler)(implicit executor: ExecutionContext) - extends SimpleChannelUpstreamHandler { + extends ChannelInboundHandlerAdapter { import ClientFSM._ - reconnect() + var connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection](reconnect()) var nextAttempt: Deadline = _ - override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} open", event.getChannel) - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} closed", event.getChannel) - override def channelBound(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} bound", event.getChannel) - override def channelUnbound(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("channel {} unbound", event.getChannel) - override def writeComplete(ctx: ChannelHandlerContext, event: WriteCompletionEvent) = - log.debug("channel {} written {}", event.getChannel, event.getWrittenAmount) - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - log.debug("channel {} exception {}", event.getChannel, event.getCause) - event.getCause match { + @nowarn("msg=deprecated") + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + log.error("channel {} exception {}", ctx.channel(), cause) + cause match { case _: ConnectException if reconnects > 0 => reconnects -= 1 - scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect()) + scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect())) case e => fsm ! ConnectionFailure(e.getMessage) } } - private def reconnect(): Unit = { + private def reconnect(): RemoteConnection = { nextAttempt = Deadline.now + backoff RemoteConnection(Client, server, poolSize, this) } - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val ch = event.getChannel + override def channelActive(ctx: ChannelHandlerContext): Unit = { + val ch = ctx.channel() log.debug("connected to {}", getAddrString(ch)) fsm ! Connected(ch) } - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val channel = event.getChannel + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + val channel = ctx.channel() log.debug("disconnected from {}", getAddrString(channel)) fsm ! PoisonPill - executor.execute(new Runnable { def run = RemoteConnection.shutdown(channel) }) // Must be shutdown outside of the Netty IO pool + executor.execute(() => connectionRef.get().shutdown()) // Must be shutdown outside of the Netty IO pool } - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { - val channel = event.getChannel - log.debug("message from {}: {}", getAddrString(channel), event.getMessage) - event.getMessage match { + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + val channel = ctx.channel() + log.debug("message from {}: {}", getAddrString(channel), msg) + msg match { case msg: NetworkOp => fsm ! msg case msg => diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala index dd6f883308c..5ab25a369f9 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala @@ -14,68 +14,70 @@ package org.apache.pekko.remote.testconductor import java.net.InetSocketAddress -import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import scala.annotation.nowarn import scala.util.control.NonFatal -import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap } -import org.jboss.netty.buffer.ChannelBuffer -import org.jboss.netty.channel.{ - Channel, - ChannelPipeline, - ChannelPipelineFactory, - ChannelUpstreamHandler, - DefaultChannelPipeline +import io.netty.bootstrap.{ Bootstrap, ServerBootstrap } +import io.netty.buffer.{ ByteBuf, ByteBufUtil } +import io.netty.channel._ +import io.netty.channel.ChannelHandler.Sharable +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } +import io.netty.handler.codec.{ + LengthFieldBasedFrameDecoder, + LengthFieldPrepender, + MessageToMessageDecoder, + MessageToMessageEncoder } -import org.jboss.netty.channel.ChannelHandlerContext -import org.jboss.netty.channel.socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory } -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.oneone.{ OneToOneDecoder, OneToOneEncoder } import org.apache.pekko -import pekko.event.Logging import pekko.protobufv3.internal.Message import pekko.util.Helpers /** * INTERNAL API. */ -private[pekko] class ProtobufEncoder extends OneToOneEncoder { - override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = +private[pekko] class ProtobufEncoder extends MessageToMessageEncoder[Message] { + + override def encode(ctx: ChannelHandlerContext, msg: Message, out: java.util.List[AnyRef]): Unit = { msg match { - case m: Message => - val bytes = m.toByteArray() - ctx.getChannel.getConfig.getBufferFactory.getBuffer(bytes, 0, bytes.length) - case other => other + case message: Message => + val bytes = message.toByteArray + out.add(ctx.alloc().buffer(bytes.length).writeBytes(bytes)) } + } } /** * INTERNAL API. */ -private[pekko] class ProtobufDecoder(prototype: Message) extends OneToOneDecoder { - override def decode(ctx: ChannelHandlerContext, ch: Channel, obj: AnyRef): AnyRef = - obj match { - case buf: ChannelBuffer => - val len = buf.readableBytes() - val bytes = new Array[Byte](len) - buf.getBytes(buf.readerIndex, bytes, 0, len) - prototype.getParserForType.parseFrom(bytes) - case other => other - } +private[pekko] class ProtobufDecoder(prototype: Message) extends MessageToMessageDecoder[ByteBuf] { + + override def decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: java.util.List[AnyRef]): Unit = { + val bytes = ByteBufUtil.getBytes(msg) + out.add(prototype.getParserForType.parseFrom(bytes)) + } } /** * INTERNAL API. */ -private[pekko] class TestConductorPipelineFactory(handler: ChannelUpstreamHandler) extends ChannelPipelineFactory { - def getPipeline: ChannelPipeline = { - val encap = List(new LengthFieldPrepender(4), new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4)) - val proto = List(new ProtobufEncoder, new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance)) - val msg = List(new MsgEncoder, new MsgDecoder) - (encap ::: proto ::: msg ::: handler :: Nil).foldLeft(new DefaultChannelPipeline) { (pipe, handler) => - pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe - } +@Sharable +private[pekko] class TestConductorPipelineFactory( + handler: ChannelInboundHandler) extends ChannelInitializer[SocketChannel] { + + override def initChannel(ch: SocketChannel): Unit = { + val pipe = ch.pipeline() + pipe.addLast("lengthFieldPrepender", new LengthFieldPrepender(4)) + pipe.addLast("lengthFieldDecoder", new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4, false)) + pipe.addLast("protoEncoder", new ProtobufEncoder) + pipe.addLast("protoDecoder", new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance)) + pipe.addLast("msgEncoder", new MsgEncoder) + pipe.addLast("msgDecoder", new MsgDecoder) + pipe.addLast("userHandler", handler) } } @@ -94,44 +96,90 @@ private[pekko] case object Client extends Role */ private[pekko] case object Server extends Role +/** + * INTERNAL API. + */ +private[pekko] trait RemoteConnection { + + /** + * The channel future associated with this connection. + */ + def channelFuture: ChannelFuture + + /** + * Shutdown the connection and release the resources. + */ + def shutdown(): Unit +} + /** * INTERNAL API. */ private[pekko] object RemoteConnection { - def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler: ChannelUpstreamHandler): Channel = { + def apply( + role: Role, + sockaddr: InetSocketAddress, + poolSize: Int, + handler: ChannelInboundHandler): RemoteConnection = { role match { case Client => - val socketfactory = - new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize) - val bootstrap = new ClientBootstrap(socketfactory) - bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.connect(sockaddr).getChannel + val bootstrap = new Bootstrap() + val eventLoopGroup = new NioEventLoopGroup(poolSize) + val cf = bootstrap + .group(eventLoopGroup) + .channel(classOf[NioSocketChannel]) + .handler(new TestConductorPipelineFactory(handler)) + .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) + .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) + .connect(sockaddr) + new RemoteConnection { + override def channelFuture: ChannelFuture = cf + + @nowarn("msg=deprecated") + override def shutdown(): Unit = { + try { + channelFuture.channel().close().sync() + eventLoopGroup.shutdownGracefully(1L, 1L, TimeUnit.SECONDS) + } catch { + case NonFatal(_) => // silence this one to not make tests look like they failed, it's not really critical + } + } + } + case Server => - val socketfactory = - new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool, poolSize) - val bootstrap = new ServerBootstrap(socketfactory) - bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler)) - bootstrap.setOption("reuseAddress", !Helpers.isWindows) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.bind(sockaddr) + val bootstrap = new ServerBootstrap() + val parentEventLoopGroup = new NioEventLoopGroup(poolSize) + val childEventLoopGroup = new NioEventLoopGroup(poolSize) + val cf = bootstrap + .group(parentEventLoopGroup, childEventLoopGroup) + .channel(classOf[NioServerSocketChannel]) + .childHandler(new TestConductorPipelineFactory(handler)) + .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, !Helpers.isWindows) + .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 2048) + .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) + .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) + .bind(sockaddr) + .sync() + + new RemoteConnection { + override def channelFuture: ChannelFuture = cf + + @nowarn("msg=deprecated") + override def shutdown(): Unit = { + try { + channelFuture.channel().close().sync() + parentEventLoopGroup.shutdownGracefully(1L, 1L, TimeUnit.SECONDS) + childEventLoopGroup.shutdownGracefully(1L, 1L, TimeUnit.SECONDS) + } catch { + case NonFatal(_) => // silence this one to not make tests look like they failed, it's not really critical + } + } + } } } - def getAddrString(channel: Channel) = channel.getRemoteAddress match { + def getAddrString(channel: Channel): String = channel.remoteAddress() match { case i: InetSocketAddress => i.toString case _ => "[unknown]" } - - def shutdown(channel: Channel): Unit = { - try { - try channel.close() - finally - try channel.getFactory.shutdown() - finally channel.getFactory.releaseExternalResources() - } catch { - case NonFatal(_) => - // silence this one to not make tests look like they failed, it's not really critical - } - } } diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala index 6597ebf9840..95f6a3f1df9 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testkit/MultiNodeSpec.scala @@ -14,14 +14,15 @@ package org.apache.pekko.remote.testkit import java.net.{ InetAddress, InetSocketAddress } + import scala.collection.immutable import scala.concurrent.{ Await, Awaitable } import scala.concurrent.duration._ import scala.util.control.NonFatal -import com.typesafe.config.{ Config, ConfigFactory, ConfigObject } +import com.typesafe.config.{ Config, ConfigFactory, ConfigObject } +import io.netty.channel.ChannelException import language.implicitConversions -import org.jboss.netty.channel.ChannelException import org.apache.pekko import pekko.actor._ import pekko.actor.RootActorPath diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e389f55fbdf..9da66d667b1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -29,6 +29,7 @@ object Dependencies { // https://github.com/real-logic/aeron/blob/1.x.y/build.gradle val agronaVersion = "1.15.1" val nettyVersion = "3.10.6.Final" + val netty4Version = "4.1.94.Final" val protobufJavaVersion = "3.16.3" val logbackVersion = "1.2.11" @@ -60,8 +61,11 @@ object Dependencies { val config = "com.typesafe" % "config" % "1.4.2" val netty = "io.netty" % "netty" % nettyVersion + val `netty-transport` = "io.netty" % "netty-transport" % netty4Version + val `netty-handler` = "io.netty" % "netty-handler" % netty4Version - val scalaReflect = ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _) + val scalaReflect: ScalaVersionDependentModuleID = + ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _) val slf4jApi = "org.slf4j" % "slf4j-api" % slf4jVersion @@ -286,9 +290,9 @@ object Dependencies { val remoteTests = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) ++ remoteDependencies - val multiNodeTestkit = l ++= Seq(netty) + val multiNodeTestkit = l ++= Seq(`netty-transport`, `netty-handler`) - val cluster = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) + val cluster = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value, TestDependencies.logback) val clusterTools = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) diff --git a/project/MultiNode.scala b/project/MultiNode.scala index 6c0d71e1ae7..987823190ca 100644 --- a/project/MultiNode.scala +++ b/project/MultiNode.scala @@ -82,6 +82,7 @@ object MultiNode extends AutoPlugin { multiJvmCreateLogger / logLevel := Level.Debug, // to see ssh establishment MultiJvm / assembly / assemblyMergeStrategy := { case n if n.endsWith("logback-test.xml") => MergeStrategy.first + case n if n.endsWith("io.netty.versions.properties") => MergeStrategy.first case n if n.toLowerCase.matches("meta-inf.*\\.default") => MergeStrategy.first case n => (MultiJvm / assembly / assemblyMergeStrategy).value.apply(n) }, diff --git a/project/SbtMultiJvm.scala b/project/SbtMultiJvm.scala index d107c823119..b05cf847012 100644 --- a/project/SbtMultiJvm.scala +++ b/project/SbtMultiJvm.scala @@ -15,15 +15,16 @@ import scala.sys.process.Process import sjsonnew.BasicJsonProtocol._ import sbt._ import Keys._ + import java.io.File import java.lang.Boolean.getBoolean - -import scala.Console.{ GREEN, RESET } - import sbtassembly.AssemblyPlugin.assemblySettings import sbtassembly.{ AssemblyKeys, MergeStrategy } import AssemblyKeys._ +import java.net.{ InetSocketAddress, Socket } +import java.util.concurrent.TimeUnit + object MultiJvmPlugin extends AutoPlugin { case class Options(jvm: Seq[String], extra: String => Seq[String], run: String => Seq[String]) @@ -167,10 +168,11 @@ object MultiJvmPlugin extends AutoPlugin { // the first class wins just like a classpath // just concatenate conflicting text files assembly / assemblyMergeStrategy := { - case n if n.endsWith(".class") => MergeStrategy.first - case n if n.endsWith(".txt") => MergeStrategy.concat - case n if n.endsWith("NOTICE") => MergeStrategy.concat - case n => (assembly / assemblyMergeStrategy).value.apply(n) + case n if n.endsWith(".class") => MergeStrategy.first + case n if n.endsWith(".txt") => MergeStrategy.concat + case n if n.endsWith("NOTICE") => MergeStrategy.concat + case n if n.endsWith("LICENSE") => MergeStrategy.concat + case n => (assembly / assemblyMergeStrategy).value.apply(n) }, assembly / assemblyJarName := { name.value + "_" + scalaVersion.value + "-" + version.value + "-multi-jvm-assembly.jar" @@ -372,11 +374,41 @@ object MultiJvmPlugin extends AutoPlugin { val connectInput = input && index == 0 log.debug("Starting %s for %s".format(jvmName, testClass)) log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" "))) - (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput)) + val testClass2Process = (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput)) + if (index == 0) { + log.debug("%s for %s 's started as `Controller`, waiting before can be connected for clients.".format(jvmName, + testClass)) + val controllerHost = hosts.head + val serverPort: Int = Integer.getInteger("multinode.server-port", 4711) + waitingBeforeConnectable(controllerHost, serverPort, TimeUnit.SECONDS.toMillis(20L)) + } + testClass2Process } processExitCodes(name, processes, log) } + private def waitingBeforeConnectable(host: String, port: Int, timeoutInMillis: Long): Unit = { + val inetSocketAddress = new InetSocketAddress(host, port) + def telnet(addr: InetSocketAddress, timeout: Int): Boolean = { + val socket: Socket = new Socket() + try { + socket.connect(inetSocketAddress, timeout) + socket.isConnected + } catch { + case _: Exception => false + } finally { + socket.close() + } + } + + val startTime = System.currentTimeMillis() + var connectivity = false + while (!connectivity && (System.currentTimeMillis() - startTime < timeoutInMillis)) { + connectivity = telnet(inetSocketAddress, 1000) + TimeUnit.MILLISECONDS.sleep(100) + } + } + def processExitCodes(name: String, processes: Seq[(String, Process)], log: Logger): (String, sbt.TestResult) = { val exitCodes = processes.map { case (testClass, process) => (testClass, process.exitValue()) @@ -558,7 +590,7 @@ object MultiJvmPlugin extends AutoPlugin { private def getMultiNodeCommandLineOptions(hosts: Seq[String], index: Int, maxNodes: Int): Seq[String] = { Seq( "-Dmultinode.max-nodes=" + maxNodes, - "-Dmultinode.server-host=" + hosts(0).split("@").last, + "-Dmultinode.server-host=" + hosts.head.split("@").last, "-Dmultinode.host=" + hosts(index).split("@").last, "-Dmultinode.index=" + index) } @@ -573,7 +605,7 @@ object MultiJvmPlugin extends AutoPlugin { if (hosts.isEmpty) { if (hostsFile.exists && hostsFile.canRead) { s.log.info("Using hosts defined in file " + hostsFile.getAbsolutePath) - IO.readLines(hostsFile).map(_.trim).filter(_.length > 0).toIndexedSeq + IO.readLines(hostsFile).map(_.trim).filter(_.nonEmpty).toIndexedSeq } else hosts.toIndexedSeq } else { @@ -585,7 +617,7 @@ object MultiJvmPlugin extends AutoPlugin { theHosts.map { x => val elems = x.split(":").toList.take(2).padTo(2, defaultJava) - (elems(0), elems(1)) - } unzip + (elems.head, elems(1)) + }.unzip } }