Skip to content

Commit

Permalink
=remote Make ues of Netty's default resolver.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 16, 2023
1 parent ee8b992 commit c380a4e
Showing 1 changed file with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,23 +462,15 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA

override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering

// TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match {
case Address(_, _, Some(host), Some(port)) =>
Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } }
case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information."))
}

override def listen: Future[(Address, Promise[AssociationEventListener])] = {
@nowarn("msg=deprecated")
val bindPort = settings.BindPortSelector

for {
address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort))
_ <- Future.unit
} yield {
try {
val newServerChannel = inboundBootstrap.bind(address).sync().channel()

val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel()
// Block reads until a handler actor is registered
newServerChannel.config().setAutoRead(false)
channelGroup.add(newServerChannel)
Expand Down Expand Up @@ -512,7 +504,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
}
} catch {
case NonFatal(e) => {
log.error("failed to bind to {}, shutting down Netty transport", address)
log.error("failed to bind to addr:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort)
try {
shutdown()
} catch { case NonFatal(_) => } // ignore possible exception during shutdown
Expand All @@ -525,14 +517,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
// Need to do like this for binary compatibility reasons
private[pekko] def boundAddress = boundTo

def extractHostAndPort(addr: Address): (String, Int) = addr match {
case Address(_, _, Some(host), Some(port)) => (host, port)
case _ => throw new IllegalArgumentException(s"Address [$addr] does not contain host or port information.")
}

override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound"))
else {
val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress)

(for {
socketAddress <- addressToSocketAddress(remoteAddress)
readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel =>
(host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel =>
if (EnableSsl)
blocking {
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
Expand Down

0 comments on commit c380a4e

Please sign in to comment.