diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index c331bd5938f..728975f2789 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -462,62 +462,52 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering - // TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960 - def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match { - case Address(_, _, Some(host), Some(port)) => - Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } } - case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information.")) - } - override def listen: Future[(Address, Promise[AssociationEventListener])] = { @nowarn("msg=deprecated") val bindPort = settings.BindPortSelector - for { - address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort)) - } yield { - try { - val newServerChannel = inboundBootstrap.bind(address).sync().channel() - - // Block reads until a handler actor is registered - newServerChannel.config().setAutoRead(false) - channelGroup.add(newServerChannel) - - serverChannel = newServerChannel - - @nowarn("msg=deprecated") - val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector) - - addressFromSocketAddress( - newServerChannel.localAddress(), - schemeIdentifier, - system.name, - Some(settings.Hostname), - port) match { - case Some(address) => - addressFromSocketAddress(newServerChannel.localAddress(), schemeIdentifier, system.name, None, - None) match { - case Some(address) => boundTo = address - case None => - throw new NettyTransportException( - s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") - } - associationListenerPromise.future.foreach { _ => - newServerChannel.config().setAutoRead(true) - } - (address, associationListenerPromise) - case None => - throw new NettyTransportException( - s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") - } - } catch { - case NonFatal(e) => { - log.error("failed to bind to {}, shutting down Netty transport", address) - try { - shutdown() - } catch { case NonFatal(_) => } // ignore possible exception during shutdown - throw e - } + try { + val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel() + // Block reads until a handler actor is registered + newServerChannel.config().setAutoRead(false) + channelGroup.add(newServerChannel) + + serverChannel = newServerChannel + + @nowarn("msg=deprecated") + val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector) + + addressFromSocketAddress( + newServerChannel.localAddress(), + schemeIdentifier, + system.name, + Some(settings.Hostname), + port) match { + case Some(address) => + addressFromSocketAddress(newServerChannel.localAddress(), schemeIdentifier, system.name, None, + None) match { + case Some(address) => boundTo = address + case None => + throw new NettyTransportException( + s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") + } + associationListenerPromise.future.foreach { _ => + newServerChannel.config().setAutoRead(true) + } + Future.successful((address, associationListenerPromise)) + case None => + throw new NettyTransportException( + s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") + } + } catch { + case NonFatal(e) => { + log.error("failed to bind to addr:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort) + try { + shutdown() + } catch { + case NonFatal(_) => + } // ignore possible exception during shutdown + throw e } } } @@ -525,14 +515,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA // Need to do like this for binary compatibility reasons private[pekko] def boundAddress = boundTo + private def extractHostAndPort(addr: Address): (String, Int) = addr match { + case Address(_, _, Some(host), Some(port)) => (host, port) + case _ => throw new IllegalArgumentException(s"Address [$addr] does not contain host or port information.") + } + override def associate(remoteAddress: Address): Future[AssociationHandle] = { if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound")) else { val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress) (for { - socketAddress <- addressToSocketAddress(remoteAddress) - readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel => + (host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress))) + readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel => if (EnableSsl) blocking { channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()