diff --git a/eclair-core/pom.xml b/eclair-core/pom.xml
index 037c642ff2..e4323480b1 100644
--- a/eclair-core/pom.xml
+++ b/eclair-core/pom.xml
@@ -205,6 +205,17 @@
             <artifactId>guava</artifactId>
             <version>${guava.version}</version>
         </dependency>
+        <!-- monitoring -->
+        <dependency>
+            <groupId>io.kamon</groupId>
+            <artifactId>kamon-core_${scala.version.short}</artifactId>
+            <version>${kamon.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.kamon</groupId>
+            <artifactId>kamon-akka_${scala.version.short}</artifactId>
+            <version>${kamon.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.softwaremill.quicklens</groupId>
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
index f9345f6868..ce3f9fc2da 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
@@ -101,7 +101,7 @@ object NodeParams {
     ConfigFactory.parseProperties(System.getProperties)
       .withFallback(ConfigFactory.parseFile(new File(datadir, "eclair.conf")))
       .withFallback(overrideDefaults)
-      .withFallback(ConfigFactory.load()).getConfig("eclair")
+      .withFallback(ConfigFactory.load())

   def getSeed(datadir: File): ByteVector = {
     val seedPath = new File(datadir, "seed.dat")
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala b/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
new file mode 100644
index 0000000000..47d083d9d7
--- /dev/null
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 ACINQ SAS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fr.acinq.eclair
+
+import kamon.Kamon
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object KamonExt {
+
+  def time[T](name: String)(f: => T) = {
+    val timer = Kamon.timer(name).withoutTags().start()
+    try {
+      f
+    } finally {
+      timer.stop()
+    }
+  }
+
+  def timeFuture[T](name: String)(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
+    val timer = Kamon.timer(name).withoutTags().start()
+    val res = f
+    res onComplete { case _ => timer.stop() }
+    res
+  }
+
+}
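A quick illustration of how these two helpers are meant to be called — the metric names and call sites below are hypothetical, not part of this change:

    import fr.acinq.eclair.KamonExt
    import scala.concurrent.{ExecutionContext, Future}

    object KamonExtUsage {
      implicit val ec: ExecutionContext = ExecutionContext.global

      // synchronous work: the try/finally guarantees the timer stops even if the body throws
      val sum: Int = KamonExt.time("example.sync.time") { (1 to 1000).sum }

      // asynchronous work: the timer stops when the future completes, whether it succeeds or fails
      val reply: Future[String] = KamonExt.timeFuture("example.async.time") {
        Future { "pong" }
      }
    }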
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
index 4c770f4faf..5979599b02 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
@@ -81,7 +81,8 @@ class Setup(datadir: File,
   secureRandom.nextInt()

   datadir.mkdirs()
-  val config = NodeParams.loadConfiguration(datadir, overrideDefaults)
+  val appConfig = NodeParams.loadConfiguration(datadir, overrideDefaults)
+  val config = appConfig.getConfig("eclair")
   val seed = seed_opt.getOrElse(NodeParams.getSeed(datadir))
   val chain = config.getString("chain")
   val chaindir = new File(datadir, chain)
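Together with the NodeParams change above, configuration is now loaded once at the root: `loadConfiguration` no longer narrows to the `eclair` subtree, and `Setup` does the narrowing itself. The point of the split is that Kamon reads its own top-level `kamon.*` section, so it needs the root config (see Boot.scala at the end of this diff). A condensed sketch of the resulting wiring — the datadir default here is illustrative:

    import java.io.File
    import com.typesafe.config.{Config, ConfigFactory}
    import fr.acinq.eclair.NodeParams

    object ConfigWiring {
      val datadir = new File(System.getProperty("user.home"), ".eclair") // illustrative default datadir
      // root config: contains both the `eclair.*` and `kamon.*` sections of eclair.conf
      val appConfig: Config = NodeParams.loadConfiguration(datadir, ConfigFactory.empty())
      // eclair-specific subtree: the same view the rest of the code used before this change
      val eclairConfig: Config = appConfig.getConfig("eclair")
    }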
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala
index c7b7e492da..ff277febca 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/rpc/ExtendedBitcoinClient.scala
@@ -21,6 +21,7 @@ import fr.acinq.eclair.ShortChannelId.coordinates
 import fr.acinq.eclair.TxCoordinates
 import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
 import fr.acinq.eclair.wire.ChannelAnnouncement
+import kamon.Kamon
 import org.json4s.JsonAST._

 import scala.concurrent.{ExecutionContext, Future}
@@ -149,26 +150,36 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {

   def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
     val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
-
-    for {
-      blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
-      txid: String <- rpcClient.invoke("getblock", blockHash).map {
-        case json => Try {
-          val JArray(txs) = json \ "tx"
-          txs(txIndex).extract[String]
-        } getOrElse ByteVector32.Zeroes.toHex
-      }
-      tx <- getRawTransaction(txid)
-      unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
-      fundingTxStatus <- if (unspent) {
-        Future.successful(UtxoStatus.Unspent)
-      } else {
-        // if this returns true, it means that the spending tx is *not* in the blockchain
-        isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map {
-          case res => UtxoStatus.Spent(spendingTxConfirmed = !res)
-        }
-      }
-    } yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
+    val span = Kamon.spanBuilder("validate-bitcoin-client").start()
+    for {
+      _ <- Future.successful(0)
+      span0 = Kamon.spanBuilder("getblockhash").start()
+      blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
+      _ = span0.finish()
+      span1 = Kamon.spanBuilder("getblock").start()
+      txid: String <- rpcClient.invoke("getblock", blockHash).map {
+        case json => Try {
+          val JArray(txs) = json \ "tx"
+          txs(txIndex).extract[String]
+        } getOrElse ByteVector32.Zeroes.toHex
+      }
+      _ = span1.finish()
+      span2 = Kamon.spanBuilder("getrawtx").start()
+      tx <- getRawTransaction(txid)
+      _ = span2.finish()
+      span3 = Kamon.spanBuilder("utxospendable-mempool").start()
+      unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
+      _ = span3.finish()
+      fundingTxStatus <- if (unspent) {
+        Future.successful(UtxoStatus.Unspent)
+      } else {
+        // if this returns true, it means that the spending tx is *not* in the blockchain
+        isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map {
+          case res => UtxoStatus.Spent(spendingTxConfirmed = !res)
+        }
+      }
+      _ = span.finish()
+    } yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))

   } recover { case t: Throwable => ValidateResult(c, Left(t)) }
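The span0…span3 bookkeeping inside the for-comprehension is verbose; if one wanted to factor it out, a small combinator in the spirit of KamonExt.timeFuture could wrap each step. A sketch against the same Kamon 2.x API — not something this change introduces:

    import kamon.Kamon
    import scala.concurrent.{ExecutionContext, Future}

    object Spans {
      // start a named span, run the future-producing step, finish the span when it completes
      def traced[T](name: String)(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
        val span = Kamon.spanBuilder(name).start()
        val res = f
        res.onComplete(_ => span.finish())
        res
      }
    }

    // usage inside validate() would then read, e.g.:
    //   blockHash <- Spans.traced("getblockhash")(rpcClient.invoke("getblockhash", blockHeight).map(...))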
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Authenticator.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Authenticator.scala
index f77b263547..2583672c2b 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Authenticator.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Authenticator.scala
@@ -27,6 +27,7 @@ import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
 import fr.acinq.eclair.io.Authenticator.{Authenticated, AuthenticationFailed, PendingAuth}
 import fr.acinq.eclair.wire.LightningMessageCodecs
 import fr.acinq.eclair.{Logs, NodeParams}
+import kamon.Kamon

 /**
  * The purpose of this class is to serve as a buffer for new connections before they are authenticated
@@ -43,6 +44,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
   def ready(switchboard: ActorRef, authenticating: Map[ActorRef, PendingAuth]): Receive = {
     case pending@PendingAuth(connection, remoteNodeId_opt, address, _) =>
       log.debug(s"authenticating connection to ${address.getHostString}:${address.getPort} (pending=${authenticating.size} handlers=${context.children.size})")
+      Kamon.counter("peers.connecting.count").withTag("state", "authenticating").increment()
       val transport = context.actorOf(TransportHandler.props(
        KeyPair(nodeParams.nodeId.value, nodeParams.privateKey.value),
        remoteNodeId_opt.map(_.value),
@@ -56,6 +58,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
       import pendingAuth.{address, remoteNodeId_opt}
       val outgoing = remoteNodeId_opt.isDefined
       log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (outgoing) "outgoing" else "incoming"}")
+      Kamon.counter("peers.connecting.count").withTag("state", "authenticated").increment()
       switchboard ! Authenticated(connection, transport, remoteNodeId, address, remoteNodeId_opt.isDefined, pendingAuth.origin_opt)
       context become ready(switchboard, authenticating - transport)
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
index ed54b12b29..fdc3509138 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
@@ -32,6 +32,7 @@ import fr.acinq.eclair.crypto.TransportHandler
 import fr.acinq.eclair.router._
 import fr.acinq.eclair.wire._
 import fr.acinq.eclair.{secureRandom, wire, _}
+import kamon.Kamon
 import scodec.Attempt
 import scodec.bits.ByteVector

@@ -535,6 +536,22 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
     case DISCONNECTED -> _ if nodeParams.autoReconnect => cancelTimer(RECONNECT_TIMER)
   }

+  onTransition {
+    case _ -> CONNECTED =>
+      Metrics.connectedPeers.increment()
+      context.system.eventStream.publish(PeerConnected(self, remoteNodeId))
+    case CONNECTED -> DISCONNECTED =>
+      Metrics.connectedPeers.decrement()
+      context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
+  }
+
+  onTermination {
+    case StopEvent(_, CONNECTED, d: ConnectedData) =>
+      // the transition handler won't be fired if we go directly from CONNECTED to closed
+      Metrics.connectedPeers.decrement()
+      context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
+  }
+
   def createNewChannel(nodeParams: NodeParams, funder: Boolean, fundingAmount: Satoshi, origin_opt: Option[ActorRef]): (ActorRef, LocalParams) = {
     val defaultFinalScriptPubKey = Helpers.getFinalScriptPubKey(wallet, nodeParams.chainHash)
     val localParams = makeChannelParams(nodeParams, defaultFinalScriptPubKey, funder, fundingAmount)
@@ -640,6 +657,12 @@ object Peer {

   // @formatter:on

+  object Metrics {
+    val peers = Kamon.rangeSampler("peers.count").withoutTags()
+    val connectedPeers = Kamon.rangeSampler("peers.connected.count").withoutTags()
+    val channels = Kamon.rangeSampler("channels.count").withoutTags()
+  }
+
   def makeChannelParams(nodeParams: NodeParams, defaultFinalScriptPubKey: ByteVector, isFunder: Boolean, fundingAmount: Satoshi): LocalParams = {
     val entropy = new Array[Byte](16)
     secureRandom.nextBytes(entropy)
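Side note on the two Kamon instruments used for peers: the counter in Authenticator/Server only ever increases (it counts events), while the range samplers above track a current level (state). A minimal illustration of the distinction, with made-up wiring:

    import kamon.Kamon

    object MetricKinds {
      // counter: monotonically increasing, answers "how many times has X happened?"
      val connectionAttempts = Kamon.counter("peers.connecting.count").withTag("state", "connected")

      // range sampler: tracks a level via increment/decrement, answers "how many are active right now?"
      val connectedPeers = Kamon.rangeSampler("peers.connected.count").withoutTags()

      def onConnected(): Unit = { connectionAttempts.increment(); connectedPeers.increment() }
      def onDisconnected(): Unit = connectedPeers.decrement()
    }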
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala
new file mode 100644
index 0000000000..9d25a2b04a
--- /dev/null
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2019 ACINQ SAS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fr.acinq.eclair.io
+
+import akka.actor.ActorRef
+import fr.acinq.bitcoin.Crypto.PublicKey
+
+sealed trait PeerEvent
+
+case class PeerConnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent
+
+case class PeerDisconnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala
index 2c39b004fd..2d58a99d03 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala
@@ -23,6 +23,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props}
 import akka.io.Tcp.SO.KeepAlive
 import akka.io.{IO, Tcp}
 import fr.acinq.eclair.NodeParams
+import kamon.Kamon

 import scala.concurrent.Promise

@@ -52,6 +53,7 @@ class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocke
   def listening(listener: ActorRef): Receive = {
     case Connected(remote, _) =>
       log.info(s"connected to $remote")
+      Kamon.counter("peers.connecting.count").withTag("state", "connected").increment()
       val connection = sender
       authenticator ! Authenticator.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None)
       listener ! ResumeAccepting(batchSize = 1)
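The PeerConnected/PeerDisconnected events defined in PeerEvents.scala above are published on the actor system's event stream (see the onTransition/onTermination blocks in Peer.scala), so any actor can observe peer connectivity without coupling to Peer. A hypothetical subscriber, for illustration only:

    import akka.actor.{Actor, ActorLogging}
    import fr.acinq.eclair.io.{PeerConnected, PeerDisconnected, PeerEvent}

    class PeerEventLogger extends Actor with ActorLogging {
      // subscribe to the sealed trait: both event types will be delivered
      context.system.eventStream.subscribe(self, classOf[PeerEvent])

      override def receive: Receive = {
        case PeerConnected(_, nodeId) => log.info(s"peer $nodeId connected")
        case PeerDisconnected(_, nodeId) => log.info(s"peer $nodeId disconnected")
      }
    }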
e
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala
index 0c79bd663c..6a8eaa82bd 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala
@@ -18,10 +18,13 @@ package fr.acinq.eclair.payment

 import akka.actor.{Actor, ActorLogging, Props}
 import fr.acinq.bitcoin.ByteVector32
-import fr.acinq.eclair.{MilliSatoshi, NodeParams}
-import fr.acinq.eclair.channel.Helpers.Closing.{LocalClose, MutualClose, RecoveryClose, RemoteClose, RevokedClose}
+import fr.acinq.eclair.NodeParams
+import fr.acinq.eclair.channel.Channel.{LocalError, RemoteError}
+import fr.acinq.eclair.channel.Helpers.Closing._
 import fr.acinq.eclair.channel._
 import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent}
+import kamon.Kamon
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._

@@ -40,26 +43,61 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {

   override def receive: Receive = {

-    case e: PaymentSent => db.add(e)
-
-    case e: PaymentReceived => db.add(e)
-
-    case e: PaymentRelayed => db.add(e)
+    case e: PaymentSent =>
+      Kamon
+        .histogram("payment.hist")
+        .withTag("direction", "sent")
+        .record(e.amount.truncateToSatoshi.toLong)
+      db.add(e)
+
+    case e: PaymentReceived =>
+      Kamon
+        .histogram("payment.hist")
+        .withTag("direction", "received")
+        .record(e.amount.truncateToSatoshi.toLong)
+      db.add(e)
+
+    case e: PaymentRelayed =>
+      Kamon
+        .histogram("payment.hist")
+        .withTag("direction", "relayed")
+        .withTag("type", "total")
+        .record(e.amountIn.truncateToSatoshi.toLong)
+      Kamon
+        .histogram("payment.hist")
+        .withTag("direction", "relayed")
+        .withTag("type", "fee")
+        .record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
+      db.add(e)

     case e: NetworkFeePaid => db.add(e)

     case e: AvailableBalanceChanged => balanceEventThrottler ! e

-    case e: ChannelErrorOccured => db.add(e)
+    case e: ChannelErrorOccured =>
+      val metric = Kamon.counter("channels.errors")
+      e.error match {
+        case LocalError(_) if e.isFatal => metric.withTag("origin", "local").withTag("fatal", "yes").increment()
+        case LocalError(_) if !e.isFatal => metric.withTag("origin", "local").withTag("fatal", "no").increment()
+        case RemoteError(_) => metric.withTag("origin", "remote").increment()
+      }
+      db.add(e)

     case e: ChannelStateChanged =>
+      val metric = Kamon.counter("channels.lifecycle")
+      // NB: order matters!
       e match {
         case ChannelStateChanged(_, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, d: DATA_NORMAL) =>
+          metric.withTag("event", "created").increment()
           db.add(ChannelLifecycleEvent(d.channelId, remoteNodeId, d.commitments.commitInput.txOut.amount, d.commitments.localParams.isFunder, !d.commitments.announceChannel, "created"))
+        case ChannelStateChanged(_, _, _, WAIT_FOR_INIT_INTERNAL, _, _) => () // must match before the CLOSING case below: channels restored at restart must not be counted again
+        case ChannelStateChanged(_, _, _, _, CLOSING, _) =>
+          metric.withTag("event", "closing").increment()
         case _ => ()
       }

     case e: ChannelClosed =>
+      Kamon.counter("channels.lifecycle").withTag("event", "closed").increment()
       val event = e.closingType match {
         case MutualClose => "mutual"
         case LocalClose => "local"
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
index 45bc52f198..ca93d70a7e 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
@@ -36,6 +36,8 @@ import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
 import fr.acinq.eclair.router.Graph.{RichWeight, RoutingHeuristics, WeightRatios}
 import fr.acinq.eclair.transactions.Scripts
 import fr.acinq.eclair.wire._
+import kamon.Kamon
+import kamon.context.Context
 import shapeless.HNil

 import scala.annotation.tailrec
@@ -275,86 +277,100 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
       stay

     case Event(v@ValidateResult(c, _), d0) =>
-      d0.awaiting.get(c) match {
-        case Some(origin +: _) => origin ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
-        case _ => ()
-      }
-      log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
-      val publicChannel_opt = v match {
-        case ValidateResult(c, Left(t)) =>
-          log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
-          None
-        case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
-          val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
-          // let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2
-          val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(c.bitcoinKey1, c.bitcoinKey2)))
-          if (tx.txOut.size < outputIndex + 1 || fundingOutputScript != tx.txOut(outputIndex).publicKeyScript) {
-            log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
-            d0.awaiting.get(c) match {
-              case Some(origins) => origins.foreach(_ ! InvalidAnnouncement(c))
-              case _ => ()
-            }
-            None
-          } else {
-            watcher ! WatchSpentBasic(self, tx, outputIndex, BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(c.shortChannelId))
-            // TODO: check feature bit set
-            log.debug("added channel channelId={}", c.shortChannelId)
-            val capacity = tx.txOut(outputIndex).amount
-            context.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity) :: Nil))
-            db.addChannel(c, tx.txid, capacity)
-
-            // in case we just validated our first local channel, we announce the local node
-            if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
-              log.info("first local channel validated, announcing local node")
-              val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.globalFeatures)
-              self ! nodeAnn
-            }
-            Some(PublicChannel(c, tx.txid, capacity, None, None))
-          }
-        case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
-          if (fundingTxStatus.spendingTxConfirmed) {
-            log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
-            // the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
-            d0.awaiting.get(c) match {
-              case Some(origins) => origins.foreach(_ ! ChannelClosed(c))
-              case _ => ()
-            }
-          } else {
-            log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
-          }
-          // there may be a record if we have just restarted
-          db.removeChannel(c.shortChannelId)
-          None
-      }
-
-      // we also reprocess node and channel_update announcements related to channels that were just analyzed
-      val reprocessUpdates = d0.stash.updates.filterKeys(u => u.shortChannelId == c.shortChannelId)
-      val reprocessNodes = d0.stash.nodes.filterKeys(n => isRelatedTo(c, n.nodeId))
-      // and we remove the reprocessed messages from the stash
-      val stash1 = d0.stash.copy(updates = d0.stash.updates -- reprocessUpdates.keys, nodes = d0.stash.nodes -- reprocessNodes.keys)
-      // we remove channel from awaiting map
-      val awaiting1 = d0.awaiting - c
-
-      publicChannel_opt match {
-        case Some(pc) =>
-          // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
-          // right after the channel_announcement, channel_updates will be moved from private to public at that time
-          val d1 = d0.copy(
-            channels = d0.channels + (c.shortChannelId -> pc),
-            privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
-            rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
-            stash = stash1,
-            awaiting = awaiting1)
-          // we only reprocess updates and nodes if validation succeeded
-          val d2 = reprocessUpdates.foldLeft(d1) {
-            case (d, (u, origins)) => origins.foldLeft(d) { case (d, origin) => handle(u, origin, d) } // we reprocess the same channel_update for every origin (to preserve origin information)
-          }
-          val d3 = reprocessNodes.foldLeft(d2) {
-            case (d, (n, origins)) => origins.foldLeft(d) { case (d, origin) => handle(n, origin, d) } // we reprocess the same node_announcement for every origin (to preserve origin information)
-          }
-          stay using d3
-        case None =>
-          stay using d0.copy(stash = stash1, awaiting = awaiting1)
-      }
+      Kamon.runWithContextEntry(shortChannelIdKey, c.shortChannelId) {
+        Kamon.runWithSpan(Kamon.currentSpan(), finishSpan = true) {
+          Kamon.runWithSpan(Kamon.spanBuilder("process-validate-result").start(), finishSpan = true) {
+            d0.awaiting.get(c) match {
+              case Some(origin +: _) => origin ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
+              case _ => ()
+            }
+            log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
+            val publicChannel_opt = v match {
+              case ValidateResult(c, Left(t)) =>
+                log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
+                None
+              case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
+                val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
+                val (fundingOutputScript, fundingOutputIsInvalid) = Kamon.runWithSpan(Kamon.spanBuilder("checked-pubkeyscript").start(), finishSpan = true) {
+                  // let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2
+                  val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(c.bitcoinKey1, c.bitcoinKey2)))
+                  val fundingOutputIsInvalid = tx.txOut.size < outputIndex + 1 || fundingOutputScript != tx.txOut(outputIndex).publicKeyScript
+                  (fundingOutputScript, fundingOutputIsInvalid)
+                }
+                if (fundingOutputIsInvalid) {
+                  log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
+                  d0.awaiting.get(c) match {
+                    case Some(origins) => origins.foreach(_ ! InvalidAnnouncement(c))
+                    case _ => ()
+                  }
+                  None
+                } else {
+                  watcher ! WatchSpentBasic(self, tx, outputIndex, BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(c.shortChannelId))
+                  // TODO: check feature bit set
+                  log.debug("added channel channelId={}", c.shortChannelId)
+                  val capacity = tx.txOut(outputIndex).amount
+                  context.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity) :: Nil))
+                  Kamon.runWithSpan(Kamon.spanBuilder("add-to-db").start(), finishSpan = true) {
+                    db.addChannel(c, tx.txid, capacity)
+                  }
+                  // in case we just validated our first local channel, we announce the local node
+                  if (!d0.nodes.contains(nodeParams.nodeId) && isRelatedTo(c, nodeParams.nodeId)) {
+                    log.info("first local channel validated, announcing local node")
+                    val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.globalFeatures)
+                    self ! nodeAnn
+                  }
+                  Some(PublicChannel(c, tx.txid, capacity, None, None))
+                }
+              case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
+                if (fundingTxStatus.spendingTxConfirmed) {
+                  log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
+                  // the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
+                  d0.awaiting.get(c) match {
+                    case Some(origins) => origins.foreach(_ ! ChannelClosed(c))
+                    case _ => ()
+                  }
+                } else {
+                  log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
+                }
+                // there may be a record if we have just restarted
+                db.removeChannel(c.shortChannelId)
+                None
+            }
+            val span1 = Kamon.spanBuilder("reprocess-stash").start()
+            // we also reprocess node and channel_update announcements related to channels that were just analyzed
+            val reprocessUpdates = d0.stash.updates.filterKeys(u => u.shortChannelId == c.shortChannelId)
+            val reprocessNodes = d0.stash.nodes.filterKeys(n => isRelatedTo(c, n.nodeId))
+            // and we remove the reprocessed messages from the stash
+            val stash1 = d0.stash.copy(updates = d0.stash.updates -- reprocessUpdates.keys, nodes = d0.stash.nodes -- reprocessNodes.keys)
+            // we remove channel from awaiting map
+            val awaiting1 = d0.awaiting - c
+            span1.finish()
+
+            publicChannel_opt match {
+              case Some(pc) =>
+                Kamon.runWithSpan(Kamon.spanBuilder("build-new-state").start(), finishSpan = true) {
+                  // note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
+                  // right after the channel_announcement, channel_updates will be moved from private to public at that time
+                  val d1 = d0.copy(
+                    channels = d0.channels + (c.shortChannelId -> pc),
+                    privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
+                    rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
+                    stash = stash1,
+                    awaiting = awaiting1)
+                  // we only reprocess updates and nodes if validation succeeded
+                  val d2 = reprocessUpdates.foldLeft(d1) {
+                    case (d, (u, origins)) => origins.foldLeft(d) { case (d, origin) => handle(u, origin, d) } // we reprocess the same channel_update for every origin (to preserve origin information)
+                  }
+                  val d3 = reprocessNodes.foldLeft(d2) {
+                    case (d, (n, origins)) => origins.foldLeft(d) { case (d, origin) => handle(n, origin, d) } // we reprocess the same node_announcement for every origin (to preserve origin information)
+                  }
+                  stay using d3
+                }
+              case None =>
+                stay using d0.copy(stash = stash1, awaiting = awaiting1)
+            }
+          }
+        }
+      }

     case Event(WatchEventSpentBasic(BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT(shortChannelId)), d) if d.channels.contains(shortChannelId) =>
@@ -541,7 +557,11 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
         stay
       } else {
         log.info("validating shortChannelId={}", c.shortChannelId)
-        watcher ! ValidateRequest(c)
+        Kamon.runWithContextEntry(shortChannelIdKey, c.shortChannelId) {
+          Kamon.runWithSpan(Kamon.spanBuilder("validate-channel").tag("shortChannelId", c.shortChannelId.toString).start(), finishSpan = false) {
+            watcher ! ValidateRequest(c)
+          }
+        }
         // we don't acknowledge the message just yet
         stay using d.copy(awaiting = d.awaiting + (c -> Seq(sender)))
       }
@@ -556,104 +576,125 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
       log.debug("received node announcement for nodeId={}", n.nodeId)
       stay using handle(n, sender, d)

-    case Event(PeerRoutingMessage(transport, _, routingMessage@QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, extendedQueryFlags_opt)), d) =>
+    case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, extendedQueryFlags_opt)), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)
-      log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", firstBlockNum, numberOfBlocks, extendedQueryFlags_opt)
-      // keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
-      val shortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _))
-      log.info("replying with {} items for range=({}, {})", shortChannelIds.size, firstBlockNum, numberOfBlocks)
-      split(shortChannelIds, nodeParams.routerConf.channelRangeChunkSize)
-        .foreach(chunk => {
-          val (timestamps, checksums) = routingMessage.queryFlags_opt match {
-            case Some(extension) if extension.wantChecksums | extension.wantTimestamps =>
-              // we always compute timestamps and checksums even if we don't need both, overhead is negligible
-              val (timestamps, checksums) = chunk.shortChannelIds.map(getChannelDigestInfo(d.channels)).unzip
-              val encodedTimestamps = if (extension.wantTimestamps) Some(ReplyChannelRangeTlv.EncodedTimestamps(nodeParams.routerConf.encodingType, timestamps)) else None
-              val encodedChecksums = if (extension.wantChecksums) Some(ReplyChannelRangeTlv.EncodedChecksums(checksums)) else None
-              (encodedTimestamps, encodedChecksums)
-            case _ => (None, None)
-          }
-          transport ! ReplyChannelRange(chainHash, chunk.firstBlock, chunk.numBlocks,
-            complete = 1,
-            shortChannelIds = EncodedShortChannelIds(nodeParams.routerConf.encodingType, chunk.shortChannelIds),
-            timestamps = timestamps,
-            checksums = checksums)
-        })
-      stay
+      Kamon.runWithContextEntry(remoteNodeIdKey, remoteNodeId.toString) {
+        Kamon.runWithSpan(Kamon.spanBuilder("query-channel-range").start(), finishSpan = true) {
+          log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", firstBlockNum, numberOfBlocks, extendedQueryFlags_opt)
+          // keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
+          val shortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _))
+          log.info("replying with {} items for range=({}, {})", shortChannelIds.size, firstBlockNum, numberOfBlocks)
+          val chunks = Kamon.runWithSpan(Kamon.spanBuilder("split-channel-ids").start(), finishSpan = true) {
+            split(shortChannelIds, nodeParams.routerConf.channelRangeChunkSize)
+          }
+          Kamon.runWithSpan(Kamon.spanBuilder("compute-timestamps-checksums").start(), finishSpan = true) {
+            chunks.foreach { chunk =>
+              val (timestamps, checksums) = routingMessage.queryFlags_opt match {
+                case Some(extension) if extension.wantChecksums | extension.wantTimestamps =>
+                  // we always compute timestamps and checksums even if we don't need both, overhead is negligible
+                  val (timestamps, checksums) = chunk.shortChannelIds.map(getChannelDigestInfo(d.channels)).unzip
+                  val encodedTimestamps = if (extension.wantTimestamps) Some(ReplyChannelRangeTlv.EncodedTimestamps(nodeParams.routerConf.encodingType, timestamps)) else None
+                  val encodedChecksums = if (extension.wantChecksums) Some(ReplyChannelRangeTlv.EncodedChecksums(checksums)) else None
+                  (encodedTimestamps, encodedChecksums)
+                case _ => (None, None)
+              }
+              transport ! ReplyChannelRange(chainHash, chunk.firstBlock, chunk.numBlocks,
+                complete = 1,
+                shortChannelIds = EncodedShortChannelIds(nodeParams.routerConf.encodingType, chunk.shortChannelIds),
+                timestamps = timestamps,
+                checksums = checksums)
+            }
+          }
+          stay
+        }
+      }

     case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@ReplyChannelRange(chainHash, _, _, _, shortChannelIds, _)), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)

-      @tailrec
-      def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
-        ids match {
-          case Nil => acc.reverse
-          case head :: tail =>
-            val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, nodeParams.routerConf.requestNodeAnnouncements)
-            // 0 means nothing to query, just don't include it
-            val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
-            loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
-        }
-      }
-
-      val timestamps_opt = routingMessage.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
-      val checksums_opt = routingMessage.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])
-
-      val shortChannelIdAndFlags = loop(shortChannelIds.array, timestamps_opt, checksums_opt)
-
-      val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
-        case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
-          val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
-          val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
-          (c1, u1)
-      }
-      log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", shortChannelIds.array.size, channelCount, updatesCount, shortChannelIds.encoding)
-      // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
-      val replies = shortChannelIdAndFlags
-        .grouped(nodeParams.routerConf.channelQueryChunkSize)
-        .map(chunk => QueryShortChannelIds(chainHash,
-          shortChannelIds = EncodedShortChannelIds(shortChannelIds.encoding, chunk.map(_.shortChannelId)),
-          if (routingMessage.timestamps_opt.isDefined || routingMessage.checksums_opt.isDefined)
-            TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(shortChannelIds.encoding, chunk.map(_.flag)))
-          else
-            TlvStream.empty
-        ))
-        .toList
-      val (sync1, replynow_opt) = addToSync(d.sync, remoteNodeId, replies)
-      // we only send a reply right away if there were no pending requests
-      replynow_opt.foreach(transport ! _)
-      val progress = syncProgress(sync1)
-      context.system.eventStream.publish(progress)
-      self ! progress
-      stay using d.copy(sync = sync1)
+      Kamon.runWithContextEntry(remoteNodeIdKey, remoteNodeId.toString) {
+        Kamon.runWithSpan(Kamon.spanBuilder("reply-channel-range").start(), finishSpan = true) {
+
+          @tailrec
+          def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
+            ids match {
+              case Nil => acc.reverse
+              case head :: tail =>
+                val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, nodeParams.routerConf.requestNodeAnnouncements)
+                // 0 means nothing to query, just don't include it
+                val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
+                loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
+            }
+          }

+          val timestamps_opt = routingMessage.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
+          val checksums_opt = routingMessage.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])

+          val shortChannelIdAndFlags = Kamon.runWithSpan(Kamon.spanBuilder("compute-flags").start(), finishSpan = true) {
+            loop(shortChannelIds.array, timestamps_opt, checksums_opt)
+          }

+          val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
+            case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
+              val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0)
+              val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0)
+              (c1, u1)
+          }
+          log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", shortChannelIds.array.size, channelCount, updatesCount, shortChannelIds.encoding)
+          // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
+          val replies = shortChannelIdAndFlags
+            .grouped(nodeParams.routerConf.channelQueryChunkSize)
+            .map(chunk => QueryShortChannelIds(chainHash,
+              shortChannelIds = EncodedShortChannelIds(shortChannelIds.encoding, chunk.map(_.shortChannelId)),
+              if (routingMessage.timestamps_opt.isDefined || routingMessage.checksums_opt.isDefined)
+                TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(shortChannelIds.encoding, chunk.map(_.flag)))
+              else
+                TlvStream.empty
+            ))
+            .toList
+          val (sync1, replynow_opt) = addToSync(d.sync, remoteNodeId, replies)
+          // we only send a reply right away if there were no pending requests
+          replynow_opt.foreach(transport ! _)
+          val progress = syncProgress(sync1)
+          context.system.eventStream.publish(progress)
+          self ! progress
+          stay using d.copy(sync = sync1)
+        }
+      }

-    case Event(PeerRoutingMessage(transport, _, routingMessage@QueryShortChannelIds(chainHash, shortChannelIds, _)), d) =>
+    case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@QueryShortChannelIds(chainHash, shortChannelIds, _)), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)
-      val flags = routingMessage.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])
-
-      var channelCount = 0
-      var updateCount = 0
-      var nodeCount = 0
-
-      Router.processChannelQuery(d.nodes, d.channels)(
-        shortChannelIds.array,
-        flags,
-        ca => {
-          channelCount = channelCount + 1
-          transport ! ca
-        },
-        cu => {
-          updateCount = updateCount + 1
-          transport ! cu
-        },
-        na => {
-          nodeCount = nodeCount + 1
-          transport ! na
-        }
-      )
-      log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", shortChannelIds.array.size, channelCount, updateCount, nodeCount)
-      transport ! ReplyShortChannelIdsEnd(chainHash, 1)
-      stay
+
+      Kamon.runWithContextEntry(remoteNodeIdKey, remoteNodeId.toString) {
+        Kamon.runWithSpan(Kamon.spanBuilder("query-short-channel-ids").start(), finishSpan = true) {
+
+          val flags = routingMessage.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])
+
+          var channelCount = 0
+          var updateCount = 0
+          var nodeCount = 0
+
+          Router.processChannelQuery(d.nodes, d.channels)(
+            shortChannelIds.array,
+            flags,
+            ca => {
+              channelCount = channelCount + 1
+              transport ! ca
+            },
+            cu => {
+              updateCount = updateCount + 1
+              transport ! cu
+            },
+            na => {
+              nodeCount = nodeCount + 1
+              transport ! na
+            }
+          )
+          log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", shortChannelIds.array.size, channelCount, updateCount, nodeCount)
+          transport ! ReplyShortChannelIdsEnd(chainHash, 1)
+          stay
+        }
+      }

     case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage: ReplyShortChannelIdsEnd), d) =>
       sender ! TransportHandler.ReadAck(routingMessage)
@@ -837,6 +878,9 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[

 object Router {

+  val shortChannelIdKey = Context.key[ShortChannelId]("shortChannelId", ShortChannelId(0))
+  val remoteNodeIdKey = Context.key[String]("remoteNodeId", "unknown")
+
   def props(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Done]] = None) = Props(new Router(nodeParams, watcher, initialized))

   def toFakeUpdate(extraHop: ExtraHop, htlcMaximum: MilliSatoshi): ChannelUpdate = {
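The two Context keys added to the Router companion let spans and metrics recorded anywhere inside a wrapped block be correlated with a channel or a peer. How an entry is set and read back with the Kamon 2.x context API, as a sketch:

    import fr.acinq.eclair.ShortChannelId
    import fr.acinq.eclair.router.Router.shortChannelIdKey
    import kamon.Kamon

    object TracingExample {
      // wrap a unit of work so everything recorded inside carries the channel id
      def withShortChannelId[T](scid: ShortChannelId)(work: => T): T =
        Kamon.runWithContextEntry(shortChannelIdKey, scid)(work)

      // anywhere inside `work`, the entry can be read back from the current context;
      // the declared default ShortChannelId(0) is returned when no entry was set
      def currentScid: ShortChannelId = Kamon.currentContext().get(shortChannelIdKey)
    }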
diff --git a/eclair-node/pom.xml b/eclair-node/pom.xml
index e2f893519a..6308b38513 100644
--- a/eclair-node/pom.xml
+++ b/eclair-node/pom.xml
@@ -61,6 +61,12 @@
                         <type>fat</type>
                         <fileName>${project.name}-${project.version}</fileName>
                         <fileDesc>-${git.commit.id.abbrev}</fileDesc>
+                        <manifest>
+                            <entry>
+                                <key>Java-Agents</key>
+                                <value>kanela-agent-1.0.1.jar</value>
+                            </entry>
+                        </manifest>
@@ -68,6 +74,17 @@
         </plugins>
     </build>

+    <repositories>
+        <repository>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <id>bintray-kamon-io-releases</id>
+            <name>bintray</name>
+            <url>https://dl.bintray.com/kamon-io/releases</url>
+        </repository>
+    </repositories>
+
     <dependencies>
         <dependency>
            <groupId>fr.acinq.eclair</groupId>
@@ -80,6 +97,7 @@
             <artifactId>classutil_${scala.version.short}</artifactId>
             <version>1.4.0</version>
         </dependency>
+
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
@@ -102,6 +120,32 @@
             <artifactId>akka-http-json4s_${scala.version.short}</artifactId>
             <version>1.19.0</version>
         </dependency>
+        <!-- monitoring -->
+        <dependency>
+            <groupId>io.kamon</groupId>
+            <artifactId>kamon-apm-reporter_${scala.version.short}</artifactId>
+            <version>${kamon.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.kamon</groupId>
+            <artifactId>kamon-system-metrics_${scala.version.short}</artifactId>
+            <version>${kamon.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.kamon</groupId>
+            <artifactId>kamon-jdbc_${scala.version.short}</artifactId>
+            <version>${kamon.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.typesafe.akka</groupId>
@@ -115,5 +159,11 @@
             <version>1.4.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.kamon</groupId>
+            <artifactId>kanela-agent</artifactId>
+            <version>1.0.1</version>
+        </dependency>
     </dependencies>
diff --git a/eclair-node/src/main/resources/application.conf b/eclair-node/src/main/resources/application.conf
index 3dbfab50b3..e8cb68f43b 100644
--- a/eclair-node/src/main/resources/application.conf
+++ b/eclair-node/src/main/resources/application.conf
@@ -1,3 +1,7 @@
+eclair {
+  enable-kamon = false
+}
+
 akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]
diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala
index 45ad4bde24..638ecad1e3 100644
--- a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala
+++ b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala
@@ -24,7 +24,9 @@ import akka.stream.{ActorMaterializer, BindFailedException}
 import com.typesafe.config.Config
 import fr.acinq.eclair.api.Service
 import grizzled.slf4j.Logging
-import scala.concurrent.{Await, ExecutionContext}
+import kamon.Kamon
+
+import scala.concurrent.ExecutionContext
 import scala.util.{Failure, Success}

 /**
@@ -40,6 +42,11 @@ object Boot extends App with Logging {
   implicit val system: ActorSystem = ActorSystem("eclair-node")
   implicit val ec: ExecutionContext = system.dispatcher
   val setup = new Setup(datadir)
+
+  if (setup.config.getBoolean("enable-kamon")) {
+    Kamon.init(setup.appConfig)
+  }
+
   plugins.foreach(_.onSetup(setup))
   setup.bootstrap onComplete {
     case Success(kit) =>
diff --git a/pom.xml b/pom.xml
index 3c8775b350..161a4adb19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
         <sttp.version>1.3.9</sttp.version>
         <bitcoinlib.version>0.15</bitcoinlib.version>
         <guava.version>24.0-android</guava.version>
+        <kamon.version>2.0.0</kamon.version>