Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitoring with Kamon (disabled by default) #1126

Merged
merged 8 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions eclair-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- MONITORING -->
<dependency>
<groupId>io.kamon</groupId>
<artifactId>kamon-core_${scala.version.short}</artifactId>
<version>${kamon.version}</version>
</dependency>
<dependency>
<groupId>io.kamon</groupId>
<artifactId>kamon-akka_${scala.version.short}</artifactId>
<version>${kamon.version}</version>
</dependency>
<!-- TESTS -->
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object NodeParams {
ConfigFactory.parseProperties(System.getProperties)
.withFallback(ConfigFactory.parseFile(new File(datadir, "eclair.conf")))
.withFallback(overrideDefaults)
.withFallback(ConfigFactory.load()).getConfig("eclair")
.withFallback(ConfigFactory.load())

def getSeed(datadir: File): ByteVector = {
val seedPath = new File(datadir, "seed.dat")
Expand Down
25 changes: 25 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package fr.acinq.eclair
pm47 marked this conversation as resolved.
Show resolved Hide resolved

import kamon.Kamon

import scala.concurrent.{ExecutionContext, Future}

object KamonExt {

def time[T](name: String)(f: => T) = {
val timer = Kamon.timer(name).withoutTags().start()
try {
f
} finally {
timer.stop()
}
}

def timeFuture[T](name: String)(f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
val timer = Kamon.timer(name).withoutTags().start()
val res = f
res onComplete { case _ => timer.stop }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be res onComplete { case Success(_) => timer.stop }? I'd assume we don't want to mix measurements for failed calls with the successful ones.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to stop the timer somehow? I don't think it matters anyway, because Kamon has first-class handling of distribution of latencies, and we would detect outliers easily.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's where we'd use tags or some equivalent.
Usually we should split our measurements on values of a specific tag with Success, Failure1, Failure2, etc depending on our set of possible failures.

res
}

}
3 changes: 2 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class Setup(datadir: File,
secureRandom.nextInt()

datadir.mkdirs()
val config = NodeParams.loadConfiguration(datadir, overrideDefaults)
val appConfig = NodeParams.loadConfiguration(datadir, overrideDefaults)
val config = appConfig.getConfig("eclair")
val seed = seed_opt.getOrElse(NodeParams.getSeed(datadir))
val chain = config.getString("chain")
val chaindir = new File(datadir, chain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.TxCoordinates
import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.wire.ChannelAnnouncement
import kamon.Kamon
import org.json4s.JsonAST._

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -149,26 +150,36 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {

def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)

for {
blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
txid: String <- rpcClient.invoke("getblock", blockHash).map {
case json => Try {
val JArray(txs) = json \ "tx"
txs(txIndex).extract[String]
} getOrElse ByteVector32.Zeroes.toHex
}
tx <- getRawTransaction(txid)
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map {
case res => UtxoStatus.Spent(spendingTxConfirmed = !res)
val span = Kamon.spanBuilder("validate-bitcoin-client").start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's kamon recommendation for naming spans? Do they automatically included the containing namespace/class/object? It's usually recommended to use some kind of namespacing in spans, so either Kamon does it automatically for us or maybe we should do something like blockchain.bitcoind.rpc.extendedbitcoinclient.validate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, let's address this in a 2nd iteration

for {
_ <- Future.successful(0)
t-bast marked this conversation as resolved.
Show resolved Hide resolved
span0 = Kamon.spanBuilder("getblockhash").start()
t-bast marked this conversation as resolved.
Show resolved Hide resolved
blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
_ = span0.finish()
span1 = Kamon.spanBuilder("getblock").start()
txid: String <- rpcClient.invoke("getblock", blockHash).map {
case json => Try {
val JArray(txs) = json \ "tx"
txs(txIndex).extract[String]
} getOrElse ByteVector32.Zeroes.toHex
}
}
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
_ = span1.finish()
span2 = Kamon.spanBuilder("getrawtx").start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we'd benefit a lot from a utility function in KamonExt to wrap a future in a span.
It would be nice to just write tx <- KamonExt.wrapInSpan("getrawtx")(getRawTransaction(txid)) which would automatically do the span.finish().
The name wrapInSpan isn't very good, needs some bikeshedding.

tx <- getRawTransaction(txid)
_ = span2.finish()
span3 = Kamon.spanBuilder("utxospendable-mempool").start()
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
_ = span3.finish()
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map {
case res => UtxoStatus.Spent(spendingTxConfirmed = !res)
}
}
_ = span.finish()
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))

} recover { case t: Throwable => ValidateResult(c, Left(t)) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
import fr.acinq.eclair.io.Authenticator.{Authenticated, AuthenticationFailed, PendingAuth}
import fr.acinq.eclair.wire.LightningMessageCodecs
import fr.acinq.eclair.{Logs, NodeParams}
import kamon.Kamon

/**
* The purpose of this class is to serve as a buffer for newly connection before they are authenticated
Expand All @@ -43,6 +44,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
def ready(switchboard: ActorRef, authenticating: Map[ActorRef, PendingAuth]): Receive = {
case pending@PendingAuth(connection, remoteNodeId_opt, address, _) =>
log.debug(s"authenticating connection to ${address.getHostString}:${address.getPort} (pending=${authenticating.size} handlers=${context.children.size})")
Kamon.counter("peers.connecting.count").withTag("state", "authenticating").increment()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be useful to centralize counters for each component (like the Metrics object you created for Peer).
For example in Authenticator's companion object, define private val connectingPeersCounter = Kamon.counter("peers.connecting.count").
This is usually easier for future maintenance.

val transport = context.actorOf(TransportHandler.props(
KeyPair(nodeParams.nodeId.value, nodeParams.privateKey.value),
remoteNodeId_opt.map(_.value),
Expand All @@ -56,6 +58,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
import pendingAuth.{address, remoteNodeId_opt}
val outgoing = remoteNodeId_opt.isDefined
log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (outgoing) "outgoing" else "incoming"}")
Kamon.counter("peers.connecting.count").withTag("state", "authenticated").increment()
switchboard ! Authenticated(connection, transport, remoteNodeId, address, remoteNodeId_opt.isDefined, pendingAuth.origin_opt)
context become ready(switchboard, authenticating - transport)

Expand Down
23 changes: 23 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{secureRandom, wire, _}
import kamon.Kamon
import scodec.Attempt
import scodec.bits.ByteVector

Expand Down Expand Up @@ -535,6 +536,22 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
case DISCONNECTED -> _ if nodeParams.autoReconnect => cancelTimer(RECONNECT_TIMER)
}

onTransition {
case _ -> CONNECTED =>
Metrics.connectedPeers.increment()
context.system.eventStream.publish(PeerConnected(self, remoteNodeId))
case CONNECTED -> DISCONNECTED =>
Metrics.connectedPeers.decrement()
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
}

onTermination {
case StopEvent(_, CONNECTED, d: ConnectedData) =>
// the transition handler won't be fired if we go directly from CONNECTED to closed
Metrics.connectedPeers.decrement()
context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
}

def createNewChannel(nodeParams: NodeParams, funder: Boolean, fundingAmount: Satoshi, origin_opt: Option[ActorRef]): (ActorRef, LocalParams) = {
val defaultFinalScriptPubKey = Helpers.getFinalScriptPubKey(wallet, nodeParams.chainHash)
val localParams = makeChannelParams(nodeParams, defaultFinalScriptPubKey, funder, fundingAmount)
Expand Down Expand Up @@ -640,6 +657,12 @@ object Peer {

// @formatter:on

object Metrics {
val peers = Kamon.rangeSampler("peers.count").withoutTags()
val connectedPeers = Kamon.rangeSampler("peers.connected.count").withoutTags()
val channels = Kamon.rangeSampler("channels.count").withoutTags()
}

def makeChannelParams(nodeParams: NodeParams, defaultFinalScriptPubKey: ByteVector, isFunder: Boolean, fundingAmount: Satoshi): LocalParams = {
val entropy = new Array[Byte](16)
secureRandom.nextBytes(entropy)
Expand Down
26 changes: 26 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.io

import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey

sealed trait PeerEvent

case class PeerConnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent

case class PeerDisconnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp}
import fr.acinq.eclair.NodeParams
import kamon.Kamon

import scala.concurrent.Promise

Expand Down Expand Up @@ -52,6 +53,7 @@ class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocke
def listening(listener: ActorRef): Receive = {
case Connected(remote, _) =>
log.info(s"connected to $remote")
Kamon.counter("peers.connecting.count").withTag("state", "connected").increment()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same counter used in Authenticator. So definitely worth centralizing somewhere to make sure these two actors keep using the same counter correctly

val connection = sender
authenticator ! Authenticator.PendingAuth(connection, remoteNodeId_opt = None, address = remote, origin_opt = None)
listener ! ResumeAccepting(batchSize = 1)
Expand Down
54 changes: 46 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package fr.acinq.eclair.payment

import akka.actor.{Actor, ActorLogging, Props}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.{MilliSatoshi, NodeParams}
import fr.acinq.eclair.channel.Helpers.Closing.{LocalClose, MutualClose, RecoveryClose, RemoteClose, RevokedClose}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel.Channel.{LocalError, RemoteError}
import fr.acinq.eclair.channel.Helpers.Closing._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent}
import kamon.Kamon

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand All @@ -40,26 +43,61 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {

override def receive: Receive = {

case e: PaymentSent => db.add(e)

case e: PaymentReceived => db.add(e)

case e: PaymentRelayed => db.add(e)
case e: PaymentSent =>
Kamon
.histogram("payment.hist")
.withTag("direction", "sent")
.record(e.amount.truncateToSatoshi.toLong)
db.add(e)

case e: PaymentReceived =>
Kamon
.histogram("payment.hist")
.withTag("direction", "received")
.record(e.amount.truncateToSatoshi.toLong)
db.add(e)

case e: PaymentRelayed =>
Kamon
.histogram("payment.hist")
.withTag("direction", "relayed")
.withTag("type", "total")
.record(e.amountIn.truncateToSatoshi.toLong)
Kamon
.histogram("payment.hist")
.withTag("direction", "relayed")
.withTag("type", "fee")
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
db.add(e)

case e: NetworkFeePaid => db.add(e)

case e: AvailableBalanceChanged => balanceEventThrottler ! e

case e: ChannelErrorOccured => db.add(e)
case e: ChannelErrorOccured =>
val metric = Kamon.counter("channels.errors")
e.error match {
case LocalError(_) if e.isFatal => metric.withTag("origin", "local").withTag("fatal", "yes").increment()
case LocalError(_) if !e.isFatal => metric.withTag("origin", "local").withTag("fatal", "no").increment()
case RemoteError(_) => metric.withTag("origin", "remote").increment()
}
db.add(e)

case e: ChannelStateChanged =>
val metric = Kamon.counter("channels.lifecycle")
// NB: order matters!
e match {
case ChannelStateChanged(_, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, d: DATA_NORMAL) =>
metric.withTag("event", "created").increment()
db.add(ChannelLifecycleEvent(d.channelId, remoteNodeId, d.commitments.commitInput.txOut.amount, d.commitments.localParams.isFunder, !d.commitments.announceChannel, "created"))
case ChannelStateChanged(_, _, _, WAIT_FOR_INIT_INTERNAL, _, _) =>
case ChannelStateChanged(_, _, _, _, CLOSING, _) =>
metric.withTag("event", "closing").increment()
case _ => ()
}

case e: ChannelClosed =>
Kamon.counter("channels.lifecycle").withTag("event", "closed").increment()
val event = e.closingType match {
case MutualClose => "mutual"
case LocalClose => "local"
Expand Down
Loading