Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only sync with channel peers #1587

Merged
merged 7 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, Satoshi, SatoshiLong, Script}
import fr.acinq.eclair.Features.Wumbo
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel._
import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.io.PeerConnection.KillReason
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{wire, _}
import scodec.bits.ByteVector

import java.net.InetSocketAddress
Expand Down Expand Up @@ -164,6 +164,10 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe

case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d: ConnectedData) if d.channels.contains(TemporaryChannelId(temporaryChannelId)) =>
log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
// we have our first channel with that peer: let's sync our routing table
if (!d.channels.keys.exists(_.isInstanceOf[FinalChannelId])) {
d.peerConnection ! PeerConnection.DoSync(replacePrevious = false)
}
// NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151)
// we won't clean it up, but we won't remember the temporary id on channel termination
stay using d.copy(channels = d.channels + (FinalChannelId(channelId) -> channel))
Expand Down Expand Up @@ -192,6 +196,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
log.info(s"channel closed: channelId=${channelIds.mkString("/")}")
if (d.channels.values.toSet - actor == Set.empty) {
log.info(s"that was the last open channel, closing the connection")
context.system.eventStream.publish(LastChannelClosed(self, remoteNodeId))
d.peerConnection ! PeerConnection.Kill(KillReason.NoRemainingChannel)
}
stay using d.copy(channels = d.channels -- channelIds)
Expand Down
43 changes: 20 additions & 23 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress

import akka.actor.{ActorRef, FSM, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
Expand All @@ -34,6 +32,7 @@ import fr.acinq.eclair.{wire, _}
import scodec.Attempt
import scodec.bits.ByteVector

import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.util.Random

Expand Down Expand Up @@ -136,29 +135,12 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
} else {
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment()
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)

d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected)

def localHasFeature(f: Feature): Boolean = d.localInit.features.hasFeature(f)

def remoteHasFeature(f: Feature): Boolean = remoteInit.features.hasFeature(f)

val canUseChannelRangeQueries = localHasFeature(Features.ChannelRangeQueries) && remoteHasFeature(Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = localHasFeature(Features.ChannelRangeQueriesExtended) && remoteHasFeature(Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
// if they support channel queries we don't send routing info yet, if they want it they will query us
// we will query them, using extended queries if supported
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
if (d.doSync) {
log.info(s"sending sync channel range query with flags_opt=$flags_opt")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, flags_opt = flags_opt)
} else {
log.info("not syncing with this peer")
}
} else if (remoteHasFeature(Features.InitialRoutingSync)) {
// "old" nodes, do as before
log.info("peer requested a full routing table dump")
router ! GetRoutingState
if (d.doSync) {
self ! DoSync(replacePrevious = true)
} else {
log.info("not syncing with this peer")
}

// we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay)
Expand Down Expand Up @@ -358,6 +340,20 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}
stay using d.copy(behavior = behavior1)

case Event(DoSync(replacePrevious), d: ConnectedData) =>
val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
log.info(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt)
} else if (d.remoteInit.features.hasFeature(Features.InitialRoutingSync) && replacePrevious) {
// For "old" nodes that don't support channel queries, we send them the full routing table
log.info("peer requested a full routing table dump")
router ! GetRoutingState
}
stay

t-bast marked this conversation as resolved.
Show resolved Hide resolved
case Event(ResumeAnnouncements, d: ConnectedData) =>
log.info(s"resuming processing of network announcements for peer")
stay using d.copy(behavior = d.behavior.copy(fundingTxAlreadySpentCount = 0, ignoreNetworkAnnouncement = false))
Expand Down Expand Up @@ -472,6 +468,7 @@ object PeerConnection {
case object InitTimeout
case object SendPing
case object ResumeAnnouncements
case class DoSync(replacePrevious: Boolean)
// @formatter:on

val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress

import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.UnknownMessage
import fr.acinq.eclair.wire
import fr.acinq.eclair.wire.UnknownMessage

import java.net.InetSocketAddress

sealed trait PeerEvent

Expand All @@ -31,4 +31,6 @@ case class PeerConnected(peer: ActorRef, nodeId: PublicKey, connectionInfo: Conn

case class PeerDisconnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent

case class LastChannelClosed(peer: ActorRef, nodeId: PublicKey) extends PeerEvent

case class UnknownMessageReceived(peer: ActorRef, nodeId: PublicKey, message: UnknownMessage, connectionInfo: ConnectionInfo) extends PeerEvent
23 changes: 17 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,

import Switchboard._

context.system.eventStream.subscribe(self, classOf[ChannelIdAssigned])
context.system.eventStream.subscribe(self, classOf[LastChannelClosed])

// we load channels from database
{
private def initialPeersWithChannels: Set[PublicKey] = {
// Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped
// just after a channel state has transitioned to CLOSED and before it has effectively been removed.
// Closed channels will be removed, other channels will be restored.
Expand All @@ -44,12 +47,14 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,
nodeParams.db.channels.removeChannel(c.channelId)
})

channels
.groupBy(_.commitments.remoteParams.nodeId)
.map { case (remoteNodeId, states) => createOrGetPeer(remoteNodeId, offlineChannels = states.toSet) }
val peerChannels = channels.groupBy(_.commitments.remoteParams.nodeId)
peerChannels.foreach { case (remoteNodeId, states) => createOrGetPeer(remoteNodeId, offlineChannels = states.toSet) }
peerChannels.keySet
}

def receive: Receive = {
def receive: Receive = normal(initialPeersWithChannels)

def normal(peersWithChannels: Set[PublicKey]): Receive = {

case Peer.Connect(publicKey, _) if publicKey == nodeParams.nodeId =>
sender ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
Expand All @@ -75,9 +80,14 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,
// if this is an incoming connection, we might not yet have created the peer
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty)
val features = nodeParams.featuresFor(authenticated.remoteNodeId)
val doSync = nodeParams.syncWhitelist.isEmpty || nodeParams.syncWhitelist.contains(authenticated.remoteNodeId)
// if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel
val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && peersWithChannels.contains(authenticated.remoteNodeId))
pm47 marked this conversation as resolved.
Show resolved Hide resolved
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync)

case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId))

case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId))

case Symbol("peers") => sender ! context.children

case GetRouterPeerConf => sender ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf)
Expand Down Expand Up @@ -119,6 +129,7 @@ object Switchboard {
def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"

case object GetRouterPeerConf extends RemoteTypes

case class RouterPeerConf(routerConf: RouterConf, peerConf: PeerConnection.Conf) extends RemoteTypes

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ object EclairInternalsSerializer {
("chainsHash" | bytes32) ::
("remoteNodeId" | publicKey) ::
("to" | actorRefCodec(system)) ::
("replacePrevious" | bool(8)) ::
t-bast marked this conversation as resolved.
Show resolved Hide resolved
("flags_opt" | optionQueryChannelRangeTlv)).as[SendChannelQuery]

def peerRoutingMessageCodec(system: ExtendedActorSystem): Codec[PeerRoutingMessage] = (
Expand Down
13 changes: 10 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
log.info(s"subscribing listener=$listener to network events")
context.system.eventStream.subscribe(listener, classOf[NetworkEvent])
context.watch(listener)

override def receive: Receive = {
case Terminated(actor) if actor == listener=>
case Terminated(actor) if actor == listener =>
log.warning(s"unsubscribing listener=$listener to network events")
context.system.eventStream.unsubscribe(listener)
context stop self
Expand Down Expand Up @@ -519,7 +520,7 @@ object Router {
// @formatter:on

// @formatter:off
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, replacePrevious: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
pm47 marked this conversation as resolved.
Show resolved Hide resolved
case object GetNetworkStats
case class GetNetworkStatsResponse(stats: Option[NetworkStats])
case object GetRoutingState
Expand Down Expand Up @@ -566,7 +567,13 @@ object Router {

case class ShortChannelIdAndFlag(shortChannelId: ShortChannelId, flag: Long)

case class Syncing(pending: List[RoutingMessage], total: Int)
/**
* @param remainingQueries remaining queries to send, the next one will be popped after we receive a [[ReplyShortChannelIdsEnd]]
* @param totalQueries total number of *queries* (not channels) that will be sent during this syncing session
*/
case class Syncing(remainingQueries: List[RoutingMessage], totalQueries: Int) {
def started: Boolean = totalQueries > 0
t-bast marked this conversation as resolved.
Show resolved Hide resolved
}

case class Data(nodes: Map[PublicKey, NodeAnnouncement],
channels: SortedMap[ShortChannelId, PublicChannel],
Expand Down
Loading