diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala index ec9b7ca40ce..57edc7ed3b1 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettySSLSupport.scala @@ -17,7 +17,7 @@ import scala.annotation.nowarn import com.typesafe.config.Config import io.netty.channel.Channel import io.netty.handler.ssl.SslHandler -import io.netty.util.concurrent.{ Future, GenericFutureListener } +import io.netty.util.concurrent.Future import org.apache.pekko import pekko.japi.Util._ import pekko.util.ccompat._ @@ -66,11 +66,9 @@ private[pekko] object NettySSLSupport { if (isClient) sslEngineProvider.createClientSSLEngine() else sslEngineProvider.createServerSSLEngine() val handler = new SslHandler(sslEngine) - handler.handshakeFuture().addListener(new GenericFutureListener[Future[Channel]] { - override def operationComplete(future: Future[Channel]): Unit = { - if (!future.isSuccess) { - handler.closeOutbound().channel().close() - } + handler.handshakeFuture().addListener((future: Future[Channel]) => { + if (!future.isSuccess) { + handler.closeOutbound().channel().close() } }) handler diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index e6848c798dd..9c05f9e2968 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -38,19 +38,12 @@ import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap } import io.netty.channel.{ Channel, ChannelFuture, - ChannelFutureListener, ChannelHandlerContext, ChannelInitializer, ChannelOption, ChannelPipeline } -import io.netty.channel.group.{ - ChannelGroup, - ChannelGroupFuture, - ChannelGroupFutureListener, - ChannelMatchers, - DefaultChannelGroup -} +import io.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelMatchers, DefaultChannelGroup } import io.netty.buffer.Unpooled import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel @@ -63,34 +56,30 @@ import io.netty.util.concurrent.GlobalEventExecutor object NettyFutureBridge { def apply(nettyFuture: ChannelFuture): Future[Channel] = { val p = Promise[Channel]() - nettyFuture.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture): Unit = - p.complete( - Try( - if (future.isSuccess) future.channel() - else if (future.isCancelled) throw new CancellationException - else throw future.cause())) - }) + nettyFuture.addListener((future: ChannelFuture) => + p.complete( + Try( + if (future.isSuccess) future.channel() + else if (future.isCancelled) throw new CancellationException + else throw future.cause()))) p.future } def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = { import pekko.util.ccompat.JavaConverters._ val p = Promise[ChannelGroup]() - nettyFuture.addListener(new ChannelGroupFutureListener { - def operationComplete(future: ChannelGroupFuture): Unit = - p.complete( - Try( - if (future.isSuccess) future.group() - else - throw future.iterator.asScala - .collectFirst { - case f if f.isCancelled => new CancellationException - case f if !f.isSuccess => f.cause() - } - .getOrElse(new IllegalStateException( - "Error reported in ChannelGroupFuture, but no error found in individual futures.")))) - }) + nettyFuture.addListener((future: ChannelGroupFuture) => + p.complete( + Try( + if (future.isSuccess) future.group() + else + throw future.iterator.asScala + .collectFirst { + case f if f.isCancelled => new CancellationException + case f if !f.isSuccess => f.cause() + } + .getOrElse(new IllegalStateException( + "Error reported in ChannelGroupFuture, but no error found in individual futures."))))) p.future } } @@ -271,7 +260,7 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty extends NettyClientHelpers with CommonHandlers { final protected val statusPromise = Promise[AssociationHandle]() - def statusFuture = statusPromise.future + def statusFuture: Future[AssociationHandle] = statusPromise.future final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress): Unit = { init(channel, remoteSocketAddress, remoteAddress)(statusPromise.success) @@ -287,7 +276,7 @@ private[transport] object NettyTransport { val FrameLengthFieldLength = 4 def gracefulClose(channel: Channel)(implicit ec: ExecutionContext): Unit = { @nowarn("msg=deprecated") - def always(c: ChannelFuture) = NettyFutureBridge(c).recover { case _ => c.channel() } + def always(c: ChannelFuture): Future[Channel] = NettyFutureBridge(c).recover { case _ => c.channel() } for { _ <- always { channel.writeAndFlush(Unpooled.EMPTY_BUFFER) } // Force flush by waiting on a final dummy write _ <- always { channel.disconnect() } @@ -404,23 +393,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } - private val serverPipelineInitializer: ChannelInitializer[SocketChannel] = new ChannelInitializer[SocketChannel] { - override def initChannel(ch: SocketChannel): Unit = { - val pipeline = newPipeline(ch) - if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false)) - val handler = new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log) - pipeline.addLast("ServerHandler", handler) - } + private val serverPipelineInitializer: ChannelInitializer[SocketChannel] = (ch: SocketChannel) => { + val pipeline = newPipeline(ch) + if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false)) + val handler = new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log) + pipeline.addLast("ServerHandler", handler) } private def clientPipelineInitializer(remoteAddress: Address): ChannelInitializer[SocketChannel] = - new ChannelInitializer[SocketChannel] { - override def initChannel(ch: SocketChannel): Unit = { - val pipeline = newPipeline(ch) - if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true)) - val handler = new TcpClientHandler(NettyTransport.this, remoteAddress, log) - pipeline.addLast("clienthandler", handler) - } + (ch: SocketChannel) => { + val pipeline = newPipeline(ch) + if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true)) + val handler = new TcpClientHandler(NettyTransport.this, remoteAddress, log) + pipeline.addLast("clienthandler", handler) } private val inboundBootstrap: ServerBootstrap = { @@ -569,7 +554,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } override def shutdown(): Future[Boolean] = { - def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ => true).recover { case _ => false } + def always(c: ChannelGroupFuture): Future[Boolean] = NettyFutureBridge(c).map(_ => true).recover { case _ => false } for { // Force flush by trying to write an empty buffer and wait for success unbindStatus <- always(channelGroup.close(ChannelMatchers.isServerChannel))