Skip to content

Commit

Permalink
=remote Make ues of Netty's default resolver.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 18, 2023
1 parent efaa82c commit 62bf7cb
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,22 +463,24 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering

// TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960
// Keep this for binary compatibility reasons
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match {
case Address(_, _, Some(host), Some(port)) =>
Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } }
case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information."))
Future {
blocking {
new InetSocketAddress(InetAddress.getByName(host), port)
}
}
case _ =>
Future.failed(new IllegalArgumentException(s"Address [$addr] must contain both host and port information."))
}

override def listen: Future[(Address, Promise[AssociationEventListener])] = {
@nowarn("msg=deprecated")
val bindPort = settings.BindPortSelector

for {
address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort))
} yield {
Future.fromTry(Try {
try {
val newServerChannel = inboundBootstrap.bind(address).sync().channel()

val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel()
// Block reads until a handler actor is registered
newServerChannel.config().setAutoRead(false)
channelGroup.add(newServerChannel)
Expand Down Expand Up @@ -511,28 +513,34 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]")
}
} catch {
case NonFatal(e) => {
log.error("failed to bind to {}, shutting down Netty transport", address)
case NonFatal(e) =>
log.error("failed to bind to host:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort)
try {
shutdown()
} catch { case NonFatal(_) => } // ignore possible exception during shutdown
throw e
}
} catch {
case NonFatal(_) =>
} // ignore possible exception during shutdown
throw e;
}
}
})
}

// Need to do like this for binary compatibility reasons
private[pekko] def boundAddress = boundTo

private def extractHostAndPort(addr: Address): (String, Int) = addr match {
case Address(_, _, Some(host), Some(port)) => (host, port)
case _ => throw new IllegalArgumentException(s"Address [$addr] must contain both host and port information.")
}

override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound"))
else {
val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress)

(for {
socketAddress <- addressToSocketAddress(remoteAddress)
readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel =>
(host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel =>
if (EnableSsl)
blocking {
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
Expand Down

0 comments on commit 62bf7cb

Please sign in to comment.