From 9236d756dd44ff08e736fe26015a7b408a2e67d5 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 13 Sep 2023 00:03:58 +0800 Subject: [PATCH] Remove the duplicated code in NettyHelpers. --- .../remote/transport/netty/NettyHelpers.scala | 40 +++++++------------ .../transport/netty/NettySSLSupport.scala | 8 ++-- .../transport/netty/NettyTransport.scala | 11 +++-- .../remote/transport/netty/TcpSupport.scala | 10 +++-- 4 files changed, 32 insertions(+), 37 deletions(-) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala index 8e3a7c55a02..3b17a577303 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyHelpers.scala @@ -13,16 +13,17 @@ package org.apache.pekko.remote.transport.netty -import io.netty.buffer.ByteBuf -import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler } - import java.nio.channels.ClosedChannelException + +import scala.annotation.nowarn import scala.util.control.NonFatal + import org.apache.pekko import pekko.PekkoException import pekko.util.unused -import scala.annotation.nowarn +import io.netty.buffer.ByteBuf +import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler } /** * INTERNAL API @@ -52,15 +53,16 @@ private[netty] trait NettyHelpers { /** * INTERNAL API */ -private[netty] trait NettyServerHelpers extends SimpleChannelInboundHandler[ByteBuf] with NettyHelpers { - +private[netty] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[ByteBuf] + with NettyHelpers { final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { onMessage(ctx, msg) } @nowarn("msg=deprecated") - final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = + final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { transformException(ctx, cause) + } final override def channelActive(ctx: ChannelHandlerContext): Unit = { onOpen(ctx) @@ -75,23 +77,9 @@ private[netty] trait NettyServerHelpers extends SimpleChannelInboundHandler[Byte /** * INTERNAL API */ -private[netty] trait NettyClientHelpers extends SimpleChannelInboundHandler[ByteBuf] with NettyHelpers { - - final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = { - onMessage(ctx, msg) - } +private[netty] trait NettyServerHelpers extends NettyChannelHandlerAdapter - @nowarn("msg=deprecated") - final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { - transformException(ctx, cause) - } - - final override def channelActive(ctx: ChannelHandlerContext): Unit = { - onOpen(ctx) - onConnect(ctx) - } - - final override def channelInactive(ctx: ChannelHandlerContext): Unit = { - onDisconnect(ctx) - } -} +/** + * INTERNAL API + */ +private[netty] trait NettyClientHelpers extends NettyChannelHandlerAdapter diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala index 57edc7ed3b1..1d93406b5bd 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala @@ -14,14 +14,16 @@ package org.apache.pekko.remote.transport.netty import scala.annotation.nowarn + import com.typesafe.config.Config -import io.netty.channel.Channel -import io.netty.handler.ssl.SslHandler -import io.netty.util.concurrent.Future import org.apache.pekko import pekko.japi.Util._ import pekko.util.ccompat._ +import io.netty.channel.Channel +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.Future + /** * INTERNAL API */ diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index 9c05f9e2968..c331bd5938f 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -16,11 +16,13 @@ package org.apache.pekko.remote.transport.netty import java.net.{ InetAddress, InetSocketAddress, SocketAddress } import java.util.concurrent.CancellationException import java.util.concurrent.atomic.AtomicInteger + import scala.annotation.nowarn import scala.concurrent.{ blocking, ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.util.Try import scala.util.control.{ NoStackTrace, NonFatal } + import com.typesafe.config.Config import org.apache.pekko import pekko.ConfigurationException @@ -30,11 +32,13 @@ import pekko.dispatch.ThreadPoolConfig import pekko.event.Logging import pekko.remote.RARP import pekko.remote.transport.{ AssociationHandle, Transport } +import pekko.remote.transport.AssociationHandle.HandleEventListener +import pekko.remote.transport.Transport._ +import pekko.util.Helpers.Requiring import pekko.util.{ Helpers, OptionVal } -import AssociationHandle.HandleEventListener -import Transport._ -import Helpers.Requiring + import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap } +import io.netty.buffer.Unpooled import io.netty.channel.{ Channel, ChannelFuture, @@ -44,7 +48,6 @@ import io.netty.channel.{ ChannelPipeline } import io.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelMatchers, DefaultChannelGroup } -import io.netty.buffer.Unpooled import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala index b09d1cd6cbc..8d8fab344a1 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala @@ -13,11 +13,11 @@ package org.apache.pekko.remote.transport.netty -import io.netty.channel.{ Channel, ChannelHandlerContext } - import java.net.InetSocketAddress -import scala.concurrent.{ Future, Promise } + import scala.annotation.nowarn +import scala.concurrent.{ Future, Promise } + import org.apache.pekko import pekko.actor.Address import pekko.event.LoggingAdapter @@ -25,7 +25,9 @@ import pekko.remote.transport.AssociationHandle import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, HandleEventListener, InboundPayload } import pekko.remote.transport.Transport.AssociationEventListener import pekko.util.ByteString + import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled } +import io.netty.channel.{ Channel, ChannelHandlerContext } import io.netty.util.AttributeKey private[remote] object TcpHandlers { @@ -50,7 +52,7 @@ private[remote] trait TcpHandlers extends CommonHandlers { override def onDisconnect(ctx: ChannelHandlerContext): Unit = { notifyListener(ctx.channel(), Disassociated(AssociationHandle.Unknown)) - log.debug("Remote connection to [{}] was disconnected because of {}", ctx.channel().remoteAddress()) + log.debug("Remote connection to [{}] was disconnected.", ctx.channel().remoteAddress()) } override def onMessage(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {