Skip to content

Commit

Permalink
publish transactions during transitions
Browse files Browse the repository at this point in the history
Follow up to #1082.

The goal is to be able to publish transactions only after we have
persisted the state. Otherwise we may run into corner cases like [1]
where a refund tx has been published, but we haven't kept track of it
and generate a different one (with different fees) the next time.

As a side effect, we can now remove the special case that we were
doing when publishing the funding tx, and remove the `store` function.

NB: the new `calling` transition method isn't restricted to publishing
transactions but that is the only use case for now.

[1] ACINQ/eclair-mobile#206
  • Loading branch information
pm47 committed Jul 25, 2019
1 parent e62adf2 commit 57b6b73
Showing 1 changed file with 44 additions and 56 deletions.
100 changes: 44 additions & 56 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -467,24 +467,26 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val now = Platform.currentTime.milliseconds.toSeconds
context.system.eventStream.publish(ChannelSignatureReceived(self, commitments))
log.info(s"publishing funding tx for channelId=$channelId fundingTxid=${commitInput.outPoint.txid}")
// we do this to make sure that the channel state has been written to disk when we publish the funding tx
val nextState = store(DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, Some(fundingTx), now, None, Left(fundingCreated)))
blockchain ! WatchSpent(self, commitments.commitInput.outPoint.txid, commitments.commitInput.outPoint.index.toInt, commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT) // TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchConfirmed(self, commitments.commitInput.outPoint.txid, commitments.commitInput.txOut.publicKeyScript, nodeParams.minDepthBlocks, BITCOIN_FUNDING_DEPTHOK)
log.info(s"committing txid=${fundingTx.txid}")
wallet.commit(fundingTx).onComplete {
case Success(true) =>
// NB: funding tx isn't confirmed at this point, so technically we didn't really pay the network fee yet, so this is a (fair) approximation
feePaid(fundingTxFee, fundingTx, "funding", commitments.channelId)
replyToUser(Right(s"created channel $channelId"))
case Success(false) =>
replyToUser(Left(LocalError(new RuntimeException("couldn't publish funding tx"))))
self ! BITCOIN_FUNDING_PUBLISH_FAILED // fail-fast: this should be returned only when we are really sure the tx has *not* been published
case Failure(t) =>
replyToUser(Left(LocalError(t)))
log.error(t, s"error while committing funding tx: ") // tx may still have been published, can't fail-fast
// we will publish the funding tx only after the channel state has been written to disk because we want to
// make sure we first persist the commitment that returns back the funds to us in case of problem
def publishFundingTx(): Unit = {
wallet.commit(fundingTx).onComplete {
case Success(true) =>
// NB: funding tx isn't confirmed at this point, so technically we didn't really pay the network fee yet, so this is a (fair) approximation
feePaid(fundingTxFee, fundingTx, "funding", commitments.channelId)
replyToUser(Right(s"created channel $channelId"))
case Success(false) =>
replyToUser(Left(LocalError(new RuntimeException("couldn't publish funding tx"))))
self ! BITCOIN_FUNDING_PUBLISH_FAILED // fail-fast: this should be returned only when we are really sure the tx has *not* been published
case Failure(t) =>
replyToUser(Left(LocalError(t)))
log.error(t, s"error while committing funding tx: ") // tx may still have been published, can't fail-fast
}
}
goto(WAIT_FOR_FUNDING_CONFIRMED) using nextState
goto(WAIT_FOR_FUNDING_CONFIRMED) using DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments, Some(fundingTx), now, None, Left(fundingCreated)) storing() calling(publishFundingTx)
}

case Event(CMD_CLOSE(_) | CMD_FORCECLOSE, d: DATA_WAIT_FOR_FUNDING_SIGNED) =>
Expand Down Expand Up @@ -1210,25 +1212,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
Try(Commitments.sendFulfill(d.commitments, c)) match {
case Success((commitments1, _)) =>
log.info(s"got valid payment preimage, recalculating transactions to redeem the corresponding htlc on-chain")
val localCommitPublished1 = d.localCommitPublished.map {
localCommitPublished =>
val localCommitPublished1 = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, commitments1, localCommitPublished.commitTx)
doPublish(localCommitPublished1)
localCommitPublished1
}
val remoteCommitPublished1 = d.remoteCommitPublished.map {
remoteCommitPublished =>
val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx)
doPublish(remoteCommitPublished1)
remoteCommitPublished1
}
val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map {
remoteCommitPublished =>
val remoteCommitPublished1 = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx)
doPublish(remoteCommitPublished1)
remoteCommitPublished1
val localCommitPublished1 = d.localCommitPublished.map(localCommitPublished => Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, commitments1, localCommitPublished.commitTx))
val remoteCommitPublished1 = d.remoteCommitPublished.map(remoteCommitPublished => Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx))
val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map(remoteCommitPublished => Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, commitments1, commitments1.remoteCommit, remoteCommitPublished.commitTx))
def republish(): Unit = {
localCommitPublished1.foreach(doPublish)
remoteCommitPublished1.foreach(doPublish)
nextRemoteCommitPublished1.foreach(doPublish)
}
stay using d.copy(commitments = commitments1, localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1) storing()
stay using d.copy(commitments = commitments1, localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1) storing() calling(republish)
case Failure(cause) => handleCommandError(cause, c)
}

Expand Down Expand Up @@ -1900,13 +1892,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def handleMutualClose(closingTx: Transaction, d: Either[DATA_NEGOTIATING, DATA_CLOSING]) = {
log.info(s"closing tx published: closingTxId=${closingTx.txid}")

doPublish(closingTx)

val nextData = d match {
case Left(negotiating) => DATA_CLOSING(negotiating.commitments, fundingTx = None, waitingSince = now, negotiating.closingTxProposed.flatten.map(_.unsignedTx), mutualClosePublished = closingTx :: Nil)
case Right(closing) => closing.copy(mutualClosePublished = closing.mutualClosePublished :+ closingTx)
}
goto(CLOSING) using nextData storing()

goto(CLOSING) using nextData storing() calling doPublish(closingTx)
}

def doPublish(closingTx: Transaction): Unit = {
Expand All @@ -1929,7 +1920,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val commitTx = d.commitments.localCommit.publishableTxs.commitTx.tx

val localCommitPublished = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, d.commitments, commitTx)
doPublish(localCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(localCommitPublished = Some(localCommitPublished))
Expand All @@ -1938,7 +1928,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, localCommitPublished = Some(localCommitPublished))
}

goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(localCommitPublished))
}
}

Expand Down Expand Up @@ -1994,7 +1984,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
require(commitTx.txid == d.commitments.remoteCommit.txid, "txid mismatch")

val remoteCommitPublished = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, d.commitments, d.commitments.remoteCommit, commitTx)
doPublish(remoteCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(remoteCommitPublished = Some(remoteCommitPublished))
Expand All @@ -2003,7 +1992,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, remoteCommitPublished = Some(remoteCommitPublished))
}

goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(remoteCommitPublished))
}

def handleRemoteSpentFuture(commitTx: Transaction, d: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT) = {
Expand All @@ -2013,8 +2002,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val remoteCommitPublished = Helpers.Closing.claimRemoteCommitMainOutput(keyManager, d.commitments, remotePerCommitmentPoint, commitTx)
val nextData = DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, Nil, futureRemoteCommitPublished = Some(remoteCommitPublished))

doPublish(remoteCommitPublished)
goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(remoteCommitPublished))
}

def handleRemoteSpentNext(commitTx: Transaction, d: HasCommitments) = {
Expand All @@ -2024,7 +2012,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
require(commitTx.txid == remoteCommit.txid, "txid mismatch")

val remoteCommitPublished = Helpers.Closing.claimRemoteCommitTxOutputs(keyManager, d.commitments, remoteCommit, commitTx)
doPublish(remoteCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(nextRemoteCommitPublished = Some(remoteCommitPublished))
Expand All @@ -2033,7 +2020,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, nextRemoteCommitPublished = Some(remoteCommitPublished))
}

goto(CLOSING) using nextData storing()
goto(CLOSING) using nextData storing() calling(doPublish(remoteCommitPublished))
}

def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
Expand Down Expand Up @@ -2062,15 +2049,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val exc = FundingTxSpent(d.channelId, tx)
val error = Error(d.channelId, exc.getMessage)

doPublish(revokedCommitPublished)

val nextData = d match {
case closing: DATA_CLOSING => closing.copy(revokedCommitPublished = closing.revokedCommitPublished :+ revokedCommitPublished)
case negotiating: DATA_NEGOTIATING => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, negotiating.closingTxProposed.flatten.map(_.unsignedTx), revokedCommitPublished = revokedCommitPublished :: Nil)
// NB: if there is a revoked commitment, we can't be in DATA_WAIT_FOR_FUNDING_CONFIRMED so we don't have the case where fundingTx is defined
case _ => DATA_CLOSING(d.commitments, fundingTx = None, waitingSince = now, mutualCloseProposed = Nil, revokedCommitPublished = revokedCommitPublished :: Nil)
}
goto(CLOSING) using nextData storing() sending error
goto(CLOSING) using nextData storing() calling(doPublish(revokedCommitPublished)) sending error
case None =>
// the published tx was neither their current commitment nor a revoked one
log.error(s"couldn't identify txid=${tx.txid}, something very bad is going on!!!")
Expand Down Expand Up @@ -2104,9 +2089,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// let's try to spend our current local tx
val commitTx = d.commitments.localCommit.publishableTxs.commitTx.tx
val localCommitPublished = Helpers.Closing.claimCurrentLocalCommitTxOutputs(keyManager, d.commitments, commitTx)
doPublish(localCommitPublished)

goto(ERR_INFORMATION_LEAK) sending error
goto(ERR_INFORMATION_LEAK) calling(doPublish(localCommitPublished)) sending error
}

def handleSync(channelReestablish: ChannelReestablish, d: HasCommitments): Commitments = {
Expand Down Expand Up @@ -2215,21 +2199,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.eventStream.publish(NetworkFeePaid(self, remoteNodeId, channelId, tx, fee, desc))
}

def store(d: HasCommitments) = {
log.debug(s"updating database record for channelId={}", d.channelId)
nodeParams.db.channels.addOrUpdateChannel(d)
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
d
}

implicit def state2mystate(state: FSM.State[fr.acinq.eclair.channel.State, Data]): MyState = MyState(state)

case class MyState(state: FSM.State[fr.acinq.eclair.channel.State, Data]) {

def storing(): FSM.State[fr.acinq.eclair.channel.State, Data] = {
state.stateData match {
case d: HasCommitments =>
store(d)
log.debug(s"updating database record for channelId={}", d.channelId)
nodeParams.db.channels.addOrUpdateChannel(d)
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
state
case _ =>
log.error(s"can't store data=${state.stateData} in state=${state.stateName}")
Expand All @@ -2247,6 +2226,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
state
}

/**
* This method allows performing actions during the transition, e.g. after a call to [[MyState.storing]]. This is
* particularly useful to publish transactions only after we are sure that the state has been persisted.
*/
def calling(f: => Unit): FSM.State[fr.acinq.eclair.channel.State, Data] = {
f
state
}

}

def now = Platform.currentTime.milliseconds.toSeconds
Expand Down

0 comments on commit 57b6b73

Please sign in to comment.