diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index d7ff5dca8fe..f52d4540e3b 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -68,6 +68,17 @@ object NettyFutureBridge { p.future } + private[transport] def apply[T](nettyFuture: io.netty.util.concurrent.Future[T]): Future[T] = { + val p = Promise[T]() + nettyFuture.addListener((future: io.netty.util.concurrent.Future[T]) => + p.complete( + Try( + if (future.isSuccess) future.get() + else if (future.isCancelled) throw new CancellationException + else throw future.cause()))) + p.future + } + def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = { import pekko.util.ccompat.JavaConverters._ val p = Promise[ChannelGroup]() @@ -540,14 +551,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA (for { (host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress))) - readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel => - if (EnableSsl) - blocking { - channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly() - } - channel.config.setAutoRead(false) - channel - } + channel <- NettyFutureBridge(bootstrap.connect(host, port)) + readyChannel <- if (EnableSsl) { + NettyFutureBridge(channel.pipeline().get(classOf[SslHandler]).handshakeFuture()) + } else Future.successful(channel) handle <- readyChannel.pipeline().get(classOf[ClientHandler]).statusFuture } yield handle).recover { case _: CancellationException => throw new NettyTransportExceptionNoStack("Connection was cancelled")