From c380a4e19f9f931e80f688a4650394cc58cf8d67 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 16 Sep 2023 04:06:46 +0800 Subject: [PATCH] =remote Make ues of Netty's default resolver. --- .../transport/netty/NettyTransport.scala | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index c331bd5938f..ec59ea6101d 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -462,23 +462,15 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering - // TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960 - def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match { - case Address(_, _, Some(host), Some(port)) => - Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } } - case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information.")) - } - override def listen: Future[(Address, Promise[AssociationEventListener])] = { @nowarn("msg=deprecated") val bindPort = settings.BindPortSelector for { - address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort)) + _ <- Future.unit } yield { try { - val newServerChannel = inboundBootstrap.bind(address).sync().channel() - + val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel() // Block reads until a handler actor is registered newServerChannel.config().setAutoRead(false) channelGroup.add(newServerChannel) @@ -512,7 +504,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } } catch { case NonFatal(e) => { - log.error("failed to bind to {}, shutting down Netty transport", address) + log.error("failed to bind to addr:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort) try { shutdown() } catch { case NonFatal(_) => } // ignore possible exception during shutdown @@ -525,14 +517,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA // Need to do like this for binary compatibility reasons private[pekko] def boundAddress = boundTo + def extractHostAndPort(addr: Address): (String, Int) = addr match { + case Address(_, _, Some(host), Some(port)) => (host, port) + case _ => throw new IllegalArgumentException(s"Address [$addr] does not contain host or port information.") + } + override def associate(remoteAddress: Address): Future[AssociationHandle] = { if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound")) else { val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress) (for { - socketAddress <- addressToSocketAddress(remoteAddress) - readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel => + (host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress))) + readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel => if (EnableSsl) blocking { channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()