Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NodeAddress everywhere instead of InetAddress #2202

Merged
merged 6 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ object NodeParams extends Logging {

def chainFromHash(chainHash: ByteVector32): String = chain2Hash.map(_.swap).getOrElse(chainHash, throw new RuntimeException(s"invalid chainHash '$chainHash'"))

def parseSocks5ProxyParams(config: Config): Option[Socks5ProxyParams] = {
if (config.getBoolean("socks5.enabled")) {
Some(Socks5ProxyParams(
address = new InetSocketAddress(config.getString("socks5.host"), config.getInt("socks5.port")),
credentials_opt = None,
randomizeCredentials = config.getBoolean("socks5.randomize-credentials"),
useForIPv4 = config.getBoolean("socks5.use-for-ipv4"),
useForIPv6 = config.getBoolean("socks5.use-for-ipv6"),
useForTor = config.getBoolean("socks5.use-for-tor"),
useForWatchdogs = config.getBoolean("socks5.use-for-watchdogs"),
))
} else {
None
}
}

def makeNodeParams(config: Config, instanceId: UUID, nodeKeyManager: NodeKeyManager, channelKeyManager: ChannelKeyManager,
torAddress_opt: Option[NodeAddress], database: Databases, blockHeight: AtomicLong, feeEstimator: FeeEstimator,
pluginParams: Seq[PluginParams] = Nil): NodeParams = {
Expand Down Expand Up @@ -302,19 +318,7 @@ object NodeParams extends Logging {

val syncWhitelist: Set[PublicKey] = config.getStringList("sync-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet

val socksProxy_opt = if (config.getBoolean("socks5.enabled")) {
Some(Socks5ProxyParams(
address = new InetSocketAddress(config.getString("socks5.host"), config.getInt("socks5.port")),
credentials_opt = None,
randomizeCredentials = config.getBoolean("socks5.randomize-credentials"),
useForIPv4 = config.getBoolean("socks5.use-for-ipv4"),
useForIPv6 = config.getBoolean("socks5.use-for-ipv6"),
useForTor = config.getBoolean("socks5.use-for-tor"),
useForWatchdogs = config.getBoolean("socks5.use-for-watchdogs"),
))
} else {
None
}
val socksProxy_opt = parseSocks5ProxyParams(config)

val publicTorAddress_opt = if (config.getBoolean("tor.publish-onion-address")) torAddress_opt else None

Expand Down
27 changes: 17 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress

import akka.actor.{Props, _}
import akka.event.Logging.MDC
import akka.io.Tcp.SO.KeepAlive
Expand All @@ -28,14 +26,16 @@ import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.tor.Socks5Connection.{Socks5Connect, Socks5Connected, Socks5Error}
import fr.acinq.eclair.tor.{Socks5Connection, Socks5ProxyParams}
import fr.acinq.eclair.wire.protocol._

import java.net.InetSocketAddress
import scala.concurrent.duration._

/**
* Created by PM on 27/10/2015.
*
*/
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean) extends Actor with DiagnosticActorLogging {
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteNodeAddress: NodeAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean) extends Actor with DiagnosticActorLogging {

import context.system

Expand All @@ -44,7 +44,14 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],

def receive: Receive = {
case Symbol("connect") =>
val (peerOrProxyAddress, proxyParams_opt) = socks5ProxyParams_opt.map(proxyParams => (proxyParams, Socks5ProxyParams.proxyAddress(remoteAddress, proxyParams))) match {
// note that there is no resolution here, it's easier plain ip addresses, or unresolved tor hostnames
pm47 marked this conversation as resolved.
Show resolved Hide resolved
val remoteAddress = remoteNodeAddress match {
case addr: IPv4 => new InetSocketAddress(addr.ipv4, addr.port)
case addr: IPv6 => new InetSocketAddress(addr.ipv6, addr.port)
case addr: Tor2 => InetSocketAddress.createUnresolved(addr.host, addr.port)
case addr: Tor3 => InetSocketAddress.createUnresolved(addr.host, addr.port)
}
val (peerOrProxyAddress, proxyParams_opt) = socks5ProxyParams_opt.map(proxyParams => (proxyParams, Socks5ProxyParams.proxyAddress(remoteNodeAddress, proxyParams))) match {
case Some((proxyParams, Some(proxyAddress))) =>
log.info(s"connecting to SOCKS5 proxy ${str(proxyAddress)}")
(proxyAddress, Some(proxyParams))
Expand All @@ -53,14 +60,14 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
(remoteAddress, None)
}
IO(Tcp) ! Tcp.Connect(peerOrProxyAddress, timeout = Some(20 seconds), options = KeepAlive(true) :: Nil, pullMode = true)
context become connecting(proxyParams_opt)
context become connecting(proxyParams_opt, remoteAddress)
}

def connecting(proxyParams: Option[Socks5ProxyParams]): Receive = {
def connecting(proxyParams: Option[Socks5ProxyParams], remoteAddress: InetSocketAddress): Receive = {
case Tcp.CommandFailed(c: Tcp.Connect) =>
val peerOrProxyAddress = c.remoteAddress
log.info(s"connection failed to ${str(peerOrProxyAddress)}")
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteAddress))
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteNodeAddress))
context stop self

case Tcp.Connected(peerOrProxyAddress, _) =>
Expand All @@ -75,7 +82,7 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
context become {
case Tcp.CommandFailed(_: Socks5Connect) =>
log.info(s"connection failed to ${str(remoteAddress)} via SOCKS5 ${str(proxyAddress)}")
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteAddress))
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteNodeAddress))
context stop self
case Socks5Connected(_) =>
log.info(s"connected to ${str(remoteAddress)} via SOCKS5 proxy ${str(proxyAddress)}")
Expand Down Expand Up @@ -127,13 +134,13 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
switchboard = switchboard,
router = router
))
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteAddress, origin_opt = origin_opt, isPersistent = isPersistent)
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteNodeAddress, origin_opt = origin_opt, isPersistent = isPersistent)
peerConnection
}
}

object Client {

def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt, isPersistent))
def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: NodeAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt, isPersistent))

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, DeadLetter, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
Expand All @@ -26,6 +24,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.protocol.NodeAddress

class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef) extends Actor with ActorLogging {

Expand Down Expand Up @@ -57,16 +56,16 @@ class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyP
log.warning("handling outgoing connection request locally")
self forward req
case _: DeadLetter =>
// we don't care about other dead letters
// we don't care about other dead letters
}
}

object ClientSpawner {

def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef): Props = Props(new ClientSpawner(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router))

case class ConnectionRequest(address: InetSocketAddress,
remoteNodeId: PublicKey,
case class ConnectionRequest(remoteNodeId: PublicKey,
address: NodeAddress,
origin: ActorRef,
isPersistent: Boolean) extends RemoteTypes
}
7 changes: 4 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/NodeURI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package fr.acinq.eclair.io

import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.protocol.NodeAddress
import scodec.bits.ByteVector

import scala.util.{Failure, Success, Try}

case class NodeURI(nodeId: PublicKey, address: HostAndPort) {
case class NodeURI(nodeId: PublicKey, address: NodeAddress) {
override def toString: String = s"$nodeId@$address"
}

Expand All @@ -40,8 +41,8 @@ object NodeURI {
@throws[IllegalArgumentException]
def parse(uri: String): NodeURI = {
uri.split("@") match {
case Array(nodeId, address) => (Try(PublicKey(ByteVector.fromValidHex(nodeId))), Try(HostAndPort.fromString(address).withDefaultPort(DEFAULT_PORT))) match {
case (Success(pk), Success(hostAndPort)) => NodeURI(pk, hostAndPort)
case Array(nodeId, address) => (Try(PublicKey(ByteVector.fromValidHex(nodeId))), Try(HostAndPort.fromString(address)).flatMap(hostAndPort => NodeAddress.fromParts(hostAndPort.getHost, hostAndPort.getPortOrDefault(DEFAULT_PORT)))) match {
case (Success(pk), Success(nodeAddress)) => NodeURI(pk, nodeAddress)
case (Failure(_), _) => throw new IllegalArgumentException("Invalid node id")
case (_, Failure(_)) => throw new IllegalArgumentException("Invalid host:port")
}
Expand Down
12 changes: 5 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneF
import akka.event.Logging.MDC
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
import akka.util.Timeout
import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong, Script}
import fr.acinq.eclair.Features.Wumbo
Expand All @@ -42,7 +41,6 @@ import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scodec.bits.ByteVector

import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext

/**
Expand Down Expand Up @@ -331,12 +329,12 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA

def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = {
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeid: $remoteNodeId != ${connectionReady.remoteNodeId}")
log.debug("got authenticated connection to address {}:{}", connectionReady.address.getHostString, connectionReady.address.getPort)
log.debug("got authenticated connection to address {}", connectionReady.address)

if (connectionReady.outgoing) {
// we store the node address upon successful outgoing connection, so we can reconnect later
// any previous address is overwritten
NodeAddress.fromParts(connectionReady.address.getHostString, connectionReady.address.getPort).map(nodeAddress => nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeAddress))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address)
}

// let's bring existing/requested channels online
Expand Down Expand Up @@ -445,7 +443,7 @@ object Peer {
}
case object Nothing extends Data { override def channels = Map.empty }
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data
case class ConnectedData(address: InetSocketAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef]) extends Data {
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef]) extends Data {
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
def localFeatures: Features[InitFeature] = localInit.features
def remoteFeatures: Features[InitFeature] = remoteInit.features
Expand All @@ -457,7 +455,7 @@ object Peer {
case object CONNECTED extends State

case class Init(storedChannels: Set[HasCommitments])
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef, isPersistent: Boolean) {
case class Connect(nodeId: PublicKey, address_opt: Option[NodeAddress], replyTo: ActorRef, isPersistent: Boolean) {
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
}
object Connect {
Expand All @@ -476,7 +474,7 @@ object Peer {
sealed trait PeerInfoResponse {
def nodeId: PublicKey
}
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[InetSocketAddress], channels: Int) extends PeerInfoResponse
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[NodeAddress], channels: Int) extends PeerInfoResponse
case class PeerNotFound(nodeId: PublicKey) extends PeerInfoResponse { override def toString: String = s"peer $nodeId not found" }

case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import fr.acinq.eclair.{FSMDiagnosticActorLogging, Feature, Features, InitFeatur
import scodec.Attempt
import scodec.bits.ByteVector

import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.util.Random

Expand Down Expand Up @@ -87,8 +86,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
when(AUTHENTICATING) {
case Event(TransportHandler.HandshakeCompleted(remoteNodeId), d: AuthenticatingData) =>
cancelTimer(AUTH_TIMER)
import d.pendingAuth.address
log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"}")
log.info(s"connection authenticated (direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"})")
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Authenticated).increment()
switchboard ! Authenticated(self, remoteNodeId)
goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport, d.isPersistent)
Expand All @@ -104,8 +102,8 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.info(s"using features=$localFeatures")
val localInit = IPAddress(d.pendingAuth.address.getAddress, d.pendingAuth.address.getPort) match {
case Some(remoteAddress) if !d.pendingAuth.outgoing && NodeAddress.isPublicIPAddress(remoteAddress) => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.RemoteAddress(remoteAddress)))
val localInit = d.pendingAuth.address match {
case remoteAddress if !d.pendingAuth.outgoing && NodeAddress.isPublicIPAddress(remoteAddress) => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.RemoteAddress(remoteAddress)))
case _ => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil)))
}
d.transport ! localInit
Expand All @@ -120,7 +118,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.ReadAck(remoteInit)

log.info(s"peer is using features=${remoteInit.features}, networks=${remoteInit.networks.mkString(",")}")
remoteInit.remoteAddress_opt.foreach(address => log.info("peer reports that our IP address is {} (public={})", address.socketAddress.toString, NodeAddress.isPublicIPAddress(address)))
remoteInit.remoteAddress_opt.foreach(address => log.info("peer reports that our IP address is {} (public={})", address.toString, NodeAddress.isPublicIPAddress(address)))

val featureGraphErr_opt = Features.validateFeatureGraph(remoteInit.features)
if (remoteInit.networks.nonEmpty && remoteInit.networks.intersect(d.localInit.networks).isEmpty) {
Expand Down Expand Up @@ -552,12 +550,12 @@ object PeerConnection {
case object INITIALIZING extends State
case object CONNECTED extends State

case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None, isPersistent: Boolean) {
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: NodeAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None, isPersistent: Boolean) {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: ByteVector32, features: Features[InitFeature], doSync: Boolean) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: NodeAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes

sealed trait ConnectionResult extends RemoteTypes
object ConnectionResult {
Expand All @@ -569,7 +567,7 @@ object PeerConnection {
}

case object NoAddressFound extends ConnectionResult.Failure { override def toString: String = "no address found" }
case class ConnectionFailed(address: InetSocketAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
case class ConnectionFailed(address: NodeAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
case class AuthenticationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
case class InitializationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
case class AlreadyConnected(peerConnection: ActorRef, peer: ActorRef) extends ConnectionResult.Failure with HasConnection { override def toString: String = "already connected" }
Expand Down
Loading