Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate channel_updates in auditDb when restarting #1918

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
log.info("refreshing channel_update due to configuration changes old={} new={}", normal.channelUpdate, candidateChannelUpdate)
candidateChannelUpdate
}

val hasChanged = !Announcements.areSameWithoutFlags(candidateChannelUpdate, normal.channelUpdate)
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, hasChanged, normal.commitments))
thomash-acinq marked this conversation as resolved.
Show resolved Hide resolved

// we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
// we take into account the date of the last update so that we don't send superfluous updates when we restart the app
val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp)
Expand Down Expand Up @@ -1879,25 +1883,24 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => ()
}

val previousChannelUpdate_opt = stateData match {
case data: DATA_NORMAL => Some(data.channelUpdate)
case _ => None
}

(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
// LocalChannelUpdate is already published when restoring the channel
Copy link
Member

@t-bast t-bast Aug 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preferred having it here, it was clearer when it was explicitly done in the transition phases...we've spent a lot of time ensuring the retransmit logic worked properly, I'd avoid changing that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand but the problem is that at this place we don't have enough information to know if the channel update changed or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a channelUpdateBeforeRestore_opt in DATA_NORMAL so that we have access to it in the transition function and put everything back there.

Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.debug("re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
log.info("emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments))
val hasChanged = stateData match {
case data: DATA_NORMAL => !Announcements.areSameWithoutFlags(data.channelUpdate, normal.channelUpdate)
case _ => true
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, hasChanged, normal.commitments))
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
// in any other case (e.g. OFFLINE->SYNCING) we do nothing
()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class ChannelIdAssigned(channel: ActorRef, remoteNodeId: PublicKey, tempora

case class ShortChannelIdAssigned(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, previousShortChannelId: Option[ShortChannelId]) extends ChannelEvent

case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, previousChannelUpdate_opt: Option[ChannelUpdate], commitments: AbstractCommitments) extends ChannelEvent
case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, hasChanged: Boolean, commitments: AbstractCommitments) extends ChannelEvent
thomash-acinq marked this conversation as resolved.
Show resolved Hide resolved

case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
channelsDb.updateChannelMeta(e.channelId, event)

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case _ => auditDb.addChannelUpdate(u)
if (u.hasChanged) {
auditDb.addChannelUpdate(u)
}

}

override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameWithoutFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0)

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package fr.acinq.eclair.channel

import akka.testkit.TestProbe
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.TestConstants.Alice
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory
import fr.acinq.eclair.crypto.Generators
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.payment.relay.Relayer.{RelayFees, RelayParams}
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner}
import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck}
Expand Down Expand Up @@ -122,4 +125,62 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha
val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil))
Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
}

test("restore channel without configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[HasCommitments])
val oldStateData = alice.stateData.asInstanceOf[HasCommitments]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(TestConstants.Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
val u = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(!u.hasChanged)
}

test("restore channel with configuration change") { f =>
import f._
val sender = TestProbe()

// we start by storing the current state
assert(alice.stateData.isInstanceOf[HasCommitments])
val oldStateData = alice.stateData.asInstanceOf[HasCommitments]

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// we restart Alice with a different configuration
val newFees = RelayFees(765 msat, 2345)
val newConfig = TestConstants.Alice.nodeParams.copy(relayParams = RelayParams(newFees, newFees, newFees))
val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
newAlice ! INPUT_RESTORED(oldStateData)
newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit)
bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit)
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(newAlice)
awaitCond(newAlice.stateName == NORMAL)
val u = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(u.hasChanged)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ class AuditDbSpec extends AnyFunSuite {
val scid = ShortChannelId(123)
val remoteNodeId = randomKey().publicKey
val u = Announcements.makeChannelUpdate(randomBytes32(), randomKey(), remoteNodeId, scid, CltvExpiryDelta(56), 2000 msat, 1000 msat, 999, 1000000000 msat)
dbs.audit.addChannelUpdate(LocalChannelUpdate(null, channelId, scid, remoteNodeId, None, u, None, null))
dbs.audit.addChannelUpdate(LocalChannelUpdate(null, channelId, scid, remoteNodeId, None, u, true, null))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class PaymentIntegrationSpec extends IntegrationSpec {
// we then forge a new channel_update for B-C...
val channelUpdateBC = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, nodes("B").nodeParams.privateKey, nodes("C").nodeParams.nodeId, shortIdBC, nodes("B").nodeParams.expiryDelta + 1, nodes("C").nodeParams.htlcMinimum, nodes("B").nodeParams.relayParams.publicChannelFees.feeBase, nodes("B").nodeParams.relayParams.publicChannelFees.feeProportionalMillionths, 500000000 msat)
// ...and notify B's relayer
nodes("B").system.eventStream.publish(LocalChannelUpdate(system.deadLetters, commitmentBC.channelId, shortIdBC, commitmentBC.remoteParams.nodeId, None, channelUpdateBC, None, commitmentBC))
nodes("B").system.eventStream.publish(LocalChannelUpdate(system.deadLetters, commitmentBC.channelId, shortIdBC, commitmentBC.remoteParams.nodeId, None, channelUpdateBC, true, commitmentBC))
// we retrieve a payment hash from D
val amountMsat = 4200000.msat
sender.send(nodes("D").paymentHandler, ReceivePayment(Some(amountMsat), "1 coffee"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val channelId = randomBytes32()
val update = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, randomKey(), remoteNodeId, shortChannelId, CltvExpiryDelta(10), 100 msat, 1000 msat, 100, capacity.toMilliSatoshi)
val commitments = PaymentPacketSpec.makeCommitments(ByteVector32.Zeroes, availableBalanceForSend, testCapacity = capacity)
LocalChannelUpdate(null, channelId, shortChannelId, remoteNodeId, None, update, None, commitments)
LocalChannelUpdate(null, channelId, shortChannelId, remoteNodeId, None, update, true, commitments)
}

val (a, b) = (randomKey().publicKey, randomKey().publicKey)
Expand Down Expand Up @@ -426,8 +426,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
channels
}

channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, None, makeCommitments(channelId_ab, -2000 msat, 300000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, None, makeCommitments(channelId_bc, 400000 msat, -5000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, true, makeCommitments(channelId_ab, -2000 msat, 300000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, true, makeCommitments(channelId_bc, 400000 msat, -5000 msat)))

val channels1 = getOutgoingChannels(true)
assert(channels1.size === 2)
Expand All @@ -445,13 +445,13 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
val channels3 = getOutgoingChannels(true)
assert(channels3.size === 1 && channels3.head.commitments.availableBalanceForSend === 100000.msat)

channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab.copy(channelFlags = 2), None, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab.copy(channelFlags = 2), true, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
val channels4 = getOutgoingChannels(true)
assert(channels4.isEmpty)
val channels5 = getOutgoingChannels(false)
assert(channels5.size === 1)

channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, None, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, channelUpdate_ab.shortChannelId, a, None, channelUpdate_ab, true, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
val channels6 = getOutgoingChannels(true)
assert(channels6.size === 1)

Expand All @@ -461,7 +461,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a
assert(channels7.isEmpty)

// We should receive the updated channel update containing the new shortChannelId:
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, ShortChannelId(42), a, None, channelUpdate_ab.copy(shortChannelId = ShortChannelId(42)), None, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
channelRelayer ! WrappedLocalChannelUpdate(LocalChannelUpdate(null, channelId_ab, ShortChannelId(42), a, None, channelUpdate_ab.copy(shortChannelId = ShortChannelId(42)), true, makeCommitments(channelId_ab, 100000 msat, 200000 msat)))
val channels8 = getOutgoingChannels(true)
assert(channels8.size === 1)
assert(channels8.head.channelUpdate.shortChannelId === ShortChannelId(42))
Expand Down Expand Up @@ -494,6 +494,6 @@ object ChannelRelayerSpec {
val channelId = channelIds(shortChannelId)
val update = ChannelUpdate(ByteVector64(randomBytes(64)), Block.RegtestGenesisBlock.hash, shortChannelId, 0, 1, Announcements.makeChannelFlags(isNode1 = true, enabled), CltvExpiryDelta(100), htlcMinimum, 1000 msat, 100, Some(capacity.toMilliSatoshi))
val commitments = PaymentPacketSpec.makeCommitments(channelId, testAvailableBalanceForSend = balance, testCapacity = capacity)
LocalChannelUpdate(null, channelId, shortChannelId, outgoingNodeId, None, update, None, commitments)
LocalChannelUpdate(null, channelId, shortChannelId, outgoingNodeId, None, update, true, commitments)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat
assert(sender.expectMessageType[Relayer.OutgoingChannels].channels.isEmpty)

// We publish a channel update, that should be picked up by the channel relayer
system.eventStream ! EventStream.Publish(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, None, makeCommitments(channelId_bc)))
system.eventStream ! EventStream.Publish(LocalChannelUpdate(null, channelId_bc, channelUpdate_bc.shortChannelId, c, None, channelUpdate_bc, true, makeCommitments(channelId_bc)))
eventually(PatienceConfiguration.Timeout(30 seconds), PatienceConfiguration.Interval(1 second)) {
childActors.channelRelayer ! ChannelRelayer.GetOutgoingChannels(sender.ref.toClassic, GetOutgoingChannels())
val channels = sender.expectMessageType[Relayer.OutgoingChannels].channels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_gh))
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_hg))
// then private channels
sender.send(router, LocalChannelUpdate(sender.ref, randomBytes32(), channelId_ag, g, None, update_ag, None, CommitmentsSpec.makeCommitments(30000000 msat, 8000000 msat, a, g, announceChannel = false)))
sender.send(router, LocalChannelUpdate(sender.ref, randomBytes32(), channelId_ag, g, None, update_ag, true, CommitmentsSpec.makeCommitments(30000000 msat, 8000000 msat, a, g, announceChannel = false)))
// watcher receives the get tx requests
assert(watcher.expectMsgType[ValidateRequest].ann === chan_ab)
assert(watcher.expectMsgType[ValidateRequest].ann === chan_bc)
Expand Down
Loading