From a92c2d151576552038fc3c34e40b2caf071712f1 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Mon, 11 Sep 2023 21:47:06 +0800 Subject: [PATCH] Reduce allocation in ChannelLocalActor. --- .../netty4.backwards.excludes | 2 ++ .../remote/transport/netty/TcpSupport.scala | 33 +++++++------------ 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes b/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes index 59de2d77521..7803c7b8521 100644 --- a/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes +++ b/remote/src/main/mima-filters/1.1.x.backwards.excludes/netty4.backwards.excludes @@ -1,2 +1,4 @@ #migrate the classic transport to Netty4 ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.transport.netty.NettyFutureBridge.apply") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.remote.transport.netty.ChannelLocalActor") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.remote.transport.netty.ChannelLocalActor$") \ No newline at end of file diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala index facdeee1cc9..b09d1cd6cbc 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/TcpSupport.scala @@ -26,24 +26,10 @@ import pekko.remote.transport.AssociationHandle.{ Disassociated, HandleEvent, Ha import pekko.remote.transport.Transport.AssociationEventListener import pekko.util.ByteString import io.netty.buffer.{ ByteBuf, ByteBufUtil, Unpooled } +import io.netty.util.AttributeKey -import java.util.concurrent.ConcurrentHashMap - -/** - * INTERNAL API - */ -private[remote] object ChannelLocalActor { - private val map = new ConcurrentHashMap[Channel, Option[HandleEventListener]]() - - def notifyListener(channel: Channel, msg: HandleEvent): Unit = { - map.getOrDefault(channel, None).foreach { - _.notify(msg) - } - } - - def set(channel: Channel, listener: Option[HandleEventListener]): Unit = { - map.put(channel, listener) - } +private[remote] object TcpHandlers { + private val LISTENER = AttributeKey.valueOf[HandleEventListener]("listener") } /** @@ -52,14 +38,12 @@ private[remote] object ChannelLocalActor { @nowarn("msg=deprecated") private[remote] trait TcpHandlers extends CommonHandlers { protected def log: LoggingAdapter - - import ChannelLocalActor._ + import TcpHandlers._ override def registerListener( channel: Channel, listener: HandleEventListener, - remoteSocketAddress: InetSocketAddress): Unit = - ChannelLocalActor.set(channel, Some(listener)) + remoteSocketAddress: InetSocketAddress): Unit = channel.attr(LISTENER).set(listener) override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new TcpAssociationHandle(localAddress, remoteAddress, transport, channel) @@ -79,6 +63,13 @@ private[remote] trait TcpHandlers extends CommonHandlers { log.warning("Remote connection to [{}] failed with {}", ctx.channel().remoteAddress(), e.getCause) ctx.channel().close() // No graceful close here } + + private def notifyListener(channel: Channel, event: HandleEvent): Unit = { + val listener = channel.attr(LISTENER).get() + if (listener ne null) { + listener.notify(event) + } + } } /**