Skip to content

Commit

Permalink
=remote Make ues of Netty's default resolver.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 16, 2023
1 parent ee8b992 commit ebf6cc5
Showing 1 changed file with 49 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,77 +462,72 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA

override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering

// TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match {
case Address(_, _, Some(host), Some(port)) =>
Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } }
case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information."))
}

override def listen: Future[(Address, Promise[AssociationEventListener])] = {
@nowarn("msg=deprecated")
val bindPort = settings.BindPortSelector

for {
address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort))
} yield {
try {
val newServerChannel = inboundBootstrap.bind(address).sync().channel()

// Block reads until a handler actor is registered
newServerChannel.config().setAutoRead(false)
channelGroup.add(newServerChannel)

serverChannel = newServerChannel

@nowarn("msg=deprecated")
val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector)

addressFromSocketAddress(
newServerChannel.localAddress(),
schemeIdentifier,
system.name,
Some(settings.Hostname),
port) match {
case Some(address) =>
addressFromSocketAddress(newServerChannel.localAddress(), schemeIdentifier, system.name, None,
None) match {
case Some(address) => boundTo = address
case None =>
throw new NettyTransportException(
s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]")
}
associationListenerPromise.future.foreach { _ =>
newServerChannel.config().setAutoRead(true)
}
(address, associationListenerPromise)
case None =>
throw new NettyTransportException(
s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]")
}
} catch {
case NonFatal(e) => {
log.error("failed to bind to {}, shutting down Netty transport", address)
try {
shutdown()
} catch { case NonFatal(_) => } // ignore possible exception during shutdown
throw e
}
try {
val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel()
// Block reads until a handler actor is registered
newServerChannel.config().setAutoRead(false)
channelGroup.add(newServerChannel)

serverChannel = newServerChannel

@nowarn("msg=deprecated")
val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector)

addressFromSocketAddress(
newServerChannel.localAddress(),
schemeIdentifier,
system.name,
Some(settings.Hostname),
port) match {
case Some(address) =>
addressFromSocketAddress(newServerChannel.localAddress(), schemeIdentifier, system.name, None,
None) match {
case Some(address) => boundTo = address
case None =>
throw new NettyTransportException(
s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]")
}
associationListenerPromise.future.foreach { _ =>
newServerChannel.config().setAutoRead(true)
}
Future.successful((address, associationListenerPromise))
case None =>
throw new NettyTransportException(
s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]")
}
} catch {
case NonFatal(e) => {
log.error("failed to bind to host:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort)
try {
shutdown()
} catch {
case NonFatal(_) =>
} // ignore possible exception during shutdown
throw e
}
}
}

// Need to do like this for binary compatibility reasons
private[pekko] def boundAddress = boundTo

private def extractHostAndPort(addr: Address): (String, Int) = addr match {
case Address(_, _, Some(host), Some(port)) => (host, port)
case _ => throw new IllegalArgumentException(s"Address [$addr] does not contain host or port information.")
}

override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound"))
else {
val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress)

(for {
socketAddress <- addressToSocketAddress(remoteAddress)
readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel =>
(host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel =>
if (EnableSsl)
blocking {
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
Expand Down

0 comments on commit ebf6cc5

Please sign in to comment.