From 62bf7cbc7dd43b2540d211eee2accf7d9c4de2e2 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 16 Sep 2023 04:06:46 +0800 Subject: [PATCH] =remote Make ues of Netty's default resolver. --- .../transport/netty/NettyTransport.scala | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index c331bd5938f..d7ff5dca8fe 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -463,22 +463,24 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering // TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960 + // Keep this for binary compatibility reasons def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match { case Address(_, _, Some(host), Some(port)) => - Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } } - case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information.")) + Future { + blocking { + new InetSocketAddress(InetAddress.getByName(host), port) + } + } + case _ => + Future.failed(new IllegalArgumentException(s"Address [$addr] must contain both host and port information.")) } override def listen: Future[(Address, Promise[AssociationEventListener])] = { @nowarn("msg=deprecated") val bindPort = settings.BindPortSelector - - for { - address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort)) - } yield { + Future.fromTry(Try { try { - val newServerChannel = inboundBootstrap.bind(address).sync().channel() - + val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel() // Block reads until a handler actor is registered newServerChannel.config().setAutoRead(false) channelGroup.add(newServerChannel) @@ -511,28 +513,34 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]") } } catch { - case NonFatal(e) => { - log.error("failed to bind to {}, shutting down Netty transport", address) + case NonFatal(e) => + log.error("failed to bind to host:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort) try { shutdown() - } catch { case NonFatal(_) => } // ignore possible exception during shutdown - throw e - } + } catch { + case NonFatal(_) => + } // ignore possible exception during shutdown + throw e; } - } + }) } // Need to do like this for binary compatibility reasons private[pekko] def boundAddress = boundTo + private def extractHostAndPort(addr: Address): (String, Int) = addr match { + case Address(_, _, Some(host), Some(port)) => (host, port) + case _ => throw new IllegalArgumentException(s"Address [$addr] must contain both host and port information.") + } + override def associate(remoteAddress: Address): Future[AssociationHandle] = { if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound")) else { val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress) (for { - socketAddress <- addressToSocketAddress(remoteAddress) - readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel => + (host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress))) + readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel => if (EnableSsl) blocking { channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()