Skip to content

Commit

Permalink
use lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 9, 2023
1 parent 2517d0a commit a806ca0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scala.annotation.nowarn
import com.typesafe.config.Config
import io.netty.channel.Channel
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.{ Future, GenericFutureListener }
import io.netty.util.concurrent.Future
import org.apache.pekko
import pekko.japi.Util._
import pekko.util.ccompat._
Expand Down Expand Up @@ -66,11 +66,9 @@ private[pekko] object NettySSLSupport {
if (isClient) sslEngineProvider.createClientSSLEngine()
else sslEngineProvider.createServerSSLEngine()
val handler = new SslHandler(sslEngine)
handler.handshakeFuture().addListener(new GenericFutureListener[Future[Channel]] {
override def operationComplete(future: Future[Channel]): Unit = {
if (!future.isSuccess) {
handler.closeOutbound().channel().close()
}
handler.handshakeFuture().addListener((future: Future[Channel]) => {
if (!future.isSuccess) {
handler.closeOutbound().channel().close()
}
})
handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,12 @@ import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.channel.{
Channel,
ChannelFuture,
ChannelFutureListener,
ChannelHandlerContext,
ChannelInitializer,
ChannelOption,
ChannelPipeline
}
import io.netty.channel.group.{
ChannelGroup,
ChannelGroupFuture,
ChannelGroupFutureListener,
ChannelMatchers,
DefaultChannelGroup
}
import io.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelMatchers, DefaultChannelGroup }
import io.netty.buffer.Unpooled
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
Expand All @@ -63,34 +56,30 @@ import io.netty.util.concurrent.GlobalEventExecutor
object NettyFutureBridge {
def apply(nettyFuture: ChannelFuture): Future[Channel] = {
val p = Promise[Channel]()
nettyFuture.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit =
p.complete(
Try(
if (future.isSuccess) future.channel()
else if (future.isCancelled) throw new CancellationException
else throw future.cause()))
})
nettyFuture.addListener((future: ChannelFuture) =>
p.complete(
Try(
if (future.isSuccess) future.channel()
else if (future.isCancelled) throw new CancellationException
else throw future.cause())))
p.future
}

def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
import pekko.util.ccompat.JavaConverters._
val p = Promise[ChannelGroup]()
nettyFuture.addListener(new ChannelGroupFutureListener {
def operationComplete(future: ChannelGroupFuture): Unit =
p.complete(
Try(
if (future.isSuccess) future.group()
else
throw future.iterator.asScala
.collectFirst {
case f if f.isCancelled => new CancellationException
case f if !f.isSuccess => f.cause()
}
.getOrElse(new IllegalStateException(
"Error reported in ChannelGroupFuture, but no error found in individual futures."))))
})
nettyFuture.addListener((future: ChannelGroupFuture) =>
p.complete(
Try(
if (future.isSuccess) future.group()
else
throw future.iterator.asScala
.collectFirst {
case f if f.isCancelled => new CancellationException
case f if !f.isSuccess => f.cause()
}
.getOrElse(new IllegalStateException(
"Error reported in ChannelGroupFuture, but no error found in individual futures.")))))
p.future
}
}
Expand Down Expand Up @@ -271,7 +260,7 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty
extends NettyClientHelpers
with CommonHandlers {
final protected val statusPromise = Promise[AssociationHandle]()
def statusFuture = statusPromise.future
def statusFuture: Future[AssociationHandle] = statusPromise.future

final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress): Unit = {
init(channel, remoteSocketAddress, remoteAddress)(statusPromise.success)
Expand All @@ -287,7 +276,7 @@ private[transport] object NettyTransport {
val FrameLengthFieldLength = 4
def gracefulClose(channel: Channel)(implicit ec: ExecutionContext): Unit = {
@nowarn("msg=deprecated")
def always(c: ChannelFuture) = NettyFutureBridge(c).recover { case _ => c.channel() }
def always(c: ChannelFuture): Future[Channel] = NettyFutureBridge(c).recover { case _ => c.channel() }
for {
_ <- always { channel.writeAndFlush(Unpooled.EMPTY_BUFFER) } // Force flush by waiting on a final dummy write
_ <- always { channel.disconnect() }
Expand Down Expand Up @@ -404,23 +393,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA

}

private val serverPipelineInitializer: ChannelInitializer[SocketChannel] = new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
val pipeline = newPipeline(ch)
if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false))
val handler = new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log)
pipeline.addLast("ServerHandler", handler)
}
private val serverPipelineInitializer: ChannelInitializer[SocketChannel] = (ch: SocketChannel) => {
val pipeline = newPipeline(ch)
if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false))
val handler = new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log)
pipeline.addLast("ServerHandler", handler)
}

private def clientPipelineInitializer(remoteAddress: Address): ChannelInitializer[SocketChannel] =
new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
val pipeline = newPipeline(ch)
if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true))
val handler = new TcpClientHandler(NettyTransport.this, remoteAddress, log)
pipeline.addLast("clienthandler", handler)
}
(ch: SocketChannel) => {
val pipeline = newPipeline(ch)
if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true))
val handler = new TcpClientHandler(NettyTransport.this, remoteAddress, log)
pipeline.addLast("clienthandler", handler)
}

private val inboundBootstrap: ServerBootstrap = {
Expand Down Expand Up @@ -569,7 +554,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
}

override def shutdown(): Future[Boolean] = {
def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ => true).recover { case _ => false }
def always(c: ChannelGroupFuture): Future[Boolean] = NettyFutureBridge(c).map(_ => true).recover { case _ => false }
for {
// Force flush by trying to write an empty buffer and wait for success
unbindStatus <- always(channelGroup.close(ChannelMatchers.isServerChannel))
Expand Down

0 comments on commit a806ca0

Please sign in to comment.