Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended Queries: use TLV format for optional data #1072

Merged
merged 16 commits into from
Aug 22, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
@@ -154,7 +154,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
if (remoteFeatures.hasChannelRangeQueriesOptional || remoteFeatures.hasChannelRangeQueriesMandatory) {
// if they support channel queries, always ask for their filter
// NB: we always add extended info; if peer doesn't understand them it will ignore them
router ! SendChannelQuery(remoteNodeId, d.transport, flags_opt = Some(ExtendedQueryFlags.TIMESTAMPS_AND_CHECKSUMS))
router ! SendChannelQuery(remoteNodeId, d.transport, flags_opt = Some(QueryChannelRangeExtension(QueryChannelRangeExtension.WANT_ALL)))
}

// let's bring existing/requested channels online
107 changes: 66 additions & 41 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@

package fr.acinq.eclair.router

import java.util.zip.Adler32
import java.util.zip.CRC32

import akka.Done
import akka.actor.{ActorRef, Props, Status}
@@ -76,13 +76,13 @@ case class RouteResponse(hops: Seq[Hop], ignoreNodes: Set[PublicKey], ignoreChan
}
case class ExcludeChannel(desc: ChannelDesc) // this is used when we get a TemporaryChannelFailure, to give time for the channel to recover (note that exclusions are directed)
case class LiftChannelExclusion(desc: ChannelDesc)
case class SendChannelQuery(remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[ExtendedQueryFlags])
case class SendChannelQuery(remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeExtension])
case object GetRoutingState
case class RoutingState(channels: Iterable[ChannelAnnouncement], updates: Iterable[ChannelUpdate], nodes: Iterable[NodeAnnouncement])
case class Stash(updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])
case class Rebroadcast(channels: Map[ChannelAnnouncement, Set[ActorRef]], updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])

case class ShortChannelIdAndFlag(shortChannelId: ShortChannelId, flag: Byte)
case class ShortChannelIdAndFlag(shortChannelId: ShortChannelId, flag: Long)

case class Sync(pending: List[RoutingMessage], total: Int)

@@ -431,7 +431,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
// ask for everything
// we currently send only one query_channel_range message per peer, when we just (re)connected to it, so we don't
// have to worry about sending a new query_channel_range when another query is still in progress
val query = QueryChannelRange(nodeParams.chainHash, firstBlockNum = 0, numberOfBlocks = Int.MaxValue, extendedQueryFlags_opt = flags_opt)
val query = QueryChannelRange(nodeParams.chainHash, firstBlockNum = 0L, numberOfBlocks = Int.MaxValue.toLong, queryExtension = flags_opt)
log.info("sending query_channel_range={}", query)
remote ! query

@@ -508,37 +508,51 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
val shortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _))
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, firstBlockNum, numberOfBlocks)
split(shortChannelIds)
.foreach(chunk =>
transport ! ReplyChannelRange(chainHash, chunk.firstBlock, chunk.numBlocks,
.foreach(chunk => {
val (timestamps, checksums) = routingMessage.queryExtension match {
case Some(extension) if extension.wantChecksums | extension.wantTimestamps =>
// we always compute timestamps and checksums even if we don't need both, overhead is negligible
val (timestamps, checksums) = chunk.shortChannelIds.map(getChannelDigestInfo(d.channels, d.updates)).unzip
val encodedTimestamps = if (extension.wantTimestamps) Some(EncodedTimestamps(EncodingType.UNCOMPRESSED, timestamps)) else None
val encodedChecksums = if (extension.wantChecksums) Some(EncodedChecksums(checksums)) else None
(encodedTimestamps, encodedChecksums)
case _ => (None, None)
}
val reply = ReplyChannelRange(chainHash, chunk.firstBlock, chunk.numBlocks,
complete = 1,
shortChannelIds = EncodedShortChannelIds(EncodingType.UNCOMPRESSED, chunk.shortChannelIds),
extendedQueryFlags_opt = extendedQueryFlags_opt,
extendedInfo_opt = extendedQueryFlags_opt map {
case ExtendedQueryFlags.TIMESTAMPS_AND_CHECKSUMS => ExtendedInfo(chunk.shortChannelIds.map(getChannelDigestInfo(d.channels, d.updates)))
}))
timestamps = timestamps,
checksums = checksums)
transport ! reply
})
stay

case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@ReplyChannelRange(chainHash, _, _, _, shortChannelIds, extendedQueryFlags_opt, extendedInfo_opt)), d) =>
case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@ReplyChannelRange(chainHash, _, _, _, shortChannelIds, _)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
val shortChannelIdAndFlags = shortChannelIds.array
.zipWithIndex
.map { case (shortChannelId: ShortChannelId, idx) => ShortChannelIdAndFlag(shortChannelId, computeFlag(d.channels, d.updates)(shortChannelId, extendedInfo_opt.map(_.array(idx)))) }
.map {
case (shortChannelId: ShortChannelId, idx) => {
val timestamps = routingMessage.timestamps.map(_.timestamps(idx))
val checksums = routingMessage.checksums.map(_.checksums(idx))
ShortChannelIdAndFlag(shortChannelId, computeFlag(d.channels, d.updates)(shortChannelId, timestamps, checksums))
}
}
.filter(_.flag != 0)
val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) {
case ((c, u), ShortChannelIdAndFlag(_, flag)) =>
val c1 = c + (if (QueryFlagTypes.includeAnnouncement(flag)) 1 else 0)
val u1 = u + (if (QueryFlagTypes.includeUpdate1(flag)) 1 else 0) + (if (QueryFlagTypes.includeUpdate2(flag)) 1 else 0)
val c1 = c + (if (QueryFlagType.includeAnnouncement(flag)) 1 else 0)
val u1 = u + (if (QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryFlagType.includeUpdate2(flag)) 1 else 0)
(c1, u1)
}
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={} queryFlags=${extendedQueryFlags_opt.getOrElse("n/a")}", shortChannelIds.array.size, channelCount, updatesCount, shortChannelIds.encoding)
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", shortChannelIds.array.size, channelCount, updatesCount, shortChannelIds.encoding)
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
val replies = shortChannelIdAndFlags
.grouped(SHORTID_WINDOW)
.map(chunk => QueryShortChannelIds(chainHash,
shortChannelIds = EncodedShortChannelIds(shortChannelIds.encoding, chunk.map(_.shortChannelId)),
queryFlags_opt = extendedQueryFlags_opt map {
case _ => EncodedQueryFlags(shortChannelIds.encoding, chunk.map(_.flag))
}))
if (routingMessage.timestamps.isDefined || routingMessage.checksums.isDefined) Some(EncodedQueryFlags(shortChannelIds.encoding, chunk.map(_.flag))) else None
))
.toList
val (sync1, replynow_opt) = updateSync(d.sync, remoteNodeId, replies)
// we only send a rely right away if there were no pending requests
@@ -554,16 +568,16 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
case ((c, u), (shortChannelId, idx)) =>
var c1 = c
var u1 = u
val flag = queryFlags_opt.map(_.array(idx)).getOrElse(QueryFlagTypes.INCLUDE_ALL)
val flag = routingMessage.queryFlags.map(_.array(idx)).getOrElse(QueryFlagType.INCLUDE_ALL)
d.channels.get(shortChannelId) match {
case None => log.warning("received query for shortChannelId={} that we don't have", shortChannelId)
case Some(ca) =>
if (QueryFlagTypes.includeAnnouncement(flag)) {
if (QueryFlagType.includeAnnouncement(flag)) {
transport ! ca
c1 = c1 + 1
}
if (QueryFlagTypes.includeUpdate1(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => transport ! u; u1 = u1 + 1 }
if (QueryFlagTypes.includeUpdate2(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => transport ! u; u1 = u1 + 1 }
if (QueryFlagType.includeUpdate1(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).foreach { u => transport ! u; u1 = u1 + 1 }
if (QueryFlagType.includeUpdate2(flag)) d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).foreach { u => transport ! u; u1 = u1 + 1 }
}
(c1, u1)
}
@@ -721,7 +735,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
// when we're sending updates to ourselves
(transport_opt, remoteNodeId_opt) match {
case (Some(transport), Some(remoteNodeId)) =>
val query = QueryShortChannelIds(u.chainHash, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, List(u.shortChannelId)), queryFlags_opt = None)
val query = QueryShortChannelIds(u.chainHash, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, List(u.shortChannelId)), flags = None)
d.sync.get(remoteNodeId) match {
case Some(sync) =>
// we already have a pending request to that node, let's add this channel to the list and we'll get it later
@@ -828,25 +842,40 @@ object Router {
height >= firstBlockNum && height <= (firstBlockNum + numberOfBlocks)
}

def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(shortChannelId: ShortChannelId, theirInfo_opt: Option[TimestampsAndChecksums]): Byte = {
var flag = 0
theirInfo_opt match {
case Some(theirInfo) if channels.contains(shortChannelId) =>
val ourInfo = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
def computeFlag(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(
shortChannelId: ShortChannelId,
timestamps_opt: Option[Timestamps],
checksums_opt: Option[Checksums]): Long = {
var flag = 0L
(timestamps_opt, checksums_opt) match {
case (Some(theirTimestamps), Some(theirChecksums)) if channels.contains(shortChannelId) =>
val (ourTimestamps, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// we request their channel_update if all those conditions are met:
// - it is more recent than ours
// - it is different from ours, or it is the same but ours is about to be stale
// - it is not stale itself
if (ourInfo.timestamp1 < theirInfo.timestamp1 && (ourInfo.checksum1 != theirInfo.checksum1 || isAlmostStale(ourInfo.timestamp1)) && !isStale(theirInfo.timestamp1)) flag = flag | QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_1
if (ourInfo.timestamp2 < theirInfo.timestamp2 && (ourInfo.checksum2 != theirInfo.checksum2 || isAlmostStale(ourInfo.timestamp1)) && !isStale(theirInfo.timestamp2)) flag = flag | QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_2
case None if channels.contains(shortChannelId) =>
if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && (ourChecksums.checksum1 != theirChecksums.checksum1 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && (ourChecksums.checksum2 != theirChecksums.checksum2 || isAlmostStale(ourTimestamps.timestamp1)) && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (Some(theirTimestamps), None) if channels.contains(shortChannelId) =>
val (ourTimestamps, _) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// we request their channel_update if all those conditions are met:
// - it is more recent than ours
// - it is not stale itself
if (ourTimestamps.timestamp1 < theirTimestamps.timestamp1 && !isStale(theirTimestamps.timestamp1)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourTimestamps.timestamp2 < theirTimestamps.timestamp2 && !isStale(theirTimestamps.timestamp2)) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (None, Some(theirChecksums)) if channels.contains(shortChannelId) =>
val (_, ourChecksums) = Router.getChannelDigestInfo(channels, updates)(shortChannelId)
// this should not happen as we will not ask for checksums without asking for timestamps too
if (ourChecksums.checksum1 != theirChecksums.checksum1 && theirChecksums.checksum1 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1
if (ourChecksums.checksum2 != theirChecksums.checksum2 && theirChecksums.checksum2 != 0) flag = flag | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case (None, None) if channels.contains(shortChannelId) =>
// we know this channel: we only request their channel updates
flag = QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_2
flag = QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
case _ =>
// we don't know this channel: we request everything
flag = QueryFlagTypes.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_2
flag = QueryFlagType.INCLUDE_CHANNEL_ANNOUNCEMENT | QueryFlagType.INCLUDE_CHANNEL_UPDATE_1 | QueryFlagType.INCLUDE_CHANNEL_UPDATE_2
}
flag.toByte
flag
}

/**
@@ -900,25 +929,21 @@ object Router {
timestamp
}

def getChannelDigestInfo(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(shortChannelId: ShortChannelId): TimestampsAndChecksums = {
def getChannelDigestInfo(channels: SortedMap[ShortChannelId, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate])(shortChannelId: ShortChannelId): (Timestamps, Checksums) = {
val c = channels(shortChannelId)
val u1_opt = updates.get(ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2))
val u2_opt = updates.get(ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1))
val timestamp1 = u1_opt.map(_.timestamp).getOrElse(0L)
val timestamp2 = u2_opt.map(_.timestamp).getOrElse(0L)
val checksum1 = u1_opt.map(getChecksum).getOrElse(0L)
val checksum2 = u2_opt.map(getChecksum).getOrElse(0L)
TimestampsAndChecksums(
timestamp1 = timestamp1,
checksum1 = checksum1,
timestamp2 = timestamp2,
checksum2 = checksum2)
(Timestamps(timestamp1 = timestamp1, timestamp2 = timestamp2), Checksums(checksum1 = checksum1, checksum2 = checksum2))
}

def getChecksum(u: ChannelUpdate): Long = {
import u._
val data = serializationResult(LightningMessageCodecs.channelUpdateChecksumCodec.encode(shortChannelId :: messageFlags :: channelFlags :: cltvExpiryDelta :: htlcMinimumMsat :: feeBaseMsat :: feeProportionalMillionths :: htlcMaximumMsat :: HNil))
val checksum = new Adler32()
val checksum = new CRC32()
checksum.update(data.toArray)
checksum.getValue
}
20 changes: 11 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/wire/CommonCodecs.scala
Original file line number Diff line number Diff line change
@@ -18,14 +18,15 @@ package fr.acinq.eclair.wire

import java.net.{Inet4Address, Inet6Address, InetAddress}

import fr.acinq.bitcoin.{ByteVector32, ByteVector64}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, ByteVector64}
import fr.acinq.eclair.{ShortChannelId, UInt64}
import org.apache.commons.codec.binary.Base32
import scodec.{Attempt, Codec, DecodeResult, Err, SizeBound}
import scodec.bits.{BitVector, ByteVector}
import scodec.codecs._
import scodec.{Attempt, Codec, DecodeResult, Err, SizeBound}

import scala.Ordering.Implicits._
import scala.util.Try

/**
@@ -56,24 +57,25 @@ object CommonCodecs {
val uint64L: Codec[UInt64] = bytes(8).xmap(b => UInt64(b.reverse), a => a.toByteVector.padLeft(8).reverse)

/**
* We impose a minimal encoding on varint values to ensure that signed hashes can be reproduced easily.
* We impose a minimal encoding on some values (such as varint and truncated int) to ensure that signed hashes can be
* re-computed correctly.
* If a value could be encoded with less bytes, it's considered invalid and results in a failed decoding attempt.
*
* @param codec the integer codec (depends on the value).
* @param codec the value codec (depends on the value).
* @param min the minimal value that should be encoded.
*/
def uint64min(codec: Codec[UInt64], min: UInt64): Codec[UInt64] = codec.exmap({
case i if i < min => Attempt.failure(Err("varint was not minimally encoded"))
def minimalvalue[A : Ordering](codec: Codec[A], min: A): Codec[A] = codec.exmap({
case i if i < min => Attempt.failure(Err("value was not minimally encoded"))
case i => Attempt.successful(i)
}, Attempt.successful)

// Bitcoin-style varint codec (CompactSize).
// See https://bitcoin.org/en/developer-reference#compactsize-unsigned-integers for reference.
val varint: Codec[UInt64] = discriminatorWithDefault(
discriminated[UInt64].by(uint8L)
.\(0xff) { case i if i >= UInt64(0x100000000L) => i }(uint64min(uint64L, UInt64(0x100000000L)))
.\(0xfe) { case i if i >= UInt64(0x10000) => i }(uint64min(uint32L.xmap(UInt64(_), _.toBigInt.toLong), UInt64(0x10000)))
.\(0xfd) { case i if i >= UInt64(0xfd) => i }(uint64min(uint16L.xmap(UInt64(_), _.toBigInt.toInt), UInt64(0xfd))),
.\(0xff) { case i if i >= UInt64(0x100000000L) => i }(minimalvalue(uint64L, UInt64(0x100000000L)))
.\(0xfe) { case i if i >= UInt64(0x10000) => i }(minimalvalue(uint32L.xmap(UInt64(_), _.toBigInt.toLong), UInt64(0x10000)))
.\(0xfd) { case i if i >= UInt64(0xfd) => i }(minimalvalue(uint16L.xmap(UInt64(_), _.toBigInt.toInt), UInt64(0xfd))),
uint8L.xmap(UInt64(_), _.toBigInt.toInt)
)

Loading